mirror of
https://github.com/meshtastic/python.git
synced 2026-04-17 13:31:43 -04:00
456 lines
14 KiB
Python
456 lines
14 KiB
Python
"""Meshtastic unit tests for ota.py"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import socket
|
|
import tempfile
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from meshtastic.ota import (
|
|
_file_sha256,
|
|
ESP32WiFiOTA,
|
|
OTAError,
|
|
)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_file_sha256():
|
|
"""Test _file_sha256 calculates correct hash"""
|
|
# Create a temporary file with known content
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"Hello, World!"
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
result = _file_sha256(temp_file)
|
|
expected_hash = hashlib.sha256(test_data).hexdigest()
|
|
assert result.hexdigest() == expected_hash
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_file_sha256_large_file():
|
|
"""Test _file_sha256 handles files larger than chunk size"""
|
|
# Create a temporary file with more than 4096 bytes
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"A" * 8192 # More than 4096 bytes
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
result = _file_sha256(temp_file)
|
|
expected_hash = hashlib.sha256(test_data).hexdigest()
|
|
assert result.hexdigest() == expected_hash
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_init_file_not_found():
|
|
"""Test ESP32WiFiOTA raises FileNotFoundError for non-existent file"""
|
|
with pytest.raises(FileNotFoundError, match="does not exist"):
|
|
ESP32WiFiOTA("/nonexistent/firmware.bin", "192.168.1.1")
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_init_success():
|
|
"""Test ESP32WiFiOTA initializes correctly with valid file"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"fake firmware data")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1", 3232)
|
|
assert ota._filename == temp_file
|
|
assert ota._hostname == "192.168.1.1"
|
|
assert ota._port == 3232
|
|
assert ota._socket is None
|
|
# Verify hash is calculated
|
|
assert ota._file_hash is not None
|
|
assert len(ota.hash_hex()) == 64 # SHA256 hex is 64 chars
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_init_default_port():
|
|
"""Test ESP32WiFiOTA uses default port 3232"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"fake firmware data")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
assert ota._port == 3232
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_hash_bytes():
|
|
"""Test hash_bytes returns correct bytes"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"firmware data"
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
hash_bytes = ota.hash_bytes()
|
|
expected_bytes = hashlib.sha256(test_data).digest()
|
|
assert hash_bytes == expected_bytes
|
|
assert len(hash_bytes) == 32 # SHA256 is 32 bytes
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_hash_hex():
|
|
"""Test hash_hex returns correct hex string"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"firmware data"
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
hash_hex = ota.hash_hex()
|
|
expected_hex = hashlib.sha256(test_data).hexdigest()
|
|
assert hash_hex == expected_hex
|
|
assert len(hash_hex) == 64 # SHA256 hex is 64 chars
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_read_line_not_connected():
|
|
"""Test _read_line raises ConnectionError when not connected"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
with pytest.raises(ConnectionError, match="Socket not connected"):
|
|
ota._read_line()
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_read_line_connection_closed():
|
|
"""Test _read_line raises ConnectionError when connection closed"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
mock_socket = MagicMock()
|
|
# Simulate connection closed
|
|
mock_socket.recv.return_value = b""
|
|
ota._socket = mock_socket
|
|
|
|
with pytest.raises(ConnectionError, match="Connection closed"):
|
|
ota._read_line()
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_esp32_wifi_ota_read_line_success():
|
|
"""Test _read_line successfully reads a line"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
mock_socket = MagicMock()
|
|
# Simulate receiving "OK\n"
|
|
mock_socket.recv.side_effect = [b"O", b"K", b"\n"]
|
|
ota._socket = mock_socket
|
|
|
|
result = ota._read_line()
|
|
assert result == "OK"
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_success(mock_socket_class):
|
|
"""Test update() with successful OTA"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"A" * 1024 # 1KB of data
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
# Setup mock socket
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
# Mock _read_line to return appropriate responses
|
|
# First call: ERASING, Second call: OK (ready), Third call: OK (complete)
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"ERASING", # Device is erasing flash
|
|
"OK", # Device ready for firmware
|
|
"OK", # Device finished successfully
|
|
]
|
|
|
|
ota.update()
|
|
|
|
# Verify socket was created and connected
|
|
mock_socket_class.assert_called_once_with(
|
|
socket.AF_INET, socket.SOCK_STREAM
|
|
)
|
|
mock_socket.settimeout.assert_called_once_with(15)
|
|
mock_socket.connect.assert_called_once_with(("192.168.1.1", 3232))
|
|
|
|
# Verify start command was sent
|
|
start_cmd = f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
|
|
mock_socket.sendall.assert_any_call(start_cmd)
|
|
|
|
# Verify firmware was sent (at least one chunk)
|
|
assert mock_socket.sendall.call_count >= 2
|
|
|
|
# Verify socket was closed
|
|
mock_socket.close.assert_called_once()
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_with_progress_callback(mock_socket_class):
|
|
"""Test update() with progress callback"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
test_data = b"A" * 1024 # 1KB of data
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
# Setup mock socket
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
# Track progress callback calls
|
|
progress_calls = []
|
|
|
|
def progress_callback(sent, total):
|
|
progress_calls.append((sent, total))
|
|
|
|
# Mock _read_line
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"OK", # Device ready
|
|
"OK", # Device finished
|
|
]
|
|
|
|
ota.update(progress_callback=progress_callback)
|
|
|
|
# Verify progress callback was called
|
|
assert len(progress_calls) > 0
|
|
# First call should show some progress
|
|
assert progress_calls[0][0] > 0
|
|
# Total should be the firmware size
|
|
assert progress_calls[0][1] == len(test_data)
|
|
# Last call should show all bytes sent
|
|
assert progress_calls[-1][0] == len(test_data)
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_device_error_on_start(mock_socket_class):
|
|
"""Test update() raises OTAError when device reports error during start"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.return_value = "ERR BAD_HASH"
|
|
|
|
with pytest.raises(OTAError, match="Device reported error"):
|
|
ota.update()
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_device_error_on_finish(mock_socket_class):
|
|
"""Test update() raises OTAError when device reports error after firmware sent"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"OK", # Device ready
|
|
"ERR FLASH_ERR", # Error after firmware sent
|
|
]
|
|
|
|
with pytest.raises(OTAError, match="OTA update failed"):
|
|
ota.update()
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_socket_cleanup_on_error(mock_socket_class):
|
|
"""Test that socket is properly cleaned up on error"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
# Simulate connection error
|
|
mock_socket.connect.side_effect = ConnectionRefusedError("Connection refused")
|
|
|
|
with pytest.raises(ConnectionRefusedError):
|
|
ota.update()
|
|
|
|
# Verify socket was closed even on error
|
|
mock_socket.close.assert_called_once()
|
|
assert ota._socket is None
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_large_firmware(mock_socket_class):
|
|
"""Test update() correctly chunks large firmware files"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
# Create a file larger than chunk_size (1024)
|
|
test_data = b"B" * 3000
|
|
f.write(test_data)
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"OK", # Device ready
|
|
"OK", # Device finished
|
|
]
|
|
|
|
ota.update()
|
|
|
|
# Verify that all data was sent in chunks
|
|
# 3000 bytes should be sent in ~3 chunks of 1024 bytes
|
|
sendall_calls = [
|
|
call
|
|
for call in mock_socket.sendall.call_args_list
|
|
if call[0][0]
|
|
!= f"OTA {len(test_data)} {ota.hash_hex()}\n".encode("utf-8")
|
|
]
|
|
# Calculate total data sent (excluding the start command)
|
|
total_sent = sum(len(call[0][0]) for call in sendall_calls)
|
|
assert total_sent == len(test_data)
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_unexpected_response_warning(mock_socket_class, caplog):
|
|
"""Test update() logs warning on unexpected response during startup"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"UNKNOWN", # Unexpected response
|
|
"OK", # Then proceed
|
|
"OK", # Device finished
|
|
]
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
ota.update()
|
|
|
|
# Check that warning was logged for unexpected response
|
|
assert "Unexpected response" in caplog.text
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|
|
|
|
|
|
@pytest.mark.unit
|
|
@patch("meshtastic.ota.socket.socket")
|
|
def test_esp32_wifi_ota_update_unexpected_final_response(mock_socket_class, caplog):
|
|
"""Test update() logs warning on unexpected final response after firmware upload"""
|
|
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
|
|
f.write(b"firmware")
|
|
temp_file = f.name
|
|
|
|
try:
|
|
mock_socket = MagicMock()
|
|
mock_socket_class.return_value = mock_socket
|
|
|
|
ota = ESP32WiFiOTA(temp_file, "192.168.1.1")
|
|
|
|
with patch.object(ota, "_read_line") as mock_read_line:
|
|
mock_read_line.side_effect = [
|
|
"OK", # Device ready for firmware
|
|
"UNKNOWN", # Unexpected final response (not OK, not ERR, not ACK)
|
|
"OK", # Then succeed
|
|
]
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
ota.update()
|
|
|
|
# Check that warning was logged for unexpected final response
|
|
assert "Unexpected final response" in caplog.text
|
|
|
|
finally:
|
|
os.unlink(temp_file)
|