From f449ff95069fae0793b7c8c3361080f7c315e58e Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Thu, 11 Apr 2024 18:28:01 -0700 Subject: [PATCH] Add a variety of type annotations, primarily in mesh_interface --- meshtastic/mesh_interface.py | 127 ++++++++++++++++++----------------- meshtastic/node.py | 6 +- meshtastic/util.py | 18 ++--- 3 files changed, 78 insertions(+), 73 deletions(-) diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index f476af3..c23c311 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -37,6 +37,8 @@ from meshtastic.util import ( message_to_json, ) +from typing import Any, Callable, Dict, List, Optional, Union + class MeshInterface: """Interface class for meshtastic devices @@ -54,7 +56,7 @@ class MeshInterface: self.message = message super().__init__(self.message) - def __init__(self, debugOut=None, noProto=False): + def __init__(self, debugOut=None, noProto: bool=False) -> None: """Constructor Keyword Arguments: @@ -62,27 +64,27 @@ class MeshInterface: link - just be a dumb serial client. """ self.debugOut = debugOut - self.nodes = None # FIXME - self.isConnected = threading.Event() - self.noProto = noProto - self.localNode = meshtastic.node.Node(self, -1) # We fixup nodenum later - self.myInfo = None # We don't have device info yet - self.metadata = None # We don't have device metadata yet - self.responseHandlers = {} # A map from request ID to the handler + self.nodes: Optional[Dict[str,Dict]] = None # FIXME + self.isConnected: threading.Event = threading.Event() + self.noProto: bool = noProto + self.localNode: meshtastic.node.Node = meshtastic.node.Node(self, -1) # We fixup nodenum later + self.myInfo: Optional[mesh_pb2.MyNodeInfo] = None # We don't have device info yet + self.metadata: Optional[mesh_pb2.DeviceMetadata] = None # We don't have device metadata yet + self.responseHandlers: Dict[int,ResponseHandler] = {} # A map from request ID to the handler self.failure = ( None # If we've encountered a fatal exception it will be kept here ) - self._timeout = Timeout() - self._acknowledgment = Acknowledgment() - self.heartbeatTimer = None + self._timeout: Timeout = Timeout() + self._acknowledgment: Acknowledgment = Acknowledgment() + self.heartbeatTimer: Optional[threading.Timer] = None random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it - self.currentPacketId = random.randint(0, 0xFFFFFFFF) - self.nodesByNum = None - self.configId = None - self.gotResponse = False # used in gpio read - self.mask = None # used in gpio read and gpio watch - self.queueStatus = None - self.queue = collections.OrderedDict() + self.currentPacketId: int = random.randint(0, 0xFFFFFFFF) + self.nodesByNum: Optional[Dict[int, Dict]] = None + self.configId: Optional[int] = None + self.gotResponse: bool = False # used in gpio read + self.mask: Optional[int] = None # used in gpio read and gpio watch + self.queueStatus: Optional[mesh_pb2.QueueStatus] = None + self.queue: collections.OrderedDict = collections.OrderedDict() def close(self): """Shutdown this interface""" @@ -103,7 +105,7 @@ class MeshInterface: logging.error(f"Traceback: {traceback}") self.close() - def showInfo(self, file=sys.stdout): # pylint: disable=W0613 + def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613 """Show human readable summary about this object""" owner = f"Owner: {self.getLongName()} ({self.getShortName()})" myinfo = "" @@ -135,20 +137,20 @@ class MeshInterface: print(infos) return infos - def showNodes(self, includeSelf=True, file=sys.stdout): # pylint: disable=W0613 + def showNodes(self, includeSelf: bool=True, file=sys.stdout) -> str: # pylint: disable=W0613 """Show table summary of nodes in mesh""" - def formatFloat(value, precision=2, unit=""): + def formatFloat(value, precision=2, unit="") -> Optional[str]: """Format a float value with precision.""" return f"{value:.{precision}f}{unit}" if value else None - def getLH(ts): + def getLH(ts) -> Optional[str]: """Format last heard""" return ( datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") if ts else None ) - def getTimeAgo(ts): + def getTimeAgo(ts) -> Optional[str]: """Format how long ago have we heard from this node (aka timeago).""" return ( timeago.format(datetime.fromtimestamp(ts), datetime.now()) @@ -156,7 +158,7 @@ class MeshInterface: else None ) - rows = [] + rows: List[Dict[str, Any]] = [] if self.nodesByNum: logging.debug(f"self.nodes:{self.nodes}") for node in self.nodesByNum.values(): @@ -225,7 +227,7 @@ class MeshInterface: print(table) return table - def getNode(self, nodeId, requestChannels=True): + def getNode(self, nodeId: str, requestChannels: bool=True) -> meshtastic.node.Node: """Return a node object which contains device settings and channel info""" if nodeId in (LOCAL_ADDR, BROADCAST_ADDR): return self.localNode @@ -242,11 +244,11 @@ class MeshInterface: def sendText( self, text: str, - destinationId=BROADCAST_ADDR, - wantAck=False, - wantResponse=False, - onResponse=None, - channelIndex=0, + destinationId: Union[int, str]=BROADCAST_ADDR, + wantAck: bool=False, + wantResponse: bool=False, + onResponse: Optional[Callable[[mesh_pb2.MeshPacket], Any]]=None, + channelIndex: int=0, ): """Send a utf8 string to some other node, if the node has a display it will also be shown on the device. @@ -281,12 +283,12 @@ class MeshInterface: def sendData( self, data, - destinationId=BROADCAST_ADDR, - portNum=portnums_pb2.PortNum.PRIVATE_APP, - wantAck=False, - wantResponse=False, - onResponse=None, - channelIndex=0, + destinationId: Union[int, str]=BROADCAST_ADDR, + portNum: portnums_pb2.PortNum.ValueType=portnums_pb2.PortNum.PRIVATE_APP, + wantAck: bool=False, + wantResponse: bool=False, + onResponse: Optional[Callable[[mesh_pb2.MeshPacket], Any]]=None, + channelIndex: int=0, ): """Send a data packet to some other node @@ -341,13 +343,13 @@ class MeshInterface: def sendPosition( self, - latitude=0.0, - longitude=0.0, - altitude=0, - timeSec=0, - destinationId=BROADCAST_ADDR, - wantAck=False, - wantResponse=False, + latitude: float=0.0, + longitude: float=0.0, + altitude: int=0, + timeSec: int=0, + destinationId: Union[int, str]=BROADCAST_ADDR, + wantAck: bool=False, + wantResponse: bool=False, ): """ Send a position packet to some other node (normally a broadcast) @@ -374,8 +376,8 @@ class MeshInterface: logging.debug(f"p.altitude:{p.altitude}") if timeSec == 0: - timeSec = time.time() # returns unix timestamp in seconds - p.time = int(timeSec) + timeSec = int(time.time()) # returns unix timestamp in seconds + p.time = timeSec logging.debug(f"p.time:{p.time}") return self.sendData( @@ -386,7 +388,7 @@ class MeshInterface: wantResponse=wantResponse, ) - def sendTraceRoute(self, dest, hopLimit): + def sendTraceRoute(self, dest: Union[int, str], hopLimit: int): """Send the trace route""" r = mesh_pb2.RouteDiscovery() self.sendData( @@ -397,7 +399,7 @@ class MeshInterface: onResponse=self.onResponseTraceRoute, ) # extend timeout based on number of nodes, limit by configured hopLimit - waitFactor = min(len(self.nodes) - 1, hopLimit) + waitFactor = min(len(self.nodes) - 1 if self.nodes else 0, hopLimit) self.waitForTraceRoute(waitFactor) def onResponseTraceRoute(self, p): @@ -480,10 +482,10 @@ class MeshInterface: if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") - def _addResponseHandler(self, requestId, callback): + def _addResponseHandler(self, requestId: int, callback: Callable): self.responseHandlers[requestId] = ResponseHandler(callback) - def _sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False): + def _sendPacket(self, meshPacket: mesh_pb2.MeshPacket, destinationId: Union[int,str]=BROADCAST_ADDR, wantAck: bool=False): """Send a MeshPacket to the specified node (or if unspecified, broadcast). You probably don't want this - use sendData instead. @@ -497,7 +499,7 @@ class MeshInterface: toRadio = mesh_pb2.ToRadio() - nodeNum = 0 + nodeNum: int = 0 if destinationId is None: our_exit("Warning: destinationId must not be None") elif isinstance(destinationId, int): @@ -515,9 +517,10 @@ class MeshInterface: else: if self.nodes: node = self.nodes.get(destinationId) - if not node: + if node is None: our_exit(f"Warning: NodeId {destinationId} not found in DB") - nodeNum = node["num"] + else: + nodeNum = node["num"] else: logging.warning("Warning: There were no self.nodes.") @@ -569,9 +572,9 @@ class MeshInterface: if not success: raise MeshInterface.MeshInterfaceError("Timed out waiting for telemetry") - def getMyNodeInfo(self): + def getMyNodeInfo(self) -> Optional[Dict]: """Get info about my node.""" - if self.myInfo is None: + if self.myInfo is None or self.nodesByNum is None: return None logging.debug(f"self.nodesByNum:{self.nodesByNum}") return self.nodesByNum.get(self.myInfo.my_node_num) @@ -608,7 +611,7 @@ class MeshInterface: if self.failure: raise self.failure - def _generatePacketId(self): + def _generatePacketId(self) -> int: """Get a new unique packet ID""" if self.currentPacketId is None: raise MeshInterface.MeshInterfaceError("Not connected yet, can not generate packet") @@ -670,18 +673,18 @@ class MeshInterface: m.disconnect = True self._sendToRadio(m) - def _queueHasFreeSpace(self): + def _queueHasFreeSpace(self) -> bool: # We never got queueStatus, maybe the firmware is old if self.queueStatus is None: return True return self.queueStatus.free > 0 - def _queueClaim(self): + def _queueClaim(self) -> None: if self.queueStatus is None: return self.queueStatus.free -= 1 - def _sendToRadio(self, toRadio): + def _sendToRadio(self, toRadio: mesh_pb2.ToRadio) -> None: """Send a ToRadio protobuf to the device""" if self.noProto: logging.warning( @@ -730,18 +733,18 @@ class MeshInterface: self.queue[packetId] = packet # logging.warn("queue + resentQueue: " + " ".join(f'{k:08x}' for k in self.queue)) - def _sendToRadioImpl(self, toRadio): + def _sendToRadioImpl(self, toRadio: mesh_pb2.ToRadio) -> None: """Send a ToRadio protobuf to the device""" logging.error(f"Subclass must provide toradio: {toRadio}") - def _handleConfigComplete(self): + def _handleConfigComplete(self) -> None: """ Done with initial config messages, now send regular MeshPackets to ask for settings and channels """ self.localNode.requestChannels() - def _handleQueueStatusFromRadio(self, queueStatus): + def _handleQueueStatusFromRadio(self, queueStatus) -> None: self.queueStatus = queueStatus logging.debug( f"TX QUEUE free {queueStatus.free} of {queueStatus.maxlen}, res = {queueStatus.res}, id = {queueStatus.mesh_packet_id:08x} " @@ -892,7 +895,7 @@ class MeshInterface: else: logging.debug("Unexpected FromRadio payload") - def _fixupPosition(self, position): + def _fixupPosition(self, position: Dict) -> Dict: """Convert integer lat/lon into floats Arguments: diff --git a/meshtastic/node.py b/meshtastic/node.py index 1fc1b37..2860980 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -5,7 +5,7 @@ import base64 import logging import time -from typing import Union +from typing import Optional, Union from meshtastic import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, portnums_pb2 from meshtastic.util import ( @@ -759,9 +759,9 @@ class Node: def _sendAdmin( self, p: admin_pb2.AdminMessage, - wantResponse=True, + wantResponse: bool=True, onResponse=None, - adminIndex=0, + adminIndex: int=0, ): """Send an admin message to the specified node (or the local node if destNodeNum is zero)""" diff --git a/meshtastic/util.py b/meshtastic/util.py index cef3f3d..bdbec49 100644 --- a/meshtastic/util.py +++ b/meshtastic/util.py @@ -21,6 +21,8 @@ import serial.tools.list_ports # type: ignore[import-untyped] from meshtastic.supported_device import supported_devices from meshtastic.version import get_active_version +from typing import Union + """Some devices such as a seger jlink we never want to accidentally open""" blacklistVids = dict.fromkeys([0x1366]) @@ -153,16 +155,16 @@ class dotdict(dict): class Timeout: """Timeout class""" - def __init__(self, maxSecs=20): - self.expireTime = 0 - self.sleepInterval = 0.1 - self.expireTimeout = maxSecs + def __init__(self, maxSecs: int=20): + self.expireTime: Union[int, float] = 0 + self.sleepInterval: float = 0.1 + self.expireTimeout: int = maxSecs def reset(self): """Restart the waitForSet timer""" self.expireTime = time.time() + self.expireTimeout - def waitForSet(self, target, attrs=()): + def waitForSet(self, target, attrs=()) -> bool: """Block until the specified attributes are set. Returns True if config has been received.""" self.reset() while time.time() < self.expireTime: @@ -173,7 +175,7 @@ class Timeout: def waitForAckNak( self, acknowledgment, attrs=("receivedAck", "receivedNak", "receivedImplAck") - ): + ) -> bool: """Block until an ACK or NAK has been received. Returns True if ACK or NAK has been received.""" self.reset() while time.time() < self.expireTime: @@ -183,7 +185,7 @@ class Timeout: time.sleep(self.sleepInterval) return False - def waitForTraceRoute(self, waitFactor, acknowledgment, attr="receivedTraceRoute"): + def waitForTraceRoute(self, waitFactor, acknowledgment, attr="receivedTraceRoute") -> bool: """Block until traceroute response is received. Returns True if traceroute response has been received.""" self.expireTimeout *= waitFactor self.reset() @@ -194,7 +196,7 @@ class Timeout: time.sleep(self.sleepInterval) return False - def waitForTelemetry(self, acknowledgment): + def waitForTelemetry(self, acknowledgment) -> bool: """Block until telemetry response is received. Returns True if telemetry response has been received.""" self.reset() while time.time() < self.expireTime: