+
+Expand source code
+
+""" Mesh Interface class
+"""
+import sys
+import random
+import time
+import logging
+from typing import AnyStr
+import threading
+from datetime import datetime
+import timeago
+from tabulate import tabulate
+
+import google.protobuf.json_format
+
+from pubsub import pub
+from google.protobuf.json_format import MessageToJson
+
+
+from . import portnums_pb2, mesh_pb2
+from .util import stripnl, Timeout, our_exit
+from .node import Node
+from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
+
+
+defaultHopLimit = 3
+
+
+class MeshInterface:
+ """Interface class for meshtastic devices
+
+ Properties:
+
+ isConnected
+ nodes
+ debugOut
+ """
+
+ def __init__(self, debugOut=None, noProto=False):
+ """Constructor
+
+ Keyword Arguments:
+ noProto -- If True, don't try to run our protocol on the link - just be a dumb serial client.
+ """
+ self.debugOut = debugOut
+ self.nodes = None # FIXME
+ self.isConnected = threading.Event()
+ self.noProto = noProto
+ self.localNode = Node(self, -1) # We fixup nodenum later
+ self.myInfo = None # We don't have device info yet
+ self.responseHandlers = {} # A map from request ID to the handler
+ self.failure = None # If we've encountered a fatal exception it will be kept here
+ self._timeout = Timeout()
+ self.heartbeatTimer = None
+ random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
+ self.currentPacketId = random.randint(0, 0xffffffff)
+
+ def close(self):
+ """Shutdown this interface"""
+ if self.heartbeatTimer:
+ self.heartbeatTimer.cancel()
+
+ self._sendDisconnect()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is not None and exc_value is not None:
+ logging.error(
+ f'An exception of type {exc_type} with value {exc_value} has occurred')
+ if traceback is not None:
+ logging.error(f'Traceback: {traceback}')
+ self.close()
+
+ def showInfo(self, file=sys.stdout):
+ """Show human readable summary about this object"""
+ owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
+ myinfo = ''
+ if self.myInfo:
+ myinfo = f"\nMy info: {stripnl(MessageToJson(self.myInfo))}"
+ mesh = "\nNodes in mesh:"
+ nodes = ""
+ if self.nodes:
+ for n in self.nodes.values():
+ nodes = nodes + f" {stripnl(n)}"
+ infos = owner + myinfo + mesh + nodes
+ print(infos)
+ return infos
+
+ def showNodes(self, includeSelf=True, file=sys.stdout):
+ """Show table summary of nodes in mesh"""
+ def formatFloat(value, precision=2, unit=''):
+ return f'{value:.{precision}f}{unit}' if value else None
+
+ def getLH(ts):
+ return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else None
+
+ def getTimeAgo(ts):
+ return timeago.format(datetime.fromtimestamp(ts), datetime.now()) if ts else None
+
+ rows = []
+ for node in self.nodes.values():
+ if not includeSelf and node['num'] == self.localNode.nodeNum:
+ continue
+
+ row = {"N": 0}
+
+ user = node.get('user')
+ if user:
+ row.update({
+ "User": user['longName'],
+ "AKA": user['shortName'],
+ "ID": user['id'],
+ })
+
+ pos = node.get('position')
+ if pos:
+ row.update({
+ "Latitude": formatFloat(pos.get("latitude"), 4, "°"),
+ "Longitude": formatFloat(pos.get("longitude"), 4, "°"),
+ "Altitude": formatFloat(pos.get("altitude"), 0, " m"),
+ "Battery": formatFloat(pos.get("batteryLevel"), 2, "%"),
+ })
+
+ row.update({
+ "SNR": formatFloat(node.get("snr"), 2, " dB"),
+ "LastHeard": getLH(node.get("lastHeard")),
+ "Since": getTimeAgo(node.get("lastHeard")),
+ })
+
+ rows.append(row)
+
+ # Why doesn't this way work?
+ #rows.sort(key=lambda r: r.get('LastHeard', '0000'), reverse=True)
+ rows.sort(key=lambda r: r.get('LastHeard') or '0000', reverse=True)
+ for i, row in enumerate(rows):
+ row['N'] = i+1
+
+ table = tabulate(rows, headers='keys', missingval='N/A',
+ tablefmt='fancy_grid')
+ print(table)
+ return table
+
+
+ def getNode(self, nodeId):
+ """Return a node object which contains device settings and channel info"""
+ if nodeId == LOCAL_ADDR:
+ return self.localNode
+ else:
+ n = Node(self, nodeId)
+ n.requestConfig()
+ if not n.waitForConfig():
+ our_exit("Error: Timed out waiting for node config")
+ return n
+
+ def sendText(self, text: AnyStr,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False,
+ wantResponse=False,
+ hopLimit=defaultHopLimit,
+ onResponse=None,
+ channelIndex=0):
+ """Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
+
+ Arguments:
+ text {string} -- The text to send
+
+ Keyword Arguments:
+ destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
+ portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
+ wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
+
+ Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
+ """
+ return self.sendData(text.encode("utf-8"), destinationId,
+ portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse,
+ hopLimit=hopLimit,
+ onResponse=onResponse,
+ channelIndex=channelIndex)
+
+ def sendData(self, data, destinationId=BROADCAST_ADDR,
+ portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
+ wantResponse=False,
+ hopLimit=defaultHopLimit,
+ onResponse=None,
+ channelIndex=0):
+ """Send a data packet to some other node
+
+ Keyword Arguments:
+ data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
+ destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
+ portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
+ wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
+ onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives
+ (or the transaction is NAKed due to non receipt)
+
+ Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
+ """
+ if getattr(data, "SerializeToString", None):
+ logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
+ data = data.SerializeToString()
+
+ if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
+ Exception("Data payload too big")
+
+ if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
+ our_exit("Warning: A non-zero port number must be specified")
+
+ meshPacket = mesh_pb2.MeshPacket()
+ meshPacket.channel = channelIndex
+ meshPacket.decoded.payload = data
+ meshPacket.decoded.portnum = portNum
+ meshPacket.decoded.want_response = wantResponse
+
+ p = self._sendPacket(meshPacket, destinationId,
+ wantAck=wantAck, hopLimit=hopLimit)
+ if onResponse is not None:
+ self._addResponseHandler(p.id, onResponse)
+ return p
+
+ def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
+ """
+ Send a position packet to some other node (normally a broadcast)
+
+ Also, the device software will notice this packet and use it to automatically set its notion of
+ the local position.
+
+ If timeSec is not specified (recommended), we will use the local machine time.
+
+ Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
+ """
+ p = mesh_pb2.Position()
+ if latitude != 0.0:
+ p.latitude_i = int(latitude / 1e-7)
+
+ if longitude != 0.0:
+ p.longitude_i = int(longitude / 1e-7)
+
+ if altitude != 0:
+ p.altitude = int(altitude)
+
+ if timeSec == 0:
+ timeSec = time.time() # returns unix timestamp in seconds
+ p.time = int(timeSec)
+
+ return self.sendData(p, destinationId,
+ portNum=portnums_pb2.PortNum.POSITION_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse)
+
+ def _addResponseHandler(self, requestId, callback):
+ self.responseHandlers[requestId] = ResponseHandler(callback)
+
+ def _sendPacket(self, meshPacket,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False, hopLimit=defaultHopLimit):
+ """Send a MeshPacket to the specified node (or if unspecified, broadcast).
+ You probably don't want this - use sendData instead.
+
+ Returns the sent packet. The id field will be populated in this packet and
+ can be used to track future message acks/naks.
+ """
+
+ # We allow users to talk to the local node before we've completed the full connection flow...
+ if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
+ self._waitConnected()
+
+ toRadio = mesh_pb2.ToRadio()
+
+ if destinationId is None:
+ our_exit("Warning: destinationId must not be None")
+ elif isinstance(destinationId, int):
+ nodeNum = destinationId
+ elif destinationId == BROADCAST_ADDR:
+ nodeNum = BROADCAST_NUM
+ elif destinationId == LOCAL_ADDR:
+ nodeNum = self.myInfo.my_node_num
+ # A simple hex style nodeid - we can parse this without needing the DB
+ elif destinationId.startswith("!"):
+ nodeNum = int(destinationId[1:], 16)
+ else:
+ node = self.nodes.get(destinationId)
+ if not node:
+ our_exit(f"Warning: NodeId {destinationId} not found in DB")
+ nodeNum = node['num']
+
+ meshPacket.to = nodeNum
+ meshPacket.want_ack = wantAck
+ meshPacket.hop_limit = hopLimit
+
+ # if the user hasn't set an ID for this packet (likely and recommended), we should pick a new unique ID
+ # so the message can be tracked.
+ if meshPacket.id == 0:
+ meshPacket.id = self._generatePacketId()
+
+ toRadio.packet.CopyFrom(meshPacket)
+ #logging.debug(f"Sending packet: {stripnl(meshPacket)}")
+ self._sendToRadio(toRadio)
+ return meshPacket
+
+ def waitForConfig(self):
+ """Block until radio config is received. Returns True if config has been received."""
+ success = self._timeout.waitForSet(self, attrs=('myInfo', 'nodes')
+ ) and self.localNode.waitForConfig()
+ if not success:
+ raise Exception("Timed out waiting for interface config")
+
+ def getMyNodeInfo(self):
+ """Get info about my node."""
+ if self.myInfo is None:
+ return None
+ return self.nodesByNum.get(self.myInfo.my_node_num)
+
+ def getMyUser(self):
+ """Get user"""
+ nodeInfo = self.getMyNodeInfo()
+ if nodeInfo is not None:
+ return nodeInfo.get('user')
+ return None
+
+ def getLongName(self):
+ """Get long name"""
+ user = self.getMyUser()
+ if user is not None:
+ return user.get('longName', None)
+ return None
+
+ def getShortName(self):
+ """Get short name"""
+ user = self.getMyUser()
+ if user is not None:
+ return user.get('shortName', None)
+ return None
+
+ def _waitConnected(self):
+ """Block until the initial node db download is complete, or timeout
+ and raise an exception"""
+ if not self.isConnected.wait(10.0): # timeout after 10 seconds
+ raise Exception("Timed out waiting for connection completion")
+
+ # If we failed while connecting, raise the connection to the client
+ if self.failure:
+ raise self.failure
+
+ def _generatePacketId(self):
+ """Get a new unique packet ID"""
+ if self.currentPacketId is None:
+ raise Exception("Not connected yet, can not generate packet")
+ else:
+ self.currentPacketId = (self.currentPacketId + 1) & 0xffffffff
+ return self.currentPacketId
+
+ def _disconnected(self):
+ """Called by subclasses to tell clients this interface has disconnected"""
+ self.isConnected.clear()
+ publishingThread.queueWork(lambda: pub.sendMessage(
+ "meshtastic.connection.lost", interface=self))
+
+ def _startHeartbeat(self):
+ """We need to send a heartbeat message to the device every X seconds"""
+ def callback():
+ self.heartbeatTimer = None
+ prefs = self.localNode.radioConfig.preferences
+ i = prefs.phone_timeout_secs / 2
+ logging.debug(f"Sending heartbeat, interval {i}")
+ if i != 0:
+ self.heartbeatTimer = threading.Timer(i, callback)
+ self.heartbeatTimer.start()
+ p = mesh_pb2.ToRadio()
+ self._sendToRadio(p)
+
+ callback() # run our periodic callback now, it will make another timer if necessary
+
+ def _connected(self):
+ """Called by this class to tell clients we are now fully connected to a node
+ """
+ # (because I'm lazy) _connected might be called when remote Node
+ # objects complete their config reads, don't generate redundant isConnected
+ # for the local interface
+ if not self.isConnected.is_set():
+ self.isConnected.set()
+ self._startHeartbeat()
+ publishingThread.queueWork(lambda: pub.sendMessage(
+ "meshtastic.connection.established", interface=self))
+
+ def _startConfig(self):
+ """Start device packets flowing"""
+ self.myInfo = None
+ self.nodes = {} # nodes keyed by ID
+ self.nodesByNum = {} # nodes keyed by nodenum
+
+ startConfig = mesh_pb2.ToRadio()
+ self.configId = random.randint(0, 0xffffffff)
+ startConfig.want_config_id = self.configId
+ self._sendToRadio(startConfig)
+
+ def _sendDisconnect(self):
+ """Tell device we are done using it"""
+ m = mesh_pb2.ToRadio()
+ m.disconnect = True
+ self._sendToRadio(m)
+
+ def _sendToRadio(self, toRadio):
+ """Send a ToRadio protobuf to the device"""
+ if self.noProto:
+ logging.warning(
+ f"Not sending packet because protocol use is disabled by noProto")
+ else:
+ #logging.debug(f"Sending toRadio: {stripnl(toRadio)}")
+ self._sendToRadioImpl(toRadio)
+
+ def _sendToRadioImpl(self, toRadio):
+ """Send a ToRadio protobuf to the device"""
+ logging.error(f"Subclass must provide toradio: {toRadio}")
+
+ def _handleConfigComplete(self):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings and channels
+ """
+ self.localNode.requestConfig()
+
+ def _handleFromRadio(self, fromRadioBytes):
+ """
+ Handle a packet that arrived from the radio(update model and publish events)
+
+ Called by subclasses."""
+ fromRadio = mesh_pb2.FromRadio()
+ fromRadio.ParseFromString(fromRadioBytes)
+ asDict = google.protobuf.json_format.MessageToDict(fromRadio)
+ #logging.debug(f"Received from radio: {fromRadio}")
+ if fromRadio.HasField("my_info"):
+ self.myInfo = fromRadio.my_info
+ self.localNode.nodeNum = self.myInfo.my_node_num
+ logging.debug(f"Received myinfo: {stripnl(fromRadio.my_info)}")
+
+ failmsg = None
+ # Check for app too old
+ if self.myInfo.min_app_version > OUR_APP_VERSION:
+ failmsg = "This device needs a newer python client, please \"pip install --upgrade meshtastic\". "\
+ "For more information see https://tinyurl.com/5bjsxu32"
+
+ # check for firmware too old
+ if self.myInfo.max_channels == 0:
+ failmsg = "This version of meshtastic-python requires device firmware version 1.2 or later. "\
+ "For more information see https://tinyurl.com/5bjsxu32"
+
+ if failmsg:
+ self.failure = Exception(failmsg)
+ self.isConnected.set() # let waitConnected return this exception
+ self.close()
+
+ elif fromRadio.HasField("node_info"):
+ node = asDict["nodeInfo"]
+ try:
+ self._fixupPosition(node["position"])
+ except:
+ logging.debug("Node without position")
+
+ logging.debug(f"Received nodeinfo: {node}")
+
+ self.nodesByNum[node["num"]] = node
+ if "user" in node: # Some nodes might not have user/ids assigned yet
+ self.nodes[node["user"]["id"]] = node
+ publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.node.updated",
+ node=node, interface=self))
+ elif fromRadio.config_complete_id == self.configId:
+ # we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
+ logging.debug(f"Config complete ID {self.configId}")
+ self._handleConfigComplete()
+ elif fromRadio.HasField("packet"):
+ self._handlePacketFromRadio(fromRadio.packet)
+ elif fromRadio.rebooted:
+ # Tell clients the device went away. Careful not to call the overridden subclass version that closes the serial port
+ MeshInterface._disconnected(self)
+
+ self._startConfig() # redownload the node db etc...
+ else:
+ logging.debug("Unexpected FromRadio payload")
+
+ def _fixupPosition(self, position):
+ """Convert integer lat/lon into floats
+
+ Arguments:
+ position {Position dictionary} -- object ot fix up
+ """
+ if "latitudeI" in position:
+ position["latitude"] = position["latitudeI"] * 1e-7
+ if "longitudeI" in position:
+ position["longitude"] = position["longitudeI"] * 1e-7
+
+ def _nodeNumToId(self, num):
+ """Map a node node number to a node ID
+
+ Arguments:
+ num {int} -- Node number
+
+ Returns:
+ string -- Node ID
+ """
+ if num == BROADCAST_NUM:
+ return BROADCAST_ADDR
+
+ try:
+ return self.nodesByNum[num]["user"]["id"]
+ except:
+ logging.debug(f"Node {num} not found for fromId")
+ return None
+
+ def _getOrCreateByNum(self, nodeNum):
+ """Given a nodenum find the NodeInfo in the DB (or create if necessary)"""
+ if nodeNum == BROADCAST_NUM:
+ raise Exception("Can not create/find nodenum by the broadcast num")
+
+ if nodeNum in self.nodesByNum:
+ return self.nodesByNum[nodeNum]
+ else:
+ n = {"num": nodeNum} # Create a minimial node db entry
+ self.nodesByNum[nodeNum] = n
+ return n
+
+ def _handlePacketFromRadio(self, meshPacket):
+ """Handle a MeshPacket that just arrived from the radio
+
+ Will publish one of the following events:
+ - meshtastic.receive.text(packet = MeshPacket dictionary)
+ - meshtastic.receive.position(packet = MeshPacket dictionary)
+ - meshtastic.receive.user(packet = MeshPacket dictionary)
+ - meshtastic.receive.data(packet = MeshPacket dictionary)
+ """
+
+ asDict = google.protobuf.json_format.MessageToDict(meshPacket)
+
+ # We normally decompose the payload into a dictionary so that the client
+ # doesn't need to understand protobufs. But advanced clients might
+ # want the raw protobuf, so we provide it in "raw"
+ asDict["raw"] = meshPacket
+
+ # from might be missing if the nodenum was zero.
+ if not "from" in asDict:
+ asDict["from"] = 0
+ logging.error(
+ f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
+ return
+ if not "to" in asDict:
+ asDict["to"] = 0
+
+ # /add fromId and toId fields based on the node ID
+ try:
+ asDict["fromId"] = self._nodeNumToId(asDict["from"])
+ except Exception as ex:
+ logging.warning(f"Not populating fromId {ex}")
+ try:
+ asDict["toId"] = self._nodeNumToId(asDict["to"])
+ except Exception as ex:
+ logging.warning(f"Not populating toId {ex}")
+
+ # We could provide our objects as DotMaps - which work with . notation or as dictionaries
+ # asObj = DotMap(asDict)
+ topic = "meshtastic.receive" # Generic unknown packet type
+
+ decoded = asDict["decoded"]
+ # The default MessageToDict converts byte arrays into base64 strings.
+ # We don't want that - it messes up data payload. So slam in the correct
+ # byte array.
+ decoded["payload"] = meshPacket.decoded.payload
+
+ # UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
+ # to make API usage easier, set it to prevent confusion
+ if not "portnum" in decoded:
+ decoded["portnum"] = portnums_pb2.PortNum.Name(
+ portnums_pb2.PortNum.UNKNOWN_APP)
+
+ portnum = decoded["portnum"]
+
+ topic = f"meshtastic.receive.data.{portnum}"
+
+ # decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
+ # move the following into a 'decoders' API that clients could register?
+ portNumInt = meshPacket.decoded.portnum # we want portnum as an int
+ handler = protocols.get(portNumInt)
+ # The decoded protobuf as a dictionary (if we understand this message)
+ p = None
+ if handler is not None:
+ topic = f"meshtastic.receive.{handler.name}"
+
+ # Convert to protobuf if possible
+ if handler.protobufFactory is not None:
+ pb = handler.protobufFactory()
+ pb.ParseFromString(meshPacket.decoded.payload)
+ p = google.protobuf.json_format.MessageToDict(pb)
+ asDict["decoded"][handler.name] = p
+ # Also provide the protobuf raw
+ asDict["decoded"][handler.name]["raw"] = pb
+
+ # Call specialized onReceive if necessary
+ if handler.onReceive is not None:
+ handler.onReceive(self, asDict)
+
+ # Is this message in response to a request, if so, look for a handler
+ requestId = decoded.get("requestId")
+ if requestId is not None:
+ # We ignore ACK packets, but send NAKs and data responses to the handlers
+ routing = decoded.get("routing")
+ isAck = routing is not None and ("errorReason" not in routing)
+ if not isAck:
+ # we keep the responseHandler in dict until we get a non ack
+ handler = self.responseHandlers.pop(requestId, None)
+ if handler is not None:
+ handler.callback(asDict)
+
+ logging.debug(f"Publishing {topic}: packet={stripnl(asDict)} ")
+ publishingThread.queueWork(lambda: pub.sendMessage(
+ topic, packet=asDict, interface=self))
+
+