Add a variety of type annotations, primarily in mesh_interface

This commit is contained in:
Ian McEwen
2024-04-11 18:28:01 -07:00
parent b280d0ba23
commit f449ff9506
3 changed files with 78 additions and 73 deletions

View File

@@ -37,6 +37,8 @@ from meshtastic.util import (
message_to_json,
)
from typing import Any, Callable, Dict, List, Optional, Union
class MeshInterface:
"""Interface class for meshtastic devices
@@ -54,7 +56,7 @@ class MeshInterface:
self.message = message
super().__init__(self.message)
def __init__(self, debugOut=None, noProto=False):
def __init__(self, debugOut=None, noProto: bool=False) -> None:
"""Constructor
Keyword Arguments:
@@ -62,27 +64,27 @@ class MeshInterface:
link - just be a dumb serial client.
"""
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.localNode = meshtastic.node.Node(self, -1) # We fixup nodenum later
self.myInfo = None # We don't have device info yet
self.metadata = None # We don't have device metadata yet
self.responseHandlers = {} # A map from request ID to the handler
self.nodes: Optional[Dict[str,Dict]] = None # FIXME
self.isConnected: threading.Event = threading.Event()
self.noProto: bool = noProto
self.localNode: meshtastic.node.Node = meshtastic.node.Node(self, -1) # We fixup nodenum later
self.myInfo: Optional[mesh_pb2.MyNodeInfo] = None # We don't have device info yet
self.metadata: Optional[mesh_pb2.DeviceMetadata] = None # We don't have device metadata yet
self.responseHandlers: Dict[int,ResponseHandler] = {} # A map from request ID to the handler
self.failure = (
None # If we've encountered a fatal exception it will be kept here
)
self._timeout = Timeout()
self._acknowledgment = Acknowledgment()
self.heartbeatTimer = None
self._timeout: Timeout = Timeout()
self._acknowledgment: Acknowledgment = Acknowledgment()
self.heartbeatTimer: Optional[threading.Timer] = None
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xFFFFFFFF)
self.nodesByNum = None
self.configId = None
self.gotResponse = False # used in gpio read
self.mask = None # used in gpio read and gpio watch
self.queueStatus = None
self.queue = collections.OrderedDict()
self.currentPacketId: int = random.randint(0, 0xFFFFFFFF)
self.nodesByNum: Optional[Dict[int, Dict]] = None
self.configId: Optional[int] = None
self.gotResponse: bool = False # used in gpio read
self.mask: Optional[int] = None # used in gpio read and gpio watch
self.queueStatus: Optional[mesh_pb2.QueueStatus] = None
self.queue: collections.OrderedDict = collections.OrderedDict()
def close(self):
"""Shutdown this interface"""
@@ -103,7 +105,7 @@ class MeshInterface:
logging.error(f"Traceback: {traceback}")
self.close()
def showInfo(self, file=sys.stdout): # pylint: disable=W0613
def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613
"""Show human readable summary about this object"""
owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
myinfo = ""
@@ -135,20 +137,20 @@ class MeshInterface:
print(infos)
return infos
def showNodes(self, includeSelf=True, file=sys.stdout): # pylint: disable=W0613
def showNodes(self, includeSelf: bool=True, file=sys.stdout) -> str: # pylint: disable=W0613
"""Show table summary of nodes in mesh"""
def formatFloat(value, precision=2, unit=""):
def formatFloat(value, precision=2, unit="") -> Optional[str]:
"""Format a float value with precision."""
return f"{value:.{precision}f}{unit}" if value else None
def getLH(ts):
def getLH(ts) -> Optional[str]:
"""Format last heard"""
return (
datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") if ts else None
)
def getTimeAgo(ts):
def getTimeAgo(ts) -> Optional[str]:
"""Format how long ago have we heard from this node (aka timeago)."""
return (
timeago.format(datetime.fromtimestamp(ts), datetime.now())
@@ -156,7 +158,7 @@ class MeshInterface:
else None
)
rows = []
rows: List[Dict[str, Any]] = []
if self.nodesByNum:
logging.debug(f"self.nodes:{self.nodes}")
for node in self.nodesByNum.values():
@@ -225,7 +227,7 @@ class MeshInterface:
print(table)
return table
def getNode(self, nodeId, requestChannels=True):
def getNode(self, nodeId: str, requestChannels: bool=True) -> meshtastic.node.Node:
"""Return a node object which contains device settings and channel info"""
if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
return self.localNode
@@ -242,11 +244,11 @@ class MeshInterface:
def sendText(
self,
text: str,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
onResponse=None,
channelIndex=0,
destinationId: Union[int, str]=BROADCAST_ADDR,
wantAck: bool=False,
wantResponse: bool=False,
onResponse: Optional[Callable[[mesh_pb2.MeshPacket], Any]]=None,
channelIndex: int=0,
):
"""Send a utf8 string to some other node, if the node has a display it
will also be shown on the device.
@@ -281,12 +283,12 @@ class MeshInterface:
def sendData(
self,
data,
destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP,
wantAck=False,
wantResponse=False,
onResponse=None,
channelIndex=0,
destinationId: Union[int, str]=BROADCAST_ADDR,
portNum: portnums_pb2.PortNum.ValueType=portnums_pb2.PortNum.PRIVATE_APP,
wantAck: bool=False,
wantResponse: bool=False,
onResponse: Optional[Callable[[mesh_pb2.MeshPacket], Any]]=None,
channelIndex: int=0,
):
"""Send a data packet to some other node
@@ -341,13 +343,13 @@ class MeshInterface:
def sendPosition(
self,
latitude=0.0,
longitude=0.0,
altitude=0,
timeSec=0,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
latitude: float=0.0,
longitude: float=0.0,
altitude: int=0,
timeSec: int=0,
destinationId: Union[int, str]=BROADCAST_ADDR,
wantAck: bool=False,
wantResponse: bool=False,
):
"""
Send a position packet to some other node (normally a broadcast)
@@ -374,8 +376,8 @@ class MeshInterface:
logging.debug(f"p.altitude:{p.altitude}")
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
timeSec = int(time.time()) # returns unix timestamp in seconds
p.time = timeSec
logging.debug(f"p.time:{p.time}")
return self.sendData(
@@ -386,7 +388,7 @@ class MeshInterface:
wantResponse=wantResponse,
)
def sendTraceRoute(self, dest, hopLimit):
def sendTraceRoute(self, dest: Union[int, str], hopLimit: int):
"""Send the trace route"""
r = mesh_pb2.RouteDiscovery()
self.sendData(
@@ -397,7 +399,7 @@ class MeshInterface:
onResponse=self.onResponseTraceRoute,
)
# extend timeout based on number of nodes, limit by configured hopLimit
waitFactor = min(len(self.nodes) - 1, hopLimit)
waitFactor = min(len(self.nodes) - 1 if self.nodes else 0, hopLimit)
self.waitForTraceRoute(waitFactor)
def onResponseTraceRoute(self, p):
@@ -480,10 +482,10 @@ class MeshInterface:
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
def _addResponseHandler(self, requestId, callback):
def _addResponseHandler(self, requestId: int, callback: Callable):
self.responseHandlers[requestId] = ResponseHandler(callback)
def _sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
def _sendPacket(self, meshPacket: mesh_pb2.MeshPacket, destinationId: Union[int,str]=BROADCAST_ADDR, wantAck: bool=False):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
@@ -497,7 +499,7 @@ class MeshInterface:
toRadio = mesh_pb2.ToRadio()
nodeNum = 0
nodeNum: int = 0
if destinationId is None:
our_exit("Warning: destinationId must not be None")
elif isinstance(destinationId, int):
@@ -515,9 +517,10 @@ class MeshInterface:
else:
if self.nodes:
node = self.nodes.get(destinationId)
if not node:
if node is None:
our_exit(f"Warning: NodeId {destinationId} not found in DB")
nodeNum = node["num"]
else:
nodeNum = node["num"]
else:
logging.warning("Warning: There were no self.nodes.")
@@ -569,9 +572,9 @@ class MeshInterface:
if not success:
raise MeshInterface.MeshInterfaceError("Timed out waiting for telemetry")
def getMyNodeInfo(self):
def getMyNodeInfo(self) -> Optional[Dict]:
"""Get info about my node."""
if self.myInfo is None:
if self.myInfo is None or self.nodesByNum is None:
return None
logging.debug(f"self.nodesByNum:{self.nodesByNum}")
return self.nodesByNum.get(self.myInfo.my_node_num)
@@ -608,7 +611,7 @@ class MeshInterface:
if self.failure:
raise self.failure
def _generatePacketId(self):
def _generatePacketId(self) -> int:
"""Get a new unique packet ID"""
if self.currentPacketId is None:
raise MeshInterface.MeshInterfaceError("Not connected yet, can not generate packet")
@@ -670,18 +673,18 @@ class MeshInterface:
m.disconnect = True
self._sendToRadio(m)
def _queueHasFreeSpace(self):
def _queueHasFreeSpace(self) -> bool:
# We never got queueStatus, maybe the firmware is old
if self.queueStatus is None:
return True
return self.queueStatus.free > 0
def _queueClaim(self):
def _queueClaim(self) -> None:
if self.queueStatus is None:
return
self.queueStatus.free -= 1
def _sendToRadio(self, toRadio):
def _sendToRadio(self, toRadio: mesh_pb2.ToRadio) -> None:
"""Send a ToRadio protobuf to the device"""
if self.noProto:
logging.warning(
@@ -730,18 +733,18 @@ class MeshInterface:
self.queue[packetId] = packet
# logging.warn("queue + resentQueue: " + " ".join(f'{k:08x}' for k in self.queue))
def _sendToRadioImpl(self, toRadio):
def _sendToRadioImpl(self, toRadio: mesh_pb2.ToRadio) -> None:
"""Send a ToRadio protobuf to the device"""
logging.error(f"Subclass must provide toradio: {toRadio}")
def _handleConfigComplete(self):
def _handleConfigComplete(self) -> None:
"""
Done with initial config messages, now send regular MeshPackets
to ask for settings and channels
"""
self.localNode.requestChannels()
def _handleQueueStatusFromRadio(self, queueStatus):
def _handleQueueStatusFromRadio(self, queueStatus) -> None:
self.queueStatus = queueStatus
logging.debug(
f"TX QUEUE free {queueStatus.free} of {queueStatus.maxlen}, res = {queueStatus.res}, id = {queueStatus.mesh_packet_id:08x} "
@@ -892,7 +895,7 @@ class MeshInterface:
else:
logging.debug("Unexpected FromRadio payload")
def _fixupPosition(self, position):
def _fixupPosition(self, position: Dict) -> Dict:
"""Convert integer lat/lon into floats
Arguments:

View File

@@ -5,7 +5,7 @@ import base64
import logging
import time
from typing import Union
from typing import Optional, Union
from meshtastic import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, portnums_pb2
from meshtastic.util import (
@@ -759,9 +759,9 @@ class Node:
def _sendAdmin(
self,
p: admin_pb2.AdminMessage,
wantResponse=True,
wantResponse: bool=True,
onResponse=None,
adminIndex=0,
adminIndex: int=0,
):
"""Send an admin message to the specified node (or the local node if destNodeNum is zero)"""

View File

@@ -21,6 +21,8 @@ import serial.tools.list_ports # type: ignore[import-untyped]
from meshtastic.supported_device import supported_devices
from meshtastic.version import get_active_version
from typing import Union
"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])
@@ -153,16 +155,16 @@ class dotdict(dict):
class Timeout:
"""Timeout class"""
def __init__(self, maxSecs=20):
self.expireTime = 0
self.sleepInterval = 0.1
self.expireTimeout = maxSecs
def __init__(self, maxSecs: int=20):
self.expireTime: Union[int, float] = 0
self.sleepInterval: float = 0.1
self.expireTimeout: int = maxSecs
def reset(self):
"""Restart the waitForSet timer"""
self.expireTime = time.time() + self.expireTimeout
def waitForSet(self, target, attrs=()):
def waitForSet(self, target, attrs=()) -> bool:
"""Block until the specified attributes are set. Returns True if config has been received."""
self.reset()
while time.time() < self.expireTime:
@@ -173,7 +175,7 @@ class Timeout:
def waitForAckNak(
self, acknowledgment, attrs=("receivedAck", "receivedNak", "receivedImplAck")
):
) -> bool:
"""Block until an ACK or NAK has been received. Returns True if ACK or NAK has been received."""
self.reset()
while time.time() < self.expireTime:
@@ -183,7 +185,7 @@ class Timeout:
time.sleep(self.sleepInterval)
return False
def waitForTraceRoute(self, waitFactor, acknowledgment, attr="receivedTraceRoute"):
def waitForTraceRoute(self, waitFactor, acknowledgment, attr="receivedTraceRoute") -> bool:
"""Block until traceroute response is received. Returns True if traceroute response has been received."""
self.expireTimeout *= waitFactor
self.reset()
@@ -194,7 +196,7 @@ class Timeout:
time.sleep(self.sleepInterval)
return False
def waitForTelemetry(self, acknowledgment):
def waitForTelemetry(self, acknowledgment) -> bool:
"""Block until telemetry response is received. Returns True if telemetry response has been received."""
self.reset()
while time.time() < self.expireTime: