mirror of
https://github.com/meshtastic/python.git
synced 2026-04-17 13:31:43 -04:00
Merge pull request #898 from skgsergio/feat/esp32-unified-ota
feat: Add ESP32 WiFi Unified OTA update support
This commit is contained in:
@@ -37,6 +37,7 @@ try:
|
||||
except ImportError as e:
|
||||
have_test = False
|
||||
|
||||
import meshtastic.ota
|
||||
import meshtastic.util
|
||||
import meshtastic.serial_interface
|
||||
import meshtastic.tcp_interface
|
||||
@@ -60,7 +61,7 @@ except ImportError as e:
|
||||
have_powermon = False
|
||||
powermon_exception = e
|
||||
meter = None
|
||||
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2, mesh_pb2
|
||||
from meshtastic.protobuf import admin_pb2, channel_pb2, config_pb2, portnums_pb2, mesh_pb2
|
||||
from meshtastic.version import get_active_version
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -452,6 +453,41 @@ def onConnected(interface):
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).rebootOTA()
|
||||
|
||||
if args.ota_update:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
|
||||
if not isinstance(interface, meshtastic.tcp_interface.TCPInterface):
|
||||
meshtastic.util.our_exit(
|
||||
"Error: OTA update currently requires a TCP connection to the node (use --host)."
|
||||
)
|
||||
|
||||
ota = meshtastic.ota.ESP32WiFiOTA(args.ota_update, interface.hostname)
|
||||
|
||||
print(f"Triggering OTA update on {interface.hostname}...")
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).startOTA(
|
||||
ota_mode=admin_pb2.OTAMode.OTA_WIFI,
|
||||
ota_file_hash=ota.hash_bytes()
|
||||
)
|
||||
|
||||
print("Waiting for device to reboot into OTA mode...")
|
||||
time.sleep(5)
|
||||
|
||||
retries = 5
|
||||
while retries > 0:
|
||||
try:
|
||||
ota.update()
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
retries -= 1
|
||||
if retries == 0:
|
||||
meshtastic.util.our_exit(f"\nOTA update failed: {e}")
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
print("\nOTA update completed successfully!")
|
||||
|
||||
if args.enter_dfu:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
@@ -1912,10 +1948,18 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars
|
||||
|
||||
group.add_argument(
|
||||
"--reboot-ota",
|
||||
help="Tell the destination node to reboot into factory firmware (ESP32)",
|
||||
help="Tell the destination node to reboot into factory firmware (ESP32, firmware version <2.7.18)",
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--ota-update",
|
||||
help="Perform an OTA update on the local node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). "
|
||||
"Specify the path to the firmware file.",
|
||||
metavar="FIRMWARE_FILE",
|
||||
action="store",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--enter-dfu",
|
||||
help="Tell the destination node to enter DFU mode (NRF52)",
|
||||
|
||||
@@ -655,7 +655,7 @@ class Node:
|
||||
return self._sendAdmin(p, onResponse=onResponse)
|
||||
|
||||
def rebootOTA(self, secs: int = 10):
|
||||
"""Tell the node to reboot into factory firmware."""
|
||||
"""Tell the node to reboot into factory firmware (firmware < 2.7.18)."""
|
||||
self.ensureSessionKey()
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.reboot_ota_seconds = secs
|
||||
@@ -668,6 +668,22 @@ class Node:
|
||||
onResponse = self.onAckNak
|
||||
return self._sendAdmin(p, onResponse=onResponse)
|
||||
|
||||
def startOTA(
|
||||
self,
|
||||
ota_mode: admin_pb2.OTAMode.ValueType,
|
||||
ota_file_hash: bytes,
|
||||
):
|
||||
"""Tell the node to start OTA mode (firmware >= 2.7.18)."""
|
||||
if self != self.iface.localNode:
|
||||
raise ValueError("startOTA only possible in local node")
|
||||
|
||||
self.ensureSessionKey()
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.ota_request.reboot_ota_mode = ota_mode
|
||||
p.ota_request.ota_hash = ota_file_hash
|
||||
|
||||
return self._sendAdmin(p)
|
||||
|
||||
def enterDFUMode(self):
|
||||
"""Tell the node to enter DFU mode (NRF52)."""
|
||||
self.ensureSessionKey()
|
||||
|
||||
128
meshtastic/ota.py
Normal file
128
meshtastic/ota.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Meshtastic ESP32 Unified OTA
|
||||
"""
|
||||
import os
|
||||
import hashlib
|
||||
import socket
|
||||
import logging
|
||||
from typing import Optional, Callable
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _file_sha256(filename: str):
|
||||
"""Calculate SHA256 hash of a file."""
|
||||
sha256_hash = hashlib.sha256()
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
for byte_block in iter(lambda: f.read(4096), b""):
|
||||
sha256_hash.update(byte_block)
|
||||
|
||||
return sha256_hash
|
||||
|
||||
|
||||
class OTAError(Exception):
|
||||
"""Exception for OTA errors."""
|
||||
|
||||
|
||||
class ESP32WiFiOTA:
|
||||
"""ESP32 WiFi Unified OTA updates."""
|
||||
|
||||
def __init__(self, filename: str, hostname: str, port: int = 3232):
|
||||
self._filename = filename
|
||||
self._hostname = hostname
|
||||
self._port = port
|
||||
self._socket: Optional[socket.socket] = None
|
||||
|
||||
if not os.path.exists(self._filename):
|
||||
raise FileNotFoundError(f"File {self._filename} does not exist")
|
||||
|
||||
self._file_hash = _file_sha256(self._filename)
|
||||
|
||||
def _read_line(self) -> str:
|
||||
"""Read a line from the socket."""
|
||||
if not self._socket:
|
||||
raise ConnectionError("Socket not connected")
|
||||
|
||||
line = b""
|
||||
while not line.endswith(b"\n"):
|
||||
char = self._socket.recv(1)
|
||||
|
||||
if not char:
|
||||
raise ConnectionError("Connection closed while waiting for response")
|
||||
|
||||
line += char
|
||||
|
||||
return line.decode("utf-8").strip()
|
||||
|
||||
def hash_bytes(self) -> bytes:
|
||||
"""Return the hash as bytes."""
|
||||
return self._file_hash.digest()
|
||||
|
||||
def hash_hex(self) -> str:
|
||||
"""Return the hash as a hex string."""
|
||||
return self._file_hash.hexdigest()
|
||||
|
||||
def update(self, progress_callback: Optional[Callable[[int, int], None]] = None):
|
||||
"""Perform the OTA update."""
|
||||
with open(self._filename, "rb") as f:
|
||||
data = f.read()
|
||||
size = len(data)
|
||||
|
||||
logger.info(f"Starting OTA update with {self._filename} ({size} bytes, hash {self.hash_hex()})")
|
||||
|
||||
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._socket.settimeout(15)
|
||||
try:
|
||||
self._socket.connect((self._hostname, self._port))
|
||||
logger.debug(f"Connected to {self._hostname}:{self._port}")
|
||||
|
||||
# Send start command
|
||||
self._socket.sendall(f"OTA {size} {self.hash_hex()}\n".encode("utf-8"))
|
||||
|
||||
# Wait for OK from the device
|
||||
while True:
|
||||
response = self._read_line()
|
||||
if response == "OK":
|
||||
break
|
||||
|
||||
if response == "ERASING":
|
||||
logger.info("Device is erasing flash...")
|
||||
elif response.startswith("ERR "):
|
||||
raise OTAError(f"Device reported error: {response}")
|
||||
else:
|
||||
logger.warning(f"Unexpected response: {response}")
|
||||
|
||||
# Stream firmware
|
||||
sent_bytes = 0
|
||||
chunk_size = 1024
|
||||
while sent_bytes < size:
|
||||
chunk = data[sent_bytes : sent_bytes + chunk_size]
|
||||
self._socket.sendall(chunk)
|
||||
sent_bytes += len(chunk)
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(sent_bytes, size)
|
||||
else:
|
||||
print(f"[{sent_bytes / size * 100:5.1f}%] Sent {sent_bytes} of {size} bytes...", end="\r")
|
||||
|
||||
if not progress_callback:
|
||||
print()
|
||||
|
||||
# Wait for OK from device
|
||||
logger.info("Firmware sent, waiting for verification...")
|
||||
while True:
|
||||
response = self._read_line()
|
||||
if response == "OK":
|
||||
logger.info("OTA update completed successfully!")
|
||||
break
|
||||
|
||||
if response.startswith("ERR "):
|
||||
raise OTAError(f"OTA update failed: {response}")
|
||||
elif response != "ACK":
|
||||
logger.warning(f"Unexpected final response: {response}")
|
||||
|
||||
finally:
|
||||
if self._socket:
|
||||
self._socket.close()
|
||||
self._socket = None
|
||||
@@ -6,6 +6,7 @@ import os
|
||||
import platform
|
||||
import re
|
||||
import sys
|
||||
import tempfile
|
||||
from unittest.mock import mock_open, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
@@ -2900,3 +2901,68 @@ def test_main_set_ham_empty_string(capsys):
|
||||
out, _ = capsys.readouterr()
|
||||
assert "ERROR: Ham radio callsign cannot be empty or contain only whitespace characters" in out
|
||||
assert excinfo.value.code == 1
|
||||
|
||||
|
||||
# OTA-related tests
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_ota_update_file_not_found(capsys):
|
||||
"""Test --ota-update with non-existent file"""
|
||||
sys.argv = [
|
||||
"",
|
||||
"--ota-update",
|
||||
"/nonexistent/firmware.bin",
|
||||
"--host",
|
||||
"192.168.1.100",
|
||||
]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
@patch("meshtastic.ota.ESP32WiFiOTA")
|
||||
@patch("meshtastic.__main__.meshtastic.util.our_exit")
|
||||
def test_main_ota_update_retries(mock_our_exit, mock_ota_class, capsys):
|
||||
"""Test --ota-update retries on failure"""
|
||||
# Create a temporary firmware file
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"fake firmware data")
|
||||
firmware_file = f.name
|
||||
|
||||
try:
|
||||
sys.argv = ["", "--ota-update", firmware_file, "--host", "192.168.1.100"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
# Mock the OTA class to fail all 5 retries
|
||||
mock_ota = MagicMock()
|
||||
mock_ota_class.return_value = mock_ota
|
||||
mock_ota.hash_bytes.return_value = b"\x00" * 32
|
||||
mock_ota.hash_hex.return_value = "a" * 64
|
||||
mock_ota.update.side_effect = Exception("Connection failed")
|
||||
|
||||
# Mock isinstance to return True
|
||||
with patch("meshtastic.__main__.isinstance", return_value=True):
|
||||
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
|
||||
mock_iface = MagicMock()
|
||||
mock_iface.hostname = "192.168.1.100"
|
||||
mock_iface.localNode = MagicMock(autospec=Node)
|
||||
mock_tcp.return_value = mock_iface
|
||||
|
||||
with patch("time.sleep"):
|
||||
main()
|
||||
|
||||
# Should have exhausted all retries and called our_exit
|
||||
# Note: our_exit might be called twice - once for TCP check, once for failure
|
||||
assert mock_our_exit.call_count >= 1
|
||||
# Check the last call was for OTA failure
|
||||
last_call_args = mock_our_exit.call_args[0][0]
|
||||
assert "OTA update failed" in last_call_args
|
||||
|
||||
finally:
|
||||
os.unlink(firmware_file)
|
||||
|
||||
@@ -1574,6 +1574,41 @@ def test_setOwner_valid_names(caplog):
|
||||
assert re.search(r'p.set_owner.short_name:VN:', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_start_ota_local_node():
|
||||
"""Test startOTA on local node"""
|
||||
iface = MagicMock(autospec=MeshInterface)
|
||||
anode = Node(iface, 1234567890, noProto=True)
|
||||
# Set up as local node
|
||||
iface.localNode = anode
|
||||
|
||||
amesg = admin_pb2.AdminMessage()
|
||||
with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
|
||||
with patch.object(anode, "_sendAdmin") as mock_send_admin:
|
||||
test_hash = b"\x01\x02\x03" * 8 # 24 bytes hash
|
||||
anode.startOTA(ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash)
|
||||
|
||||
# Verify the OTA request was set correctly
|
||||
assert amesg.ota_request.reboot_ota_mode == admin_pb2.OTAMode.OTA_WIFI
|
||||
assert amesg.ota_request.ota_hash == test_hash
|
||||
mock_send_admin.assert_called_once_with(amesg)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_start_ota_remote_node_raises_error():
|
||||
"""Test startOTA on remote node raises ValueError"""
|
||||
iface = MagicMock(autospec=MeshInterface)
|
||||
local_node = Node(iface, 1234567890, noProto=True)
|
||||
remote_node = Node(iface, 9876543210, noProto=True)
|
||||
iface.localNode = local_node
|
||||
|
||||
test_hash = b"\x01\x02\x03" * 8
|
||||
with pytest.raises(ValueError, match="startOTA only possible in local node"):
|
||||
remote_node.startOTA(
|
||||
ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash
|
||||
)
|
||||
|
||||
|
||||
# TODO
|
||||
# @pytest.mark.unitslow
|
||||
# def test_waitForConfig():
|
||||
|
||||
455
meshtastic/tests/test_ota.py
Normal file
455
meshtastic/tests/test_ota.py
Normal file
@@ -0,0 +1,455 @@
|
||||
"""Meshtastic unit tests for ota.py"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from meshtastic.ota import (
|
||||
_file_sha256,
|
||||
ESP32WiFiOTA,
|
||||
OTAError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_file_sha256():
|
||||
"""Test _file_sha256 calculates correct hash"""
|
||||
# Create a temporary file with known content
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"Hello, World!"
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
result = _file_sha256(temp_file)
|
||||
expected_hash = hashlib.sha256(test_data).hexdigest()
|
||||
assert result.hexdigest() == expected_hash
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_file_sha256_large_file():
|
||||
"""Test _file_sha256 handles files larger than chunk size"""
|
||||
# Create a temporary file with more than 4096 bytes
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"A" * 8192 # More than 4096 bytes
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
result = _file_sha256(temp_file)
|
||||
expected_hash = hashlib.sha256(test_data).hexdigest()
|
||||
assert result.hexdigest() == expected_hash
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_init_file_not_found():
|
||||
"""Test ESP32WiFiOTA raises FileNotFoundError for non-existent file"""
|
||||
with pytest.raises(FileNotFoundError, match="does not exist"):
|
||||
ESP32WiFiOTA("/nonexistent/firmware.bin", "192.168.1.1")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_init_success():
|
||||
"""Test ESP32WiFiOTA initializes correctly with valid file"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"fake firmware data")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1", 3232)
|
||||
assert ota._filename == temp_file
|
||||
assert ota._hostname == "192.168.1.1"
|
||||
assert ota._port == 3232
|
||||
assert ota._socket is None
|
||||
# Verify hash is calculated
|
||||
assert ota._file_hash is not None
|
||||
assert len(ota.hash_hex()) == 64 # SHA256 hex is 64 chars
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_init_default_port():
|
||||
"""Test ESP32WiFiOTA uses default port 3232"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"fake firmware data")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
assert ota._port == 3232
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_hash_bytes():
|
||||
"""Test hash_bytes returns correct bytes"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"firmware data"
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
hash_bytes = ota.hash_bytes()
|
||||
expected_bytes = hashlib.sha256(test_data).digest()
|
||||
assert hash_bytes == expected_bytes
|
||||
assert len(hash_bytes) == 32 # SHA256 is 32 bytes
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_hash_hex():
|
||||
"""Test hash_hex returns correct hex string"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"firmware data"
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
hash_hex = ota.hash_hex()
|
||||
expected_hex = hashlib.sha256(test_data).hexdigest()
|
||||
assert hash_hex == expected_hex
|
||||
assert len(hash_hex) == 64 # SHA256 hex is 64 chars
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_read_line_not_connected():
|
||||
"""Test _read_line raises ConnectionError when not connected"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
with pytest.raises(ConnectionError, match="Socket not connected"):
|
||||
ota._read_line()
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_read_line_connection_closed():
|
||||
"""Test _read_line raises ConnectionError when connection closed"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
mock_socket = MagicMock()
|
||||
# Simulate connection closed
|
||||
mock_socket.recv.return_value = b""
|
||||
ota._socket = mock_socket
|
||||
|
||||
with pytest.raises(ConnectionError, match="Connection closed"):
|
||||
ota._read_line()
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_esp32_wifi_ota_read_line_success():
|
||||
"""Test _read_line successfully reads a line"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
mock_socket = MagicMock()
|
||||
# Simulate receiving "OK\n"
|
||||
mock_socket.recv.side_effect = [b"O", b"K", b"\n"]
|
||||
ota._socket = mock_socket
|
||||
|
||||
result = ota._read_line()
|
||||
assert result == "OK"
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_success(mock_socket_class):
|
||||
"""Test update() with successful OTA"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"A" * 1024 # 1KB of data
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
# Setup mock socket
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
# Mock _read_line to return appropriate responses
|
||||
# First call: ERASING, Second call: OK (ready), Third call: OK (complete)
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"ERASING", # Device is erasing flash
|
||||
"OK", # Device ready for firmware
|
||||
"OK", # Device finished successfully
|
||||
]
|
||||
|
||||
ota.update()
|
||||
|
||||
# Verify socket was created and connected
|
||||
mock_socket_class.assert_called_once_with(
|
||||
socket.AF_INET, socket.SOCK_STREAM
|
||||
)
|
||||
mock_socket.settimeout.assert_called_once_with(15)
|
||||
mock_socket.connect.assert_called_once_with(("192.168.1.1", 3232))
|
||||
|
||||
# Verify start command was sent
|
||||
start_cmd = f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
|
||||
mock_socket.sendall.assert_any_call(start_cmd)
|
||||
|
||||
# Verify firmware was sent (at least one chunk)
|
||||
assert mock_socket.sendall.call_count >= 2
|
||||
|
||||
# Verify socket was closed
|
||||
mock_socket.close.assert_called_once()
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_with_progress_callback(mock_socket_class):
|
||||
"""Test update() with progress callback"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
test_data = b"A" * 1024 # 1KB of data
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
# Setup mock socket
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
# Track progress callback calls
|
||||
progress_calls = []
|
||||
|
||||
def progress_callback(sent, total):
|
||||
progress_calls.append((sent, total))
|
||||
|
||||
# Mock _read_line
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"OK", # Device ready
|
||||
"OK", # Device finished
|
||||
]
|
||||
|
||||
ota.update(progress_callback=progress_callback)
|
||||
|
||||
# Verify progress callback was called
|
||||
assert len(progress_calls) > 0
|
||||
# First call should show some progress
|
||||
assert progress_calls[0][0] > 0
|
||||
# Total should be the firmware size
|
||||
assert progress_calls[0][1] == len(test_data)
|
||||
# Last call should show all bytes sent
|
||||
assert progress_calls[-1][0] == len(test_data)
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_device_error_on_start(mock_socket_class):
|
||||
"""Test update() raises OTAError when device reports error during start"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.return_value = "ERR BAD_HASH"
|
||||
|
||||
with pytest.raises(OTAError, match="Device reported error"):
|
||||
ota.update()
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_device_error_on_finish(mock_socket_class):
|
||||
"""Test update() raises OTAError when device reports error after firmware sent"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"OK", # Device ready
|
||||
"ERR FLASH_ERR", # Error after firmware sent
|
||||
]
|
||||
|
||||
with pytest.raises(OTAError, match="OTA update failed"):
|
||||
ota.update()
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_socket_cleanup_on_error(mock_socket_class):
|
||||
"""Test that socket is properly cleaned up on error"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
# Simulate connection error
|
||||
mock_socket.connect.side_effect = ConnectionRefusedError("Connection refused")
|
||||
|
||||
with pytest.raises(ConnectionRefusedError):
|
||||
ota.update()
|
||||
|
||||
# Verify socket was closed even on error
|
||||
mock_socket.close.assert_called_once()
|
||||
assert ota._socket is None
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_large_firmware(mock_socket_class):
|
||||
"""Test update() correctly chunks large firmware files"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
# Create a file larger than chunk_size (1024)
|
||||
test_data = b"B" * 3000
|
||||
f.write(test_data)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"OK", # Device ready
|
||||
"OK", # Device finished
|
||||
]
|
||||
|
||||
ota.update()
|
||||
|
||||
# Verify that all data was sent in chunks
|
||||
# 3000 bytes should be sent in ~3 chunks of 1024 bytes
|
||||
sendall_calls = [
|
||||
call
|
||||
for call in mock_socket.sendall.call_args_list
|
||||
if call[0][0]
|
||||
!= f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
|
||||
]
|
||||
# Calculate total data sent (excluding the start command)
|
||||
total_sent = sum(len(call[0][0]) for call in sendall_calls)
|
||||
assert total_sent == len(test_data)
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_unexpected_response_warning(mock_socket_class, caplog):
|
||||
"""Test update() logs warning on unexpected response during startup"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"UNKNOWN", # Unexpected response
|
||||
"OK", # Then proceed
|
||||
"OK", # Device finished
|
||||
]
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
ota.update()
|
||||
|
||||
# Check that warning was logged for unexpected response
|
||||
assert "Unexpected response" in caplog.text
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("meshtastic.ota.socket.socket")
|
||||
def test_esp32_wifi_ota_update_unexpected_final_response(mock_socket_class, caplog):
|
||||
"""Test update() logs warning on unexpected final response after firmware upload"""
|
||||
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
||||
f.write(b"firmware")
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
mock_socket = MagicMock()
|
||||
mock_socket_class.return_value = mock_socket
|
||||
|
||||
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
||||
|
||||
with patch.object(ota, "_read_line") as mock_read_line:
|
||||
mock_read_line.side_effect = [
|
||||
"OK", # Device ready for firmware
|
||||
"UNKNOWN", # Unexpected final response (not OK, not ERR, not ACK)
|
||||
"OK", # Then succeed
|
||||
]
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
ota.update()
|
||||
|
||||
# Check that warning was logged for unexpected final response
|
||||
assert "Unexpected final response" in caplog.text
|
||||
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
Reference in New Issue
Block a user