fix: add tests

This commit is contained in:
Sergio Conde
2026-02-01 21:13:58 +01:00
parent 4d8430d360
commit 4de19f50d9
3 changed files with 559 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ import os
import platform
import re
import sys
import tempfile
from unittest.mock import mock_open, MagicMock, patch
import pytest
@@ -2900,3 +2901,68 @@ def test_main_set_ham_empty_string(capsys):
out, _ = capsys.readouterr()
assert "ERROR: Ham radio callsign cannot be empty or contain only whitespace characters" in out
assert excinfo.value.code == 1
# OTA-related tests
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_ota_update_file_not_found(capsys):
"""Test --ota-update with non-existent file"""
sys.argv = [
"",
"--ota-update",
"/nonexistent/firmware.bin",
"--host",
"192.168.1.100",
]
mt_config.args = sys.argv
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("meshtastic.ota.ESP32WiFiOTA")
@patch("meshtastic.__main__.meshtastic.util.our_exit")
def test_main_ota_update_retries(mock_our_exit, mock_ota_class, capsys):
"""Test --ota-update retries on failure"""
# Create a temporary firmware file
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"fake firmware data")
firmware_file = f.name
try:
sys.argv = ["", "--ota-update", firmware_file, "--host", "192.168.1.100"]
mt_config.args = sys.argv
# Mock the OTA class to fail all 5 retries
mock_ota = MagicMock()
mock_ota_class.return_value = mock_ota
mock_ota.hash_bytes.return_value = b"\x00" * 32
mock_ota.hash_hex.return_value = "a" * 64
mock_ota.update.side_effect = Exception("Connection failed")
# Mock isinstance to return True
with patch("meshtastic.__main__.isinstance", return_value=True):
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_iface = MagicMock()
mock_iface.hostname = "192.168.1.100"
mock_iface.localNode = MagicMock(autospec=Node)
mock_tcp.return_value = mock_iface
with patch("time.sleep"):
main()
# Should have exhausted all retries and called our_exit
# Note: our_exit might be called twice - once for TCP check, once for failure
assert mock_our_exit.call_count >= 1
# Check the last call was for OTA failure
last_call_args = mock_our_exit.call_args[0][0]
assert "OTA update failed" in last_call_args
finally:
os.unlink(firmware_file)

View File

@@ -1550,6 +1550,41 @@ def test_setOwner_valid_names(caplog):
assert re.search(r'p.set_owner.short_name:VN:', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_start_ota_local_node():
"""Test startOTA on local node"""
iface = MagicMock(autospec=MeshInterface)
anode = Node(iface, 1234567890, noProto=True)
# Set up as local node
iface.localNode = anode
amesg = admin_pb2.AdminMessage()
with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
with patch.object(anode, "_sendAdmin") as mock_send_admin:
test_hash = b"\x01\x02\x03" * 8 # 24 bytes hash
anode.startOTA(ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash)
# Verify the OTA request was set correctly
assert amesg.ota_request.reboot_ota_mode == admin_pb2.OTAMode.OTA_WIFI
assert amesg.ota_request.ota_hash == test_hash
mock_send_admin.assert_called_once_with(amesg)
@pytest.mark.unit
def test_start_ota_remote_node_raises_error():
"""Test startOTA on remote node raises ValueError"""
iface = MagicMock(autospec=MeshInterface)
local_node = Node(iface, 1234567890, noProto=True)
remote_node = Node(iface, 9876543210, noProto=True)
iface.localNode = local_node
test_hash = b"\x01\x02\x03" * 8
with pytest.raises(ValueError, match="startOTA only possible in local node"):
remote_node.startOTA(
ota_mode=admin_pb2.OTAMode.OTA_WIFI, ota_file_hash=test_hash
)
# TODO
# @pytest.mark.unitslow
# def test_waitForConfig():

View File

@@ -0,0 +1,458 @@
"""Meshtastic unit tests for ota.py"""
import hashlib
import os
import socket
import tempfile
from unittest.mock import MagicMock, mock_open, patch, call
import pytest
from meshtastic.ota import (
_file_sha256,
ESP32WiFiOTA,
OTAError,
)
@pytest.mark.unit
def test_file_sha256():
"""Test _file_sha256 calculates correct hash"""
# Create a temporary file with known content
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"Hello, World!"
f.write(test_data)
temp_file = f.name
try:
result = _file_sha256(temp_file)
expected_hash = hashlib.sha256(test_data).hexdigest()
assert result.hexdigest() == expected_hash
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_file_sha256_large_file():
"""Test _file_sha256 handles files larger than chunk size"""
# Create a temporary file with more than 4096 bytes
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"A" * 8192 # More than 4096 bytes
f.write(test_data)
temp_file = f.name
try:
result = _file_sha256(temp_file)
expected_hash = hashlib.sha256(test_data).hexdigest()
assert result.hexdigest() == expected_hash
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_init_file_not_found():
"""Test ESP32WiFiOTA raises FileNotFoundError for non-existent file"""
with pytest.raises(FileNotFoundError, match="does not exist"):
ESP32WiFiOTA("/nonexistent/firmware.bin", "192.168.1.1")
@pytest.mark.unit
def test_esp32_wifi_ota_init_success():
"""Test ESP32WiFiOTA initializes correctly with valid file"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"fake firmware data")
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1", 3232)
assert ota._filename == temp_file
assert ota._hostname == "192.168.1.1"
assert ota._port == 3232
assert ota._socket is None
# Verify hash is calculated
assert ota._file_hash is not None
assert len(ota.hash_hex()) == 64 # SHA256 hex is 64 chars
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_init_default_port():
"""Test ESP32WiFiOTA uses default port 3232"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"fake firmware data")
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
assert ota._port == 3232
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_hash_bytes():
"""Test hash_bytes returns correct bytes"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"firmware data"
f.write(test_data)
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
hash_bytes = ota.hash_bytes()
expected_bytes = hashlib.sha256(test_data).digest()
assert hash_bytes == expected_bytes
assert len(hash_bytes) == 32 # SHA256 is 32 bytes
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_hash_hex():
"""Test hash_hex returns correct hex string"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"firmware data"
f.write(test_data)
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
hash_hex = ota.hash_hex()
expected_hex = hashlib.sha256(test_data).hexdigest()
assert hash_hex == expected_hex
assert len(hash_hex) == 64 # SHA256 hex is 64 chars
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_read_line_not_connected():
"""Test _read_line raises ConnectionError when not connected"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with pytest.raises(ConnectionError, match="Socket not connected"):
ota._read_line()
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_read_line_connection_closed():
"""Test _read_line raises ConnectionError when connection closed"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
mock_socket = MagicMock()
# Simulate connection closed
mock_socket.recv.return_value = b""
ota._socket = mock_socket
with pytest.raises(ConnectionError, match="Connection closed"):
ota._read_line()
finally:
os.unlink(temp_file)
@pytest.mark.unit
def test_esp32_wifi_ota_read_line_success():
"""Test _read_line successfully reads a line"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
mock_socket = MagicMock()
# Simulate receiving "OK\n"
mock_socket.recv.side_effect = [b"O", b"K", b"\n"]
ota._socket = mock_socket
result = ota._read_line()
assert result == "OK"
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_success(mock_socket_class):
"""Test update() with successful OTA"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"A" * 1024 # 1KB of data
f.write(test_data)
temp_file = f.name
try:
# Setup mock socket
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
# Mock _read_line to return appropriate responses
# First call: ERASING, Second call: OK (ready), Third call: OK (complete)
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"ERASING", # Device is erasing flash
"OK", # Device ready for firmware
"OK", # Device finished successfully
]
ota.update()
# Verify socket was created and connected
mock_socket_class.assert_called_once_with(
socket.AF_INET, socket.SOCK_STREAM
)
mock_socket.settimeout.assert_called_once_with(15)
mock_socket.connect.assert_called_once_with(("192.168.1.1", 3232))
# Verify start command was sent
start_cmd = f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
mock_socket.sendall.assert_any_call(start_cmd)
# Verify firmware was sent (at least one chunk)
assert mock_socket.sendall.call_count >= 2
# Verify socket was closed
mock_socket.close.assert_called_once()
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_with_progress_callback(mock_socket_class):
"""Test update() with progress callback"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
test_data = b"A" * 1024 # 1KB of data
f.write(test_data)
temp_file = f.name
try:
# Setup mock socket
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
# Track progress callback calls
progress_calls = []
def progress_callback(sent, total):
progress_calls.append((sent, total))
# Mock _read_line
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"OK", # Device ready
"OK", # Device finished
]
ota.update(progress_callback=progress_callback)
# Verify progress callback was called
assert len(progress_calls) > 0
# First call should show some progress
assert progress_calls[0][0] > 0
# Total should be the firmware size
assert progress_calls[0][1] == len(test_data)
# Last call should show all bytes sent
assert progress_calls[-1][0] == len(test_data)
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_device_error_on_start(mock_socket_class):
"""Test update() raises OTAError when device reports error during start"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.return_value = "ERR BAD_HASH"
with pytest.raises(OTAError, match="Device reported error"):
ota.update()
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_device_error_on_finish(mock_socket_class):
"""Test update() raises OTAError when device reports error after firmware sent"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"OK", # Device ready
"ERR FLASH_ERR", # Error after firmware sent
]
with pytest.raises(OTAError, match="OTA update failed"):
ota.update()
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_socket_cleanup_on_error(mock_socket_class):
"""Test that socket is properly cleaned up on error"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
# Simulate connection error
mock_socket.connect.side_effect = ConnectionRefusedError("Connection refused")
with pytest.raises(ConnectionRefusedError):
ota.update()
# Verify socket was closed even on error
mock_socket.close.assert_called_once()
assert ota._socket is None
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_large_firmware(mock_socket_class):
"""Test update() correctly chunks large firmware files"""
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
# Create a file larger than chunk_size (1024)
test_data = b"B" * 3000
f.write(test_data)
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"OK", # Device ready
"OK", # Device finished
]
ota.update()
# Verify that all data was sent in chunks
# 3000 bytes should be sent in ~3 chunks of 1024 bytes
sendall_calls = [
call
for call in mock_socket.sendall.call_args_list
if call[0][0]
!= f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
]
# Calculate total data sent (excluding the start command)
total_sent = sum(len(call[0][0]) for call in sendall_calls)
assert total_sent == len(test_data)
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_unexpected_response_warning(mock_socket_class, caplog):
"""Test update() logs warning on unexpected response during startup"""
import logging
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"UNKNOWN", # Unexpected response
"OK", # Then proceed
"OK", # Device finished
]
with caplog.at_level(logging.WARNING):
ota.update()
# Check that warning was logged for unexpected response
assert "Unexpected response" in caplog.text
finally:
os.unlink(temp_file)
@pytest.mark.unit
@patch("meshtastic.ota.socket.socket")
def test_esp32_wifi_ota_update_unexpected_final_response(mock_socket_class, caplog):
"""Test update() logs warning on unexpected final response after firmware upload"""
import logging
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(b"firmware")
temp_file = f.name
try:
mock_socket = MagicMock()
mock_socket_class.return_value = mock_socket
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
with patch.object(ota, "_read_line") as mock_read_line:
mock_read_line.side_effect = [
"OK", # Device ready for firmware
"UNKNOWN", # Unexpected final response (not OK, not ERR, not ACK)
"OK", # Then succeed
]
with caplog.at_level(logging.WARNING):
ota.update()
# Check that warning was logged for unexpected final response
assert "Unexpected final response" in caplog.text
finally:
os.unlink(temp_file)