diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index a7c8b74..42df8ae 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -10,21 +10,14 @@ import threading import time from datetime import datetime from decimal import Decimal - from typing import Any, Callable, Dict, List, Optional, Union import google.protobuf.json_format -from pubsub import pub # type: ignore[import-untyped] -from tabulate import tabulate import print_color # type: ignore[import-untyped] +from pubsub import pub # type: ignore[import-untyped] +from tabulate import tabulate import meshtastic.node - -from meshtastic.protobuf import ( - mesh_pb2, - portnums_pb2, - telemetry_pb2, -) from meshtastic import ( BROADCAST_ADDR, BROADCAST_NUM, @@ -32,18 +25,20 @@ from meshtastic import ( NODELESS_WANT_CONFIG_ID, ResponseHandler, protocols, - publishingThread + publishingThread, ) +from meshtastic.protobuf import mesh_pb2, portnums_pb2, telemetry_pb2 from meshtastic.util import ( Acknowledgment, Timeout, convert_mac_addr, + message_to_json, our_exit, remove_keys_from_dict, stripnl, - message_to_json, ) + def _timeago(delta_secs: int) -> str: """Convert a number of seconds in the past into a short, friendly string e.g. "now", "30 sec ago", "1 hour ago" @@ -67,7 +62,7 @@ def _timeago(delta_secs: int) -> str: return "now" -class MeshInterface: # pylint: disable=R0902 +class MeshInterface: # pylint: disable=R0902 """Interface class for meshtastic devices Properties: @@ -79,11 +74,14 @@ class MeshInterface: # pylint: disable=R0902 class MeshInterfaceError(Exception): """An exception class for general mesh interface errors""" + def __init__(self, message): self.message = message super().__init__(self.message) - def __init__(self, debugOut=None, noProto: bool=False, noNodes: bool=False) -> None: + def __init__( + self, debugOut=None, noProto: bool = False, noNodes: bool = False + ) -> None: """Constructor Keyword Arguments: @@ -93,13 +91,21 @@ class MeshInterface: # pylint: disable=R0902 on startup, just other configuration information. """ self.debugOut = debugOut - self.nodes: Optional[Dict[str,Dict]] = None # FIXME + self.nodes: Optional[Dict[str, Dict]] = None # FIXME self.isConnected: threading.Event = threading.Event() self.noProto: bool = noProto - self.localNode: meshtastic.node.Node = meshtastic.node.Node(self, -1) # We fixup nodenum later - self.myInfo: Optional[mesh_pb2.MyNodeInfo] = None # We don't have device info yet - self.metadata: Optional[mesh_pb2.DeviceMetadata] = None # We don't have device metadata yet - self.responseHandlers: Dict[int,ResponseHandler] = {} # A map from request ID to the handler + self.localNode: meshtastic.node.Node = meshtastic.node.Node( + self, -1 + ) # We fixup nodenum later + self.myInfo: Optional[ + mesh_pb2.MyNodeInfo + ] = None # We don't have device info yet + self.metadata: Optional[ + mesh_pb2.DeviceMetadata + ] = None # We don't have device metadata yet + self.responseHandlers: Dict[ + int, ResponseHandler + ] = {} # A map from request ID to the handler self.failure = ( None # If we've encountered a fatal exception it will be kept here ) @@ -162,6 +168,12 @@ class MeshInterface: # pylint: disable=R0902 def _handleLogLine(self, line: str) -> None: """Handle a line of log output from the device.""" + + # Devices should _not_ be including a newline at the end of each log-line str (especially when + # encapsulated as a LogRecord). But to cope with old device loads, we check for that and fix it here: + if line.endswith("\n"): + line = line[:-1] + pub.sendMessage("meshtastic.log.line", line=line, interface=self) def _handleLogRecord(self, record: mesh_pb2.LogRecord) -> None: @@ -201,7 +213,9 @@ class MeshInterface: # pylint: disable=R0902 print(infos) return infos - def showNodes(self, includeSelf: bool=True, file=sys.stdout) -> str: # pylint: disable=W0613 + def showNodes( + self, includeSelf: bool = True, file=sys.stdout + ) -> str: # pylint: disable=W0613 """Show table summary of nodes in mesh""" def formatFloat(value, precision=2, unit="") -> Optional[str]: @@ -232,7 +246,11 @@ class MeshInterface: # pylint: disable=R0902 continue presumptive_id = f"!{node['num']:08x}" - row = {"N": 0, "User": f"Meshtastic {presumptive_id[-4:]}", "ID": presumptive_id} + row = { + "N": 0, + "User": f"Meshtastic {presumptive_id[-4:]}", + "ID": presumptive_id, + } user = node.get("user") if user: @@ -241,7 +259,7 @@ class MeshInterface: # pylint: disable=R0902 "User": user.get("longName", "N/A"), "AKA": user.get("shortName", "N/A"), "ID": user["id"], - "Hardware": user.get("hwModel", "UNSET") + "Hardware": user.get("hwModel", "UNSET"), } ) @@ -295,7 +313,9 @@ class MeshInterface: # pylint: disable=R0902 print(table) return table - def getNode(self, nodeId: str, requestChannels: bool=True) -> meshtastic.node.Node: + def getNode( + self, nodeId: str, requestChannels: bool = True + ) -> meshtastic.node.Node: """Return a node object which contains device settings and channel info""" if nodeId in (LOCAL_ADDR, BROADCAST_ADDR): return self.localNode @@ -312,11 +332,11 @@ class MeshInterface: # pylint: disable=R0902 def sendText( self, text: str, - destinationId: Union[int, str]=BROADCAST_ADDR, - wantAck: bool=False, - wantResponse: bool=False, - onResponse: Optional[Callable[[dict], Any]]=None, - channelIndex: int=0, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantAck: bool = False, + wantResponse: bool = False, + onResponse: Optional[Callable[[dict], Any]] = None, + channelIndex: int = 0, ): """Send a utf8 string to some other node, if the node has a display it will also be shown on the device. @@ -351,13 +371,13 @@ class MeshInterface: # pylint: disable=R0902 def sendData( self, data, - destinationId: Union[int, str]=BROADCAST_ADDR, - portNum: portnums_pb2.PortNum.ValueType=portnums_pb2.PortNum.PRIVATE_APP, - wantAck: bool=False, - wantResponse: bool=False, - onResponse: Optional[Callable[[dict], Any]]=None, - onResponseAckPermitted: bool=False, - channelIndex: int=0, + destinationId: Union[int, str] = BROADCAST_ADDR, + portNum: portnums_pb2.PortNum.ValueType = portnums_pb2.PortNum.PRIVATE_APP, + wantAck: bool = False, + wantResponse: bool = False, + onResponse: Optional[Callable[[dict], Any]] = None, + onResponseAckPermitted: bool = False, + channelIndex: int = 0, ): """Send a data packet to some other node @@ -411,20 +431,22 @@ class MeshInterface: # pylint: disable=R0902 if onResponse is not None: logging.debug(f"Setting a response handler for requestId {meshPacket.id}") - self._addResponseHandler(meshPacket.id, onResponse, ackPermitted=onResponseAckPermitted) + self._addResponseHandler( + meshPacket.id, onResponse, ackPermitted=onResponseAckPermitted + ) p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck) return p def sendPosition( self, - latitude: float=0.0, - longitude: float=0.0, - altitude: int=0, - timeSec: int=0, - destinationId: Union[int, str]=BROADCAST_ADDR, - wantAck: bool=False, - wantResponse: bool=False, - channelIndex: int=0, + latitude: float = 0.0, + longitude: float = 0.0, + altitude: int = 0, + timeSec: int = 0, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantAck: bool = False, + wantResponse: bool = False, + channelIndex: int = 0, ): """ Send a position packet to some other node (normally a broadcast) @@ -475,20 +497,22 @@ class MeshInterface: # pylint: disable=R0902 def onResponsePosition(self, p): """on response for position""" - if p["decoded"]["portnum"] == 'POSITION_APP': + if p["decoded"]["portnum"] == "POSITION_APP": self._acknowledgment.receivedPosition = True position = mesh_pb2.Position() position.ParseFromString(p["decoded"]["payload"]) ret = "Position received: " if position.latitude_i != 0 and position.longitude_i != 0: - ret += f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})" + ret += ( + f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})" + ) else: ret += "(unknown)" if position.altitude != 0: ret += f" {position.altitude}m" - if position.precision_bits not in [0,32]: + if position.precision_bits not in [0, 32]: ret += f" precision:{position.precision_bits}" elif position.precision_bits == 32: ret += " full precision" @@ -497,11 +521,15 @@ class MeshInterface: # pylint: disable=R0902 print(ret) - elif p["decoded"]["portnum"] == 'ROUTING_APP': - if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': - our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + elif p["decoded"]["portnum"] == "ROUTING_APP": + if p["decoded"]["routing"]["errorReason"] == "NO_RESPONSE": + our_exit( + "No response from node. At least firmware 2.1.22 is required on the destination node." + ) - def sendTraceRoute(self, dest: Union[int, str], hopLimit: int, channelIndex: int=0): + def sendTraceRoute( + self, dest: Union[int, str], hopLimit: int, channelIndex: int = 0 + ): """Send the trace route""" r = mesh_pb2.RouteDiscovery() self.sendData( @@ -532,12 +560,19 @@ class MeshInterface: # pylint: disable=R0902 self._acknowledgment.receivedTraceRoute = True - def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantResponse: bool=False, channelIndex: int=0): + def sendTelemetry( + self, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantResponse: bool = False, + channelIndex: int = 0, + ): """Send telemetry and optionally ask for a response""" r = telemetry_pb2.Telemetry() if self.nodes is not None: - node = next(n for n in self.nodes.values() if n["num"] == self.localNode.nodeNum) + node = next( + n for n in self.nodes.values() if n["num"] == self.localNode.nodeNum + ) if node is not None: metrics = node.get("deviceMetrics") if metrics: @@ -572,7 +607,7 @@ class MeshInterface: # pylint: disable=R0902 def onResponseTelemetry(self, p: dict): """on response for telemetry""" - if p["decoded"]["portnum"] == 'TELEMETRY_APP': + if p["decoded"]["portnum"] == "TELEMETRY_APP": self._acknowledgment.receivedTelemetry = True telemetry = telemetry_pb2.Telemetry() telemetry.ParseFromString(p["decoded"]["payload"]) @@ -587,16 +622,32 @@ class MeshInterface: # pylint: disable=R0902 f"Total channel utilization: {telemetry.device_metrics.channel_utilization:.2f}%" ) if telemetry.device_metrics.air_util_tx is not None: - print(f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%") + print( + f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%" + ) - elif p["decoded"]["portnum"] == 'ROUTING_APP': - if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE': - our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.") + elif p["decoded"]["portnum"] == "ROUTING_APP": + if p["decoded"]["routing"]["errorReason"] == "NO_RESPONSE": + our_exit( + "No response from node. At least firmware 2.1.22 is required on the destination node." + ) - def _addResponseHandler(self, requestId: int, callback: Callable[[dict], Any], ackPermitted: bool=False): - self.responseHandlers[requestId] = ResponseHandler(callback=callback, ackPermitted=ackPermitted) + def _addResponseHandler( + self, + requestId: int, + callback: Callable[[dict], Any], + ackPermitted: bool = False, + ): + self.responseHandlers[requestId] = ResponseHandler( + callback=callback, ackPermitted=ackPermitted + ) - def _sendPacket(self, meshPacket: mesh_pb2.MeshPacket, destinationId: Union[int,str]=BROADCAST_ADDR, wantAck: bool=False): + def _sendPacket( + self, + meshPacket: mesh_pb2.MeshPacket, + destinationId: Union[int, str] = BROADCAST_ADDR, + wantAck: bool = False, + ): """Send a MeshPacket to the specified node (or if unspecified, broadcast). You probably don't want this - use sendData instead. @@ -663,13 +714,17 @@ class MeshInterface: # pylint: disable=R0902 and self.localNode.waitForConfig() ) if not success: - raise MeshInterface.MeshInterfaceError("Timed out waiting for interface config") + raise MeshInterface.MeshInterfaceError( + "Timed out waiting for interface config" + ) def waitForAckNak(self): """Wait for the ack/nak""" success = self._timeout.waitForAckNak(self._acknowledgment) if not success: - raise MeshInterface.MeshInterfaceError("Timed out waiting for an acknowledgment") + raise MeshInterface.MeshInterfaceError( + "Timed out waiting for an acknowledgment" + ) def waitForTraceRoute(self, waitFactor): """Wait for trace route""" @@ -722,7 +777,9 @@ class MeshInterface: # pylint: disable=R0902 and raise an exception""" if not self.noProto: if not self.isConnected.wait(timeout): # timeout after x seconds - raise MeshInterface.MeshInterfaceError("Timed out waiting for connection completion") + raise MeshInterface.MeshInterfaceError( + "Timed out waiting for connection completion" + ) # If we failed while connecting, raise the connection to the client if self.failure: @@ -731,7 +788,9 @@ class MeshInterface: # pylint: disable=R0902 def _generatePacketId(self) -> int: """Get a new unique packet ID""" if self.currentPacketId is None: - raise MeshInterface.MeshInterfaceError("Not connected yet, can not generate packet") + raise MeshInterface.MeshInterfaceError( + "Not connected yet, can not generate packet" + ) else: self.currentPacketId = (self.currentPacketId + 1) & 0xFFFFFFFF return self.currentPacketId @@ -778,7 +837,9 @@ class MeshInterface: # pylint: disable=R0902 self.myInfo = None self.nodes = {} # nodes keyed by ID self.nodesByNum = {} # nodes keyed by nodenum - self._localChannels = [] # empty until we start getting channels pushed from the device (during config) + self._localChannels = ( + [] + ) # empty until we start getting channels pushed from the device (during config) startConfig = mesh_pb2.ToRadio() if self.configId is None or not self.noNodes: @@ -927,7 +988,7 @@ class MeshInterface: # pylint: disable=R0902 logging.debug("Node without position") # no longer necessary since we're mutating directly in nodesByNum via _getOrCreateByNum - #self.nodesByNum[node["num"]] = node + # self.nodesByNum[node["num"]] = node if "user" in node: # Some nodes might not have user/ids assigned yet if "id" in node["user"]: self.nodes[node["user"]["id"]] = node @@ -953,14 +1014,18 @@ class MeshInterface: # pylint: disable=R0902 elif fromRadio.HasField("mqttClientProxyMessage"): publishingThread.queueWork( lambda: pub.sendMessage( - "meshtastic.mqttclientproxymessage", proxymessage=fromRadio.mqttClientProxyMessage, interface=self + "meshtastic.mqttclientproxymessage", + proxymessage=fromRadio.mqttClientProxyMessage, + interface=self, ) ) elif fromRadio.HasField("xmodemPacket"): publishingThread.queueWork( lambda: pub.sendMessage( - "meshtastic.xmodempacket", packet=fromRadio.xmodemPacket, interface=self + "meshtastic.xmodempacket", + packet=fromRadio.xmodemPacket, + interface=self, ) ) @@ -1067,7 +1132,7 @@ class MeshInterface: # pylint: disable=R0902 return BROADCAST_ADDR try: - return self.nodesByNum[num]["user"]["id"] #type: ignore[index] + return self.nodesByNum[num]["user"]["id"] # type: ignore[index] except: logging.debug(f"Node {num} not found for fromId") return None @@ -1075,7 +1140,9 @@ class MeshInterface: # pylint: disable=R0902 def _getOrCreateByNum(self, nodeNum): """Given a nodenum find the NodeInfo in the DB (or create if necessary)""" if nodeNum == BROADCAST_NUM: - raise MeshInterface.MeshInterfaceError("Can not create/find nodenum by the broadcast num") + raise MeshInterface.MeshInterfaceError( + "Can not create/find nodenum by the broadcast num" + ) if nodeNum in self.nodesByNum: return self.nodesByNum[nodeNum] @@ -1087,9 +1154,9 @@ class MeshInterface: # pylint: disable=R0902 "id": presumptive_id, "longName": f"Meshtastic {presumptive_id[-4:]}", "shortName": f"{presumptive_id[-4:]}", - "hwModel": "UNSET" - } - } # Create a minimal node db entry + "hwModel": "UNSET", + }, + } # Create a minimal node db entry self.nodesByNum[nodeNum] = n return n @@ -1198,13 +1265,21 @@ class MeshInterface: # pylint: disable=R0902 # or the handler is set as ackPermitted, but send NAKs and # other, data-containing responses to the handlers routing = decoded.get("routing") - isAck = routing is not None and ("errorReason" not in routing or routing["errorReason"] == "NONE") + isAck = routing is not None and ( + "errorReason" not in routing or routing["errorReason"] == "NONE" + ) # we keep the responseHandler in dict until we actually call it handler = self.responseHandlers.get(requestId, None) if handler is not None: - if (not isAck) or handler.callback.__name__ == "onAckNak" or handler.ackPermitted: + if ( + (not isAck) + or handler.callback.__name__ == "onAckNak" + or handler.ackPermitted + ): handler = self.responseHandlers.pop(requestId, None) - logging.debug(f"Calling response handler for requestId {requestId}") + logging.debug( + f"Calling response handler for requestId {requestId}" + ) handler.callback(asDict) logging.debug(f"Publishing {topic}: packet={stripnl(asDict)} ")