From 4d8430d36094ed30524c14a3746f4af85aa27bf8 Mon Sep 17 00:00:00 2001 From: Sergio Conde Date: Sun, 1 Feb 2026 11:39:09 +0100 Subject: [PATCH] fix: throw propper exceptions and cleanup code --- meshtastic/__main__.py | 9 +++++---- meshtastic/node.py | 10 +++++----- meshtastic/ota.py | 36 +++++++++++++++++++++--------------- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 82f2a56..8d4e38b 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -466,8 +466,8 @@ def onConnected(interface): print(f"Triggering OTA update on {interface.hostname}...") interface.getNode(args.dest, False, **getNode_kwargs).startOTA( - mode=admin_pb2.OTA_WIFI, - hash=ota.hash_bytes() + ota_mode=admin_pb2.OTAMode.OTA_WIFI, + ota_file_hash=ota.hash_bytes() ) print("Waiting for device to reboot into OTA mode...") @@ -487,7 +487,7 @@ def onConnected(interface): time.sleep(2) print("\nOTA update completed successfully!") - + if args.enter_dfu: closeNow = True waitForAckNak = True @@ -1946,7 +1946,8 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars group.add_argument( "--ota-update", - help="Perform an OTA update on the destination node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). Specify the path to the firmware file.", + help="Perform an OTA update on the local node (ESP32, firmware version >=2.7.18, WiFi/TCP only for now). " + "Specify the path to the firmware file.", metavar="FIRMWARE_FILE", action="store", ) diff --git a/meshtastic/node.py b/meshtastic/node.py index 4df5acb..a635643 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -669,17 +669,17 @@ class Node: def startOTA( self, - mode: admin_pb2.OTAMode.ValueType, - hash: bytes, + ota_mode: admin_pb2.OTAMode.ValueType, + ota_file_hash: bytes, ): """Tell the node to start OTA mode (firmware >= 2.7.18).""" if self != self.iface.localNode: - raise Exception("startOTA only possible in local node") + raise ValueError("startOTA only possible in local node") self.ensureSessionKey() p = admin_pb2.AdminMessage() - p.ota_request.reboot_ota_mode=mode - p.ota_request.ota_hash=hash + p.ota_request.reboot_ota_mode = ota_mode + p.ota_request.ota_hash = ota_file_hash return self._sendAdmin(p) diff --git a/meshtastic/ota.py b/meshtastic/ota.py index b2545a2..af9feae 100644 --- a/meshtastic/ota.py +++ b/meshtastic/ota.py @@ -1,4 +1,6 @@ -import os +"""Meshtastic ESP32 Unified OTA +""" +import os import hashlib import socket import logging @@ -11,13 +13,18 @@ logger = logging.getLogger(__name__) def _file_sha256(filename: str): """Calculate SHA256 hash of a file.""" sha256_hash = hashlib.sha256() - + with open(filename, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash + +class OTAError(Exception): + """Exception for OTA errors.""" + + class ESP32WiFiOTA: """ESP32 WiFi Unified OTA updates.""" @@ -28,21 +35,21 @@ class ESP32WiFiOTA: self._socket: Optional[socket.socket] = None if not os.path.exists(self._filename): - raise Exception(f"File {self._filename} does not exist") + raise FileNotFoundError(f"File {self._filename} does not exist") self._file_hash = _file_sha256(self._filename) def _read_line(self) -> str: """Read a line from the socket.""" if not self._socket: - raise Exception("Socket not connected") + raise ConnectionError("Socket not connected") line = b"" while not line.endswith(b"\n"): char = self._socket.recv(1) - + if not char: - raise Exception("Connection closed while waiting for response") + raise ConnectionError("Connection closed while waiting for response") line += char @@ -78,10 +85,11 @@ class ESP32WiFiOTA: response = self._read_line() if response == "OK": break - elif response == "ERASING": + + if response == "ERASING": logger.info("Device is erasing flash...") elif response.startswith("ERR "): - raise Exception(f"Device reported error: {response}") + raise OTAError(f"Device reported error: {response}") else: logger.warning(f"Unexpected response: {response}") @@ -105,18 +113,16 @@ class ESP32WiFiOTA: logger.info("Firmware sent, waiting for verification...") while True: response = self._read_line() - if response == "OK": logger.info("OTA update completed successfully!") break - elif response == "ACK": - continue - elif response.startswith("ERR "): - raise Exception(f"OTA update failed: {response}") - else: + + if response.startswith("ERR "): + raise OTAError(f"OTA update failed: {response}") + elif response != "ACK": logger.warning(f"Unexpected final response: {response}") finally: if self._socket: self._socket.close() - self._socket = None \ No newline at end of file + self._socket = None