diff --git a/docs/meshtastic/index.html b/docs/meshtastic/index.html
index 55c5cdd..485594d 100644
--- a/docs/meshtastic/index.html
+++ b/docs/meshtastic/index.html
@@ -142,13 +142,23 @@ interface = meshtastic.SerialInterface()
"""
-import socket
import pygatt
import google.protobuf.json_format
-import serial, threading, logging, sys, random, traceback, time, base64, platform
-from . import mesh_pb2, portnums_pb2, util
+import serial
+import threading
+import logging
+import sys
+import random
+import traceback
+import time
+import base64
+import platform
+import socket
+from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util
+from .util import fixme, catchAndIgnore
from pubsub import pub
from dotmap import DotMap
+from typing import *
START1 = 0x94
START2 = 0xc3
@@ -166,7 +176,25 @@ MY_CONFIG_ID = 42
format is Mmmss (where M is 1+the numeric major number. i.e. 20120 means 1.1.20
"""
-OUR_APP_VERSION = 20120
+OUR_APP_VERSION = 20200
+
+
+class ResponseHandler(NamedTuple):
+ """A pending response callback, waiting for a response to one of our messages"""
+ # requestId: int - used only as a key
+ callback: Callable
+ # FIXME, add timestamp and age out old requests
+
+
+class KnownProtocol(NamedTuple):
+ """Used to automatically decode known protocol payloads"""
+ name: str
+ # portnum: int, now a key
+ # If set, will be called to prase as a protocol buffer
+ protobufFactory: Callable = None
+ # If set, invoked as onReceive(interface, packet)
+ onReceive: Callable = None
+
class MeshInterface:
"""Interface class for meshtastic devices
@@ -188,7 +216,10 @@ class MeshInterface:
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
- random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
+ self.myInfo = None # We don't have device info yet
+ self.responseHandlers = {} # A map from request ID to the handler
+ self.failure = None # If we've encountered a fatal exception it will be kept here
+ random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
self._startConfig()
@@ -197,12 +228,17 @@ class MeshInterface:
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
- logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
+ logging.error(
+ f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
- def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
+ def sendText(self, text: AnyStr,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
@@ -212,13 +248,20 @@ class MeshInterface:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
return self.sendData(text.encode("utf-8"), destinationId,
- portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP, wantAck=wantAck, wantResponse=wantResponse)
+ portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse,
+ onResponse=onResponse)
- def sendData(self, data, destinationId=BROADCAST_ADDR, portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False, wantResponse=False):
+ def sendData(self, data, destinationId=BROADCAST_ADDR,
+ portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a data packet to some other node
Keyword Arguments:
@@ -226,6 +269,8 @@ class MeshInterface:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
+ onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
@@ -236,10 +281,14 @@ class MeshInterface:
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception("Data payload too big")
meshPacket = mesh_pb2.MeshPacket()
- meshPacket.decoded.data.payload = data
- meshPacket.decoded.data.portnum = portNum
+ meshPacket.decoded.payload = data
+ meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
- return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)
+
+ p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck)
+ if onResponse is not None:
+ self._addResponseHandler(p.id, onResponse)
+ return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
"""
@@ -266,18 +315,29 @@ class MeshInterface:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
- return self.sendData(p, destinationId, portNum=portnums_pb2.PortNum.POSITION_APP, wantAck=wantAck, wantResponse=wantResponse)
+ return self.sendData(p, destinationId,
+ portNum=portnums_pb2.PortNum.POSITION_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse)
- def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
+ def _addResponseHandler(self, requestId, callback):
+ self.responseHandlers[requestId] = ResponseHandler(callback)
+
+ def _sendPacket(self, meshPacket,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
- Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
+ Returns the sent packet. The id field will be populated in this packet and
+ can be used to track future message acks/naks.
"""
- self._waitConnected()
+
+ # We allow users to talk to the local node before we've completed the full connection flow...
+ if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
+ self._waitConnected()
toRadio = mesh_pb2.ToRadio()
- # FIXME add support for non broadcast addresses
if destinationId is None:
raise Exception("destinationId must not be None")
@@ -300,7 +360,7 @@ class MeshInterface:
self._sendToRadio(toRadio)
return meshPacket
- def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig')):
+ def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig', 'channels')):
"""Block until radio config is received. Returns True if config has been received."""
for _ in range(int(maxsecs/sleep)):
if all(map(lambda a: getattr(self, a, None), attrs)):
@@ -313,9 +373,12 @@ class MeshInterface:
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- t = mesh_pb2.ToRadio()
- t.set_radio.CopyFrom(self.radioConfig)
- self._sendToRadio(t)
+ p = admin_pb2.AdminMessage()
+ p.set_radio.CopyFrom(self.radioConfig)
+
+ self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True)
logging.debug("Wrote config")
def getMyNodeInfo(self):
@@ -358,43 +421,58 @@ class MeshInterface:
short_name = long_name[0] + long_name[1:].translate(trans)
if len(short_name) < nChars:
short_name = long_name[:nChars]
- t = mesh_pb2.ToRadio()
+
+ p = admin_pb2.AdminMessage()
+
if long_name is not None:
- t.set_owner.long_name = long_name
+ p.set_owner.long_name = long_name
if short_name is not None:
short_name = short_name.strip()
if len(short_name) > nChars:
short_name = short_name[:nChars]
- t.set_owner.short_name = short_name
- self._sendToRadio(t)
+ p.set_owner.short_name = short_name
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True)
@property
def channelURL(self):
"""The sharable URL that describes the current channel
"""
- bytes = self.radioConfig.channel_settings.SerializeToString()
+ # Only keep the primary/secondary channels, assume primary is first
+ channelSet = apponly_pb2.ChannelSet()
+ for c in self.channels:
+ if c.role != channel_pb2.Channel.Role.DISABLED:
+ channelSet.settings.append(c.settings)
+ bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode('ascii')
- return f"https://www.meshtastic.org/c/#{s}"
+ return f"https://www.meshtastic.org/d/#{s}"
- def setURL(self, url, write=True):
+ def setURL(self, url):
"""Set mesh network URL"""
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- # URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings}
+ # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on '/#' to find the base64 encoded channel settings
splitURL = url.split("/#")
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
- self.radioConfig.channel_settings.ParseFromString(decodedURL)
- if write:
- self.writeConfig()
+ channelSet = apponly_pb2.ChannelSet()
+ channelSet.ParseFromString(decodedURL)
+ fixme("set self.channels, see https://developers.google.com/protocol-buffers/docs/reference/python-generated?csw=1#repeated-fields")
+ self._writeChannels()
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
- if not self.isConnected.wait(5.0): # timeout after 5 seconds
+ if not self.isConnected.wait(5.0): # timeout after 5 seconds
raise Exception("Timed out waiting for connection completion")
+ # If we failed while connecting, raise the connection to the client
+ if self.failure:
+ raise self.failure
+
def _generatePacketId(self):
"""Get a new unique packet ID"""
if self.currentPacketId is None:
@@ -406,13 +484,15 @@ class MeshInterface:
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
- pub.sendMessage("meshtastic.connection.lost", interface=self)
+ catchAndIgnore("disconnection publish", lambda: pub.sendMessage(
+ "meshtastic.connection.lost", interface=self))
def _connected(self):
"""Called by this class to tell clients we are now fully connected to a node
"""
self.isConnected.set()
- pub.sendMessage("meshtastic.connection.established", interface=self)
+ catchAndIgnore("connection publish", lambda: pub.sendMessage(
+ "meshtastic.connection.established", interface=self))
def _startConfig(self):
"""Start device packets flowing"""
@@ -420,6 +500,8 @@ class MeshInterface:
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
self.radioConfig = None
+ self.channels = None
+ self.partialChannels = [] # We keep our channels in a temp array until finished
startConfig = mesh_pb2.ToRadio()
startConfig.want_config_id = MY_CONFIG_ID # we don't use this value
@@ -428,14 +510,69 @@ class MeshInterface:
def _sendToRadio(self, toRadio):
"""Send a ToRadio protobuf to the device"""
if self.noProto:
- logging.warn(f"Not sending packet because protocol use is disabled by noProto")
+ logging.warn(
+ f"Not sending packet because protocol use is disabled by noProto")
else:
+ logging.debug(f"Sending toRadio: {toRadio}")
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.error(f"Subclass must provide toradio: {toRadio}")
+ def _handleConfigComplete(self):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings and channels
+ """
+ self._requestSettings()
+ self._requestChannel(0)
+
+ def _requestSettings(self):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings
+ """
+ p = admin_pb2.AdminMessage()
+ p.get_radio_request = True
+
+ def onResponse(p):
+ """A closure to handle the response packet"""
+ self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True,
+ wantResponse=True,
+ onResponse=onResponse)
+
+ def _requestChannel(self, channelNum: int):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings
+ """
+ p = admin_pb2.AdminMessage()
+ p.get_channel_request = channelNum + 1
+ logging.debug(f"Requesting channel {channelNum}")
+
+ def onResponse(p):
+ """A closure to handle the response packet"""
+ c = p["decoded"]["admin"]["raw"].get_channel_response
+ self.partialChannels.append(c)
+ logging.debug(f"Received channel {c}")
+ # for stress testing, download all channels
+ # if channelNum >= self.myInfo.max_channels - 1:
+ if c.role == channel_pb2.Channel.Role.DISABLED or channelNum >= self.myInfo.max_channels - 1:
+ # Once we see a response that has NO settings, assume we are at the end of channels and stop fetching
+ self.channels = self.partialChannels
+ # FIXME, the following should only be called after we have settings and channels
+ self._connected() # Tell everone else we are ready to go
+ else:
+ self._requestChannel(channelNum + 1)
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True,
+ wantResponse=True,
+ onResponse=onResponse)
+
def _handleFromRadio(self, fromRadioBytes):
"""
Handle a packet that arrived from the radio(update model and publish events)
@@ -447,12 +584,21 @@ class MeshInterface:
logging.debug(f"Received: {asDict}")
if fromRadio.HasField("my_info"):
self.myInfo = fromRadio.my_info
+
+ failmsg = None
+ # Check for app too old
if self.myInfo.min_app_version > OUR_APP_VERSION:
- raise Exception(
- "This device needs a newer python client, please \"pip install --upgrade meshtastic\"")
- # start assigning our packet IDs from the opposite side of where our local device is assigning them
- elif fromRadio.HasField("radio"):
- self.radioConfig = fromRadio.radio
+ failmsg = "This device needs a newer python client, please \"pip install --upgrade meshtastic\". For more information see https://tinyurl.com/5bjsxu32"
+
+ # check for firmware too old
+ if self.myInfo.max_channels == 0:
+ failmsg = "This version of meshtastic-python requires device firmware version 1.2 or later. For more information see https://tinyurl.com/5bjsxu32"
+
+ if failmsg:
+ self.failure = Exception(failmsg)
+ self.isConnected.set() # let waitConnected return this exception
+ self.close()
+
elif fromRadio.HasField("node_info"):
node = asDict["nodeInfo"]
try:
@@ -462,10 +608,11 @@ class MeshInterface:
self.nodesByNum[node["num"]] = node
if "user" in node: # Some nodes might not have user/ids assigned yet
self.nodes[node["user"]["id"]] = node
- pub.sendMessage("meshtastic.node.updated", node=node, interface=self)
+ pub.sendMessage("meshtastic.node.updated",
+ node=node, interface=self)
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
- self._connected()
+ self._handleConfigComplete()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
@@ -528,6 +675,12 @@ class MeshInterface:
"""
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
+
+ # We normally decompose the payload into a dictionary so that the client
+ # doesn't need to understand protobufs. But advanced clients might
+ # want the raw protobuf, so we provide it in "raw"
+ asDict["raw"] = meshPacket
+
# /add fromId and toId fields based on the node ID
asDict["fromId"] = self._nodeNumToId(asDict["from"])
asDict["toId"] = self._nodeNumToId(asDict["to"])
@@ -536,68 +689,59 @@ class MeshInterface:
# asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type
- # Warn users if firmware doesn't use new portnum based data encodings
- # But do not crash, because the lib will still basically work and ignore those packet types
- if meshPacket.decoded.HasField("user") or meshPacket.decoded.HasField("position"):
- logging.warn("Ignoring old position/user message. Recommend you update firmware to 1.1.20 or later")
+ decoded = asDict["decoded"]
+ # The default MessageToDict converts byte arrays into base64 strings.
+ # We don't want that - it messes up data payload. So slam in the correct
+ # byte array.
+ decoded["payload"] = meshPacket.decoded.payload
- if meshPacket.decoded.HasField("data"):
+ # UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
+ # to make API usage easier, set it to prevent confusion
+ if not "portnum" in decoded:
+ decoded["portnum"] = portnums_pb2.PortNum.Name(
+ portnums_pb2.PortNum.UNKNOWN_APP)
- # The default MessageToDict converts byte arrays into base64 strings.
- # We don't want that - it messes up data payload. So slam in the correct
- # byte array.
- asDict["decoded"]["data"]["payload"] = meshPacket.decoded.data.payload
+ portnum = decoded["portnum"]
- # UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
- # to make API usage easier, set it to prevent confusion
- if not "portnum" in asDict["decoded"]["data"]:
- asDict["decoded"]["data"]["portnum"] = portnums_pb2.PortNum.Name(portnums_pb2.PortNum.UNKNOWN_APP)
+ topic = f"meshtastic.receive.data.{portnum}"
- portnum = asDict["decoded"]["data"]["portnum"]
+ # decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
+ # move the following into a 'decoders' API that clients could register?
+ portNumInt = meshPacket.decoded.portnum # we want portnum as an int
+ handler = protocols.get(portNumInt)
+ # The decoded protobuf as a dictionary (if we understand this message)
+ p = None
+ if handler is not None:
+ topic = f"meshtastic.receive.{handler.name}"
- topic = f"meshtastic.receive.data.{portnum}"
-
- # For text messages, we go ahead and decode the text to ascii for our users
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.TEXT_MESSAGE_APP):
- topic = "meshtastic.receive.text"
-
- # We don't throw if the utf8 is invalid in the text message. Instead we just don't populate
- # the decoded.data.text and we log an error message. This at least allows some delivery to
- # the app and the app can deal with the missing decoded representation.
- #
- # Usually btw this problem is caused by apps sending binary data but setting the payload type to
- # text.
- try:
- asDict["decoded"]["data"]["text"] = meshPacket.decoded.data.payload.decode("utf-8")
- except Exception as ex:
- logging.error(f"Malformatted utf8 in text message: {ex}")
-
- # decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.POSITION_APP):
- topic = "meshtastic.receive.position"
- pb = mesh_pb2.Position()
- pb.ParseFromString(meshPacket.decoded.data.payload)
+ # Convert to protobuf if possible
+ if handler.protobufFactory is not None:
+ pb = handler.protobufFactory()
+ pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
- self._fixupPosition(p)
- asDict["decoded"]["data"]["position"] = p
- # update node DB as needed
- self._getOrCreateByNum(asDict["from"])["position"] = p
+ asDict["decoded"][handler.name] = p
+ # Also provide the protobuf raw
+ asDict["decoded"][handler.name]["raw"] = pb
- # decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.NODEINFO_APP):
- topic = "meshtastic.receive.user"
- pb = mesh_pb2.User()
- pb.ParseFromString(meshPacket.decoded.data.payload)
- u = google.protobuf.json_format.MessageToDict(pb)
- asDict["decoded"]["data"]["user"] = u
- # update node DB as needed
- n = self._getOrCreateByNum(asDict["from"])
- n["user"] = u
- # We now have a node ID, make sure it is uptodate in that table
- self.nodes[u["id"]] = n
+ # Call specialized onReceive if necessary
+ if handler.onReceive is not None:
+ handler.onReceive(self, asDict)
+
+ # Is this message in response to a request, if so, look for a handler
+ requestId = decoded.get("requestId")
+ if requestId is not None:
+ # We ignore ACK packets, but send NAKs and data responses to the handlers
+ routing = decoded.get("routing")
+ isAck = routing is not None and ("errorReason" not in routing)
+ if not isAck:
+ # we keep the responseHandler in dict until we get a non ack
+ handler = self.responseHandlers.pop(requestId, None)
+ if handler is not None:
+ handler.callback(asDict)
logging.debug(f"Publishing topic {topic}")
- pub.sendMessage(topic, packet=asDict, interface=self)
+ catchAndIgnore(f"publishing {topic}", lambda: pub.sendMessage(
+ topic, packet=asDict, interface=self))
# Our standard BLE characteristics
@@ -686,7 +830,7 @@ class StreamInterface(MeshInterface):
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
- if not self.noProto: # Wait for the db download if using the protocol
+ if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
@@ -772,13 +916,16 @@ class StreamInterface(MeshInterface):
# logging.debug(f"timeout")
pass
except serial.SerialException as ex:
- if not self._wantExit: # We might intentionally get an exception during shutdown
- logging.warn(f"Meshtastic serial port disconnected, disconnecting... {ex}")
+ if not self._wantExit: # We might intentionally get an exception during shutdown
+ logging.warn(
+ f"Meshtastic serial port disconnected, disconnecting... {ex}")
except OSError as ex:
- if not self._wantExit: # We might intentionally get an exception during shutdown
- logging.error(f"Unexpected OSError, terminating meshtastic reader... {ex}")
+ if not self._wantExit: # We might intentionally get an exception during shutdown
+ logging.error(
+ f"Unexpected OSError, terminating meshtastic reader... {ex}")
except Exception as ex:
- logging.error(f"Unexpected exception, terminating meshtastic reader... {ex}")
+ logging.error(
+ f"Unexpected exception, terminating meshtastic reader... {ex}")
finally:
logging.debug("reader is exiting")
self._disconnected()
@@ -826,10 +973,11 @@ class SerialInterface(StreamInterface):
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
+
class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
- def __init__(self, hostname, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
+ def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
"""Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
@@ -852,9 +1000,9 @@ class TCPInterface(StreamInterface):
def close(self):
"""Close a connection to the device"""
logging.debug("Closing TCP stream")
- # Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
+ # Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
# the socket here
- self._wantExit = True
+ self._wantExit = True
if not self.socket is None:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
@@ -866,12 +1014,63 @@ class TCPInterface(StreamInterface):
def _readBytes(self, len):
"""Read an array of bytes from our stream"""
- return self.socket.recv(len)
+ return self.socket.recv(len)
+
+
+def _onTextReceive(iface, asDict):
+ """Special text auto parsing for received messages"""
+ # We don't throw if the utf8 is invalid in the text message. Instead we just don't populate
+ # the decoded.data.text and we log an error message. This at least allows some delivery to
+ # the app and the app can deal with the missing decoded representation.
+ #
+ # Usually btw this problem is caused by apps sending binary data but setting the payload type to
+ # text.
+ try:
+ asDict["decoded"]["text"] = meshPacket.decoded.payload.decode(
+ "utf-8")
+ except Exception as ex:
+ logging.error(f"Malformatted utf8 in text message: {ex}")
+
+
+def _onPositionReceive(iface, asDict):
+ """Special auto parsing for received messages"""
+ p = asDict["decoded"]["position"]
+ iface._fixupPosition(p)
+ # update node DB as needed
+ iface._getOrCreateByNum(asDict["from"])["position"] = p
+
+
+def _onNodeInfoReceive(iface, asDict):
+ """Special auto parsing for received messages"""
+ p = asDict["decoded"]["user"]
+ # decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
+ # update node DB as needed
+ n = iface._getOrCreateByNum(asDict["from"])
+ n["user"] = p
+ # We now have a node ID, make sure it is uptodate in that table
+ iface.nodes[p["id"]] = n
+
+
+"""Well known message payloads can register decoders for automatic protobuf parsing"""
+protocols = {
+ portnums_pb2.PortNum.TEXT_MESSAGE_APP: KnownProtocol("text", onReceive=_onTextReceive),
+ portnums_pb2.PortNum.POSITION_APP: KnownProtocol("position", mesh_pb2.Position, _onPositionReceive),
+ portnums_pb2.PortNum.NODEINFO_APP: KnownProtocol("user", mesh_pb2.User, _onNodeInfoReceive),
+ portnums_pb2.PortNum.ADMIN_APP: KnownProtocol("admin", admin_pb2.AdminMessage),
+ portnums_pb2.PortNum.ROUTING_APP: KnownProtocol("routing", mesh_pb2.Routing),
+ portnums_pb2.PortNum.ENVIRONMENTAL_MEASUREMENT_APP: KnownProtocol("environmental", environmental_measurement_pb2.EnvironmentalMeasurement),
+ portnums_pb2.PortNum.REMOTE_HARDWARE_APP: KnownProtocol(
+ "remotehw", remote_hardware_pb2.HardwareMessage)
+}
Used to automatically decode known protocol payloads
+
+
+Expand source code
+
+
class KnownProtocol(NamedTuple):
+ """Used to automatically decode known protocol payloads"""
+ name: str
+ # portnum: int, now a key
+ # If set, will be called to prase as a protocol buffer
+ protobufFactory: Callable = None
+ # If set, invoked as onReceive(interface, packet)
+ onReceive: Callable = None
+
+
Ancestors
+
+
builtins.tuple
+
+
Instance variables
+
+
var name :Â str
+
+
Alias for field number 0
+
+
var onReceive :Â Callable
+
+
Alias for field number 2
+
+
var protobufFactory :Â Callable
+
+
Alias for field number 1
+
+
+
class MeshInterface(debugOut=None, noProto=False)
@@ -1057,7 +1302,10 @@ noProto – If True, don't try to run our protocol on the link - just be a d
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
- random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
+ self.myInfo = None # We don't have device info yet
+ self.responseHandlers = {} # A map from request ID to the handler
+ self.failure = None # If we've encountered a fatal exception it will be kept here
+ random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
self._startConfig()
@@ -1066,12 +1314,17 @@ noProto – If True, don't try to run our protocol on the link - just be a d
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
- logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
+ logging.error(
+ f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
- def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
+ def sendText(self, text: AnyStr,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
@@ -1081,13 +1334,20 @@ noProto – If True, don't try to run our protocol on the link - just be a d
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
return self.sendData(text.encode("utf-8"), destinationId,
- portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP, wantAck=wantAck, wantResponse=wantResponse)
+ portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse,
+ onResponse=onResponse)
- def sendData(self, data, destinationId=BROADCAST_ADDR, portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False, wantResponse=False):
+ def sendData(self, data, destinationId=BROADCAST_ADDR,
+ portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a data packet to some other node
Keyword Arguments:
@@ -1095,6 +1355,8 @@ noProto – If True, don't try to run our protocol on the link - just be a d
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
+ onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
@@ -1105,10 +1367,14 @@ noProto – If True, don't try to run our protocol on the link - just be a d
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception("Data payload too big")
meshPacket = mesh_pb2.MeshPacket()
- meshPacket.decoded.data.payload = data
- meshPacket.decoded.data.portnum = portNum
+ meshPacket.decoded.payload = data
+ meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
- return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)
+
+ p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck)
+ if onResponse is not None:
+ self._addResponseHandler(p.id, onResponse)
+ return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
"""
@@ -1135,18 +1401,29 @@ noProto – If True, don't try to run our protocol on the link - just be a d
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
- return self.sendData(p, destinationId, portNum=portnums_pb2.PortNum.POSITION_APP, wantAck=wantAck, wantResponse=wantResponse)
+ return self.sendData(p, destinationId,
+ portNum=portnums_pb2.PortNum.POSITION_APP,
+ wantAck=wantAck,
+ wantResponse=wantResponse)
- def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
+ def _addResponseHandler(self, requestId, callback):
+ self.responseHandlers[requestId] = ResponseHandler(callback)
+
+ def _sendPacket(self, meshPacket,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
- Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
+ Returns the sent packet. The id field will be populated in this packet and
+ can be used to track future message acks/naks.
"""
- self._waitConnected()
+
+ # We allow users to talk to the local node before we've completed the full connection flow...
+ if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
+ self._waitConnected()
toRadio = mesh_pb2.ToRadio()
- # FIXME add support for non broadcast addresses
if destinationId is None:
raise Exception("destinationId must not be None")
@@ -1169,7 +1446,7 @@ noProto – If True, don't try to run our protocol on the link - just be a d
self._sendToRadio(toRadio)
return meshPacket
- def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig')):
+ def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig', 'channels')):
"""Block until radio config is received. Returns True if config has been received."""
for _ in range(int(maxsecs/sleep)):
if all(map(lambda a: getattr(self, a, None), attrs)):
@@ -1182,9 +1459,12 @@ noProto – If True, don't try to run our protocol on the link - just be a d
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- t = mesh_pb2.ToRadio()
- t.set_radio.CopyFrom(self.radioConfig)
- self._sendToRadio(t)
+ p = admin_pb2.AdminMessage()
+ p.set_radio.CopyFrom(self.radioConfig)
+
+ self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True)
logging.debug("Wrote config")
def getMyNodeInfo(self):
@@ -1227,43 +1507,58 @@ noProto – If True, don't try to run our protocol on the link - just be a d
short_name = long_name[0] + long_name[1:].translate(trans)
if len(short_name) < nChars:
short_name = long_name[:nChars]
- t = mesh_pb2.ToRadio()
+
+ p = admin_pb2.AdminMessage()
+
if long_name is not None:
- t.set_owner.long_name = long_name
+ p.set_owner.long_name = long_name
if short_name is not None:
short_name = short_name.strip()
if len(short_name) > nChars:
short_name = short_name[:nChars]
- t.set_owner.short_name = short_name
- self._sendToRadio(t)
+ p.set_owner.short_name = short_name
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True)
@property
def channelURL(self):
"""The sharable URL that describes the current channel
"""
- bytes = self.radioConfig.channel_settings.SerializeToString()
+ # Only keep the primary/secondary channels, assume primary is first
+ channelSet = apponly_pb2.ChannelSet()
+ for c in self.channels:
+ if c.role != channel_pb2.Channel.Role.DISABLED:
+ channelSet.settings.append(c.settings)
+ bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode('ascii')
- return f"https://www.meshtastic.org/c/#{s}"
+ return f"https://www.meshtastic.org/d/#{s}"
- def setURL(self, url, write=True):
+ def setURL(self, url):
"""Set mesh network URL"""
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- # URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings}
+ # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on '/#' to find the base64 encoded channel settings
splitURL = url.split("/#")
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
- self.radioConfig.channel_settings.ParseFromString(decodedURL)
- if write:
- self.writeConfig()
+ channelSet = apponly_pb2.ChannelSet()
+ channelSet.ParseFromString(decodedURL)
+ fixme("set self.channels, see https://developers.google.com/protocol-buffers/docs/reference/python-generated?csw=1#repeated-fields")
+ self._writeChannels()
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
- if not self.isConnected.wait(5.0): # timeout after 5 seconds
+ if not self.isConnected.wait(5.0): # timeout after 5 seconds
raise Exception("Timed out waiting for connection completion")
+ # If we failed while connecting, raise the connection to the client
+ if self.failure:
+ raise self.failure
+
def _generatePacketId(self):
"""Get a new unique packet ID"""
if self.currentPacketId is None:
@@ -1275,13 +1570,15 @@ noProto – If True, don't try to run our protocol on the link - just be a d
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
- pub.sendMessage("meshtastic.connection.lost", interface=self)
+ catchAndIgnore("disconnection publish", lambda: pub.sendMessage(
+ "meshtastic.connection.lost", interface=self))
def _connected(self):
"""Called by this class to tell clients we are now fully connected to a node
"""
self.isConnected.set()
- pub.sendMessage("meshtastic.connection.established", interface=self)
+ catchAndIgnore("connection publish", lambda: pub.sendMessage(
+ "meshtastic.connection.established", interface=self))
def _startConfig(self):
"""Start device packets flowing"""
@@ -1289,6 +1586,8 @@ noProto – If True, don't try to run our protocol on the link - just be a d
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
self.radioConfig = None
+ self.channels = None
+ self.partialChannels = [] # We keep our channels in a temp array until finished
startConfig = mesh_pb2.ToRadio()
startConfig.want_config_id = MY_CONFIG_ID # we don't use this value
@@ -1297,14 +1596,69 @@ noProto – If True, don't try to run our protocol on the link - just be a d
def _sendToRadio(self, toRadio):
"""Send a ToRadio protobuf to the device"""
if self.noProto:
- logging.warn(f"Not sending packet because protocol use is disabled by noProto")
+ logging.warn(
+ f"Not sending packet because protocol use is disabled by noProto")
else:
+ logging.debug(f"Sending toRadio: {toRadio}")
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.error(f"Subclass must provide toradio: {toRadio}")
+ def _handleConfigComplete(self):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings and channels
+ """
+ self._requestSettings()
+ self._requestChannel(0)
+
+ def _requestSettings(self):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings
+ """
+ p = admin_pb2.AdminMessage()
+ p.get_radio_request = True
+
+ def onResponse(p):
+ """A closure to handle the response packet"""
+ self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True,
+ wantResponse=True,
+ onResponse=onResponse)
+
+ def _requestChannel(self, channelNum: int):
+ """
+ Done with initial config messages, now send regular MeshPackets to ask for settings
+ """
+ p = admin_pb2.AdminMessage()
+ p.get_channel_request = channelNum + 1
+ logging.debug(f"Requesting channel {channelNum}")
+
+ def onResponse(p):
+ """A closure to handle the response packet"""
+ c = p["decoded"]["admin"]["raw"].get_channel_response
+ self.partialChannels.append(c)
+ logging.debug(f"Received channel {c}")
+ # for stress testing, download all channels
+ # if channelNum >= self.myInfo.max_channels - 1:
+ if c.role == channel_pb2.Channel.Role.DISABLED or channelNum >= self.myInfo.max_channels - 1:
+ # Once we see a response that has NO settings, assume we are at the end of channels and stop fetching
+ self.channels = self.partialChannels
+ # FIXME, the following should only be called after we have settings and channels
+ self._connected() # Tell everone else we are ready to go
+ else:
+ self._requestChannel(channelNum + 1)
+
+ return self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True,
+ wantResponse=True,
+ onResponse=onResponse)
+
def _handleFromRadio(self, fromRadioBytes):
"""
Handle a packet that arrived from the radio(update model and publish events)
@@ -1316,12 +1670,21 @@ noProto – If True, don't try to run our protocol on the link - just be a d
logging.debug(f"Received: {asDict}")
if fromRadio.HasField("my_info"):
self.myInfo = fromRadio.my_info
+
+ failmsg = None
+ # Check for app too old
if self.myInfo.min_app_version > OUR_APP_VERSION:
- raise Exception(
- "This device needs a newer python client, please \"pip install --upgrade meshtastic\"")
- # start assigning our packet IDs from the opposite side of where our local device is assigning them
- elif fromRadio.HasField("radio"):
- self.radioConfig = fromRadio.radio
+ failmsg = "This device needs a newer python client, please \"pip install --upgrade meshtastic\". For more information see https://tinyurl.com/5bjsxu32"
+
+ # check for firmware too old
+ if self.myInfo.max_channels == 0:
+ failmsg = "This version of meshtastic-python requires device firmware version 1.2 or later. For more information see https://tinyurl.com/5bjsxu32"
+
+ if failmsg:
+ self.failure = Exception(failmsg)
+ self.isConnected.set() # let waitConnected return this exception
+ self.close()
+
elif fromRadio.HasField("node_info"):
node = asDict["nodeInfo"]
try:
@@ -1331,10 +1694,11 @@ noProto – If True, don't try to run our protocol on the link - just be a d
self.nodesByNum[node["num"]] = node
if "user" in node: # Some nodes might not have user/ids assigned yet
self.nodes[node["user"]["id"]] = node
- pub.sendMessage("meshtastic.node.updated", node=node, interface=self)
+ pub.sendMessage("meshtastic.node.updated",
+ node=node, interface=self)
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
- self._connected()
+ self._handleConfigComplete()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
@@ -1397,6 +1761,12 @@ noProto – If True, don't try to run our protocol on the link - just be a d
"""
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
+
+ # We normally decompose the payload into a dictionary so that the client
+ # doesn't need to understand protobufs. But advanced clients might
+ # want the raw protobuf, so we provide it in "raw"
+ asDict["raw"] = meshPacket
+
# /add fromId and toId fields based on the node ID
asDict["fromId"] = self._nodeNumToId(asDict["from"])
asDict["toId"] = self._nodeNumToId(asDict["to"])
@@ -1405,68 +1775,59 @@ noProto – If True, don't try to run our protocol on the link - just be a d
# asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type
- # Warn users if firmware doesn't use new portnum based data encodings
- # But do not crash, because the lib will still basically work and ignore those packet types
- if meshPacket.decoded.HasField("user") or meshPacket.decoded.HasField("position"):
- logging.warn("Ignoring old position/user message. Recommend you update firmware to 1.1.20 or later")
+ decoded = asDict["decoded"]
+ # The default MessageToDict converts byte arrays into base64 strings.
+ # We don't want that - it messes up data payload. So slam in the correct
+ # byte array.
+ decoded["payload"] = meshPacket.decoded.payload
- if meshPacket.decoded.HasField("data"):
+ # UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
+ # to make API usage easier, set it to prevent confusion
+ if not "portnum" in decoded:
+ decoded["portnum"] = portnums_pb2.PortNum.Name(
+ portnums_pb2.PortNum.UNKNOWN_APP)
- # The default MessageToDict converts byte arrays into base64 strings.
- # We don't want that - it messes up data payload. So slam in the correct
- # byte array.
- asDict["decoded"]["data"]["payload"] = meshPacket.decoded.data.payload
+ portnum = decoded["portnum"]
- # UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
- # to make API usage easier, set it to prevent confusion
- if not "portnum" in asDict["decoded"]["data"]:
- asDict["decoded"]["data"]["portnum"] = portnums_pb2.PortNum.Name(portnums_pb2.PortNum.UNKNOWN_APP)
+ topic = f"meshtastic.receive.data.{portnum}"
- portnum = asDict["decoded"]["data"]["portnum"]
+ # decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
+ # move the following into a 'decoders' API that clients could register?
+ portNumInt = meshPacket.decoded.portnum # we want portnum as an int
+ handler = protocols.get(portNumInt)
+ # The decoded protobuf as a dictionary (if we understand this message)
+ p = None
+ if handler is not None:
+ topic = f"meshtastic.receive.{handler.name}"
- topic = f"meshtastic.receive.data.{portnum}"
-
- # For text messages, we go ahead and decode the text to ascii for our users
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.TEXT_MESSAGE_APP):
- topic = "meshtastic.receive.text"
-
- # We don't throw if the utf8 is invalid in the text message. Instead we just don't populate
- # the decoded.data.text and we log an error message. This at least allows some delivery to
- # the app and the app can deal with the missing decoded representation.
- #
- # Usually btw this problem is caused by apps sending binary data but setting the payload type to
- # text.
- try:
- asDict["decoded"]["data"]["text"] = meshPacket.decoded.data.payload.decode("utf-8")
- except Exception as ex:
- logging.error(f"Malformatted utf8 in text message: {ex}")
-
- # decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.POSITION_APP):
- topic = "meshtastic.receive.position"
- pb = mesh_pb2.Position()
- pb.ParseFromString(meshPacket.decoded.data.payload)
+ # Convert to protobuf if possible
+ if handler.protobufFactory is not None:
+ pb = handler.protobufFactory()
+ pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
- self._fixupPosition(p)
- asDict["decoded"]["data"]["position"] = p
- # update node DB as needed
- self._getOrCreateByNum(asDict["from"])["position"] = p
+ asDict["decoded"][handler.name] = p
+ # Also provide the protobuf raw
+ asDict["decoded"][handler.name]["raw"] = pb
- # decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
- if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.NODEINFO_APP):
- topic = "meshtastic.receive.user"
- pb = mesh_pb2.User()
- pb.ParseFromString(meshPacket.decoded.data.payload)
- u = google.protobuf.json_format.MessageToDict(pb)
- asDict["decoded"]["data"]["user"] = u
- # update node DB as needed
- n = self._getOrCreateByNum(asDict["from"])
- n["user"] = u
- # We now have a node ID, make sure it is uptodate in that table
- self.nodes[u["id"]] = n
+ # Call specialized onReceive if necessary
+ if handler.onReceive is not None:
+ handler.onReceive(self, asDict)
+
+ # Is this message in response to a request, if so, look for a handler
+ requestId = decoded.get("requestId")
+ if requestId is not None:
+ # We ignore ACK packets, but send NAKs and data responses to the handlers
+ routing = decoded.get("routing")
+ isAck = routing is not None and ("errorReason" not in routing)
+ if not isAck:
+ # we keep the responseHandler in dict until we get a non ack
+ handler = self.responseHandlers.pop(requestId, None)
+ if handler is not None:
+ handler.callback(asDict)
logging.debug(f"Publishing topic {topic}")
- pub.sendMessage(topic, packet=asDict, interface=self)
+ catchAndIgnore(f"publishing {topic}", lambda: pub.sendMessage(
+ topic, packet=asDict, interface=self))
Subclasses
@@ -1486,9 +1847,14 @@ noProto – If True, don't try to run our protocol on the link - just be a d
def channelURL(self):
"""The sharable URL that describes the current channel
"""
- bytes = self.radioConfig.channel_settings.SerializeToString()
+ # Only keep the primary/secondary channels, assume primary is first
+ channelSet = apponly_pb2.ChannelSet()
+ for c in self.channels:
+ if c.role != channel_pb2.Channel.Role.DISABLED:
+ channelSet.settings.append(c.settings)
+ bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode('ascii')
- return f"https://www.meshtastic.org/c/#{s}"
+ return f"https://www.meshtastic.org/d/#{s}"
@@ -1566,13 +1932,18 @@ def channelURL(self):
data – the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} – where to send this message (default: {BROADCAST_ADDR})
portNum – the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
-wantAck – True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+wantAck – True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+wantResponse – True if you want the service on the other side to send an application layer response
+onResponse – A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
def sendData(self, data, destinationId=BROADCAST_ADDR,
+ portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a data packet to some other node
Keyword Arguments:
@@ -1580,6 +1951,8 @@ wantAck – True if you want the message sent in a reliable manner (with ret
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
+ onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
@@ -1590,54 +1963,14 @@ wantAck – True if you want the message sent in a reliable manner (with ret
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception("Data payload too big")
meshPacket = mesh_pb2.MeshPacket()
- meshPacket.decoded.data.payload = data
- meshPacket.decoded.data.portnum = portNum
+ meshPacket.decoded.payload = data
+ meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
- return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)
Send a MeshPacket to the specified node (or if unspecified, broadcast).
-You probably don't want this - use sendData instead.
-
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
-
-
-Expand source code
-
-
def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
- """Send a MeshPacket to the specified node (or if unspecified, broadcast).
- You probably don't want this - use sendData instead.
- Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
- """
- self._waitConnected()
-
- toRadio = mesh_pb2.ToRadio()
- # FIXME add support for non broadcast addresses
-
- if destinationId is None:
- raise Exception("destinationId must not be None")
- elif isinstance(destinationId, int):
- nodeNum = destinationId
- elif destinationId == BROADCAST_ADDR:
- nodeNum = BROADCAST_NUM
- else:
- nodeNum = self.nodes[destinationId]['num']
-
- meshPacket.to = nodeNum
- meshPacket.want_ack = wantAck
-
- # if the user hasn't set an ID for this packet (likely and recommended), we should pick a new unique ID
- # so the message can be tracked.
- if meshPacket.id == 0:
- meshPacket.id = self._generatePacketId()
-
- toRadio.packet.CopyFrom(meshPacket)
- self._sendToRadio(toRadio)
- return meshPacket
+ p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck)
+ if onResponse is not None:
+ self._addResponseHandler(p.id, onResponse)
+ return p
Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
@@ -1691,13 +2027,18 @@ the local position.
Keyword Arguments:
destinationId {nodeId or nodeNum} – where to send this message (default: {BROADCAST_ADDR})
portNum – the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
-wantAck – True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+wantAck – True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+wantResponse – True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
def sendText(self, text: AnyStr,
+ destinationId=BROADCAST_ADDR,
+ wantAck=False,
+ wantResponse=False,
+ onResponse=None):
"""Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
@@ -1707,11 +2048,15 @@ wantAck – True if you want the message sent in a reliable manner (with ret
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
+ wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
return self.sendData(text.encode("utf-8"), destinationId,
- portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP, wantAck=wantAck, wantResponse=wantResponse)
@@ -1760,22 +2110,23 @@ wantAck – True if you want the message sent in a reliable manner (with ret
Expand source code
-
def setURL(self, url, write=True):
+
def setURL(self, url):
"""Set mesh network URL"""
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- # URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings}
+ # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on '/#' to find the base64 encoded channel settings
splitURL = url.split("/#")
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
- self.radioConfig.channel_settings.ParseFromString(decodedURL)
- if write:
- self.writeConfig()
def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig', 'channels')):
"""Block until radio config is received. Returns True if config has been received."""
for _ in range(int(maxsecs/sleep)):
if all(map(lambda a: getattr(self, a, None), attrs)):
@@ -1806,14 +2157,44 @@ wantAck – True if you want the message sent in a reliable manner (with ret
if self.radioConfig == None:
raise Exception("No RadioConfig has been read")
- t = mesh_pb2.ToRadio()
- t.set_radio.CopyFrom(self.radioConfig)
- self._sendToRadio(t)
+ p = admin_pb2.AdminMessage()
+ p.set_radio.CopyFrom(self.radioConfig)
+
+ self.sendData(p, self.myInfo.my_node_num,
+ portNum=portnums_pb2.PortNum.ADMIN_APP,
+ wantAck=True)
logging.debug("Wrote config")
+
+class ResponseHandler
+(callback:Â Callable)
+
+
+
A pending response callback, waiting for a response to one of our messages
+
+
+Expand source code
+
+
class ResponseHandler(NamedTuple):
+ """A pending response callback, waiting for a response to one of our messages"""
+ # requestId: int - used only as a key
+ callback: Callable
+
+
Ancestors
+
+
builtins.tuple
+
+
Instance variables
+
+
var callback :Â Callable
+
+
Alias for field number 0
+
+
+
class SerialInterface(devPath=None, debugOut=None, noProto=False, connectNow=True)
@@ -1884,7 +2265,6 @@ debugOut {stream} – If a stream is provided, any debug serial output from
@@ -1958,7 +2338,7 @@ debugOut {stream} – If a stream is provided, any debug serial output from
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
- if not self.noProto: # Wait for the db download if using the protocol
+ if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
@@ -2044,13 +2424,16 @@ debugOut {stream} – If a stream is provided, any debug serial output from
# logging.debug(f"timeout")
pass
except serial.SerialException as ex:
- if not self._wantExit: # We might intentionally get an exception during shutdown
- logging.warn(f"Meshtastic serial port disconnected, disconnecting... {ex}")
+ if not self._wantExit: # We might intentionally get an exception during shutdown
+ logging.warn(
+ f"Meshtastic serial port disconnected, disconnecting... {ex}")
except OSError as ex:
- if not self._wantExit: # We might intentionally get an exception during shutdown
- logging.error(f"Unexpected OSError, terminating meshtastic reader... {ex}")
+ if not self._wantExit: # We might intentionally get an exception during shutdown
+ logging.error(
+ f"Unexpected OSError, terminating meshtastic reader... {ex}")
except Exception as ex:
- logging.error(f"Unexpected exception, terminating meshtastic reader... {ex}")
+ logging.error(
+ f"Unexpected exception, terminating meshtastic reader... {ex}")
finally:
logging.debug("reader is exiting")
self._disconnected()
@@ -2107,7 +2490,7 @@ start the reading thread later.
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
- if not self.noProto: # Wait for the db download if using the protocol
+ if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
@@ -2118,7 +2501,6 @@ start the reading thread later.
Interface class for meshtastic devices over a TCP link
@@ -2145,7 +2527,7 @@ hostname {string} – Hostname/IP address of the device to connect toclass TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
- def __init__(self, hostname, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
+ def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
"""Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
@@ -2168,9 +2550,9 @@ hostname {string} – Hostname/IP address of the device to connect toclose
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/meshtastic/test.html b/docs/meshtastic/test.html
index ccd2028..12327be 100644
--- a/docs/meshtastic/test.html
+++ b/docs/meshtastic/test.html
@@ -47,6 +47,7 @@ testNumber = 0
sendingInterface = None
+
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
if sendingInterface == interface:
@@ -55,10 +56,11 @@ def onReceive(packet, interface):
print(f"From {interface.stream.port}: {packet}")
p = DotMap(packet)
- if p.decoded.data.portnum == "TEXT_MESSAGE_APP":
+ if p.decoded.portnum == "TEXT_MESSAGE_APP":
# We only care a about clear text packets
receivedPackets.append(p)
+
def onNode(node):
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
@@ -91,7 +93,7 @@ def testSend(fromInterface, toInterface, isBroadcast=False, asBinary=False):
toNode = toInterface.myInfo.my_node_num
logging.info(f"Sending test packet from {fromNode} to {toNode}")
- wantAck = False # Don't want any sort of reliaible sending
+ wantAck = False # Don't want any sort of reliaible sending
global sendingInterface
sendingInterface = fromInterface
if not asBinary:
@@ -99,11 +101,11 @@ def testSend(fromInterface, toInterface, isBroadcast=False, asBinary=False):
else:
fromInterface.sendData((f"Binary {testNumber}").encode(
"utf-8"), toNode, wantAck=wantAck)
- for sec in range(45): # max of 45 secs before we timeout
+ for sec in range(45): # max of 45 secs before we timeout
time.sleep(1)
if (len(receivedPackets) >= 1):
return True
- return False # Failed to send
+ return False # Failed to send
def testThread(numTests=50):
@@ -116,14 +118,15 @@ def testThread(numTests=50):
isBroadcast = True
# asBinary=(i % 2 == 0)
success = testSend(
- interfaces[0], interfaces[1], isBroadcast, asBinary = False)
+ interfaces[0], interfaces[1], isBroadcast, asBinary=False)
if not success:
numFail = numFail + 1
logging.error(
f"Test failed, expected packet not received ({numFail} failures so far)")
else:
numSuccess = numSuccess + 1
- logging.info(f"Test succeeded ({numSuccess} successes ({numFail} failures) so far)")
+ logging.info(
+ f"Test succeeded ({numSuccess} successes ({numFail} failures) so far)")
if numFail >= 3:
for i in interfaces:
@@ -137,11 +140,13 @@ def onConnection(topic=pub.AUTO_TOPIC):
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
+
def openDebugLog(portName):
debugname = "log" + portName.replace("/", "_")
logging.info(f"Writing serial debugging to {debugname}")
return open(debugname, 'w+', buffering=1)
+
def testAll():
"""
Run a series of tests using devices we can find.
@@ -163,8 +168,7 @@ def testAll():
testThread()
for i in interfaces:
- i.close()
-
+ i.close()
@@ -226,7 +230,7 @@ def testAll():
print(f"From {interface.stream.port}: {packet}")
p = DotMap(packet)
- if p.decoded.data.portnum == "TEXT_MESSAGE_APP":
+ if p.decoded.portnum == "TEXT_MESSAGE_APP":
# We only care a about clear text packets
receivedPackets.append(p)
@@ -334,7 +338,7 @@ toInterface {[type]} – [description]
toNode = toInterface.myInfo.my_node_num
logging.info(f"Sending test packet from {fromNode} to {toNode}")
- wantAck = False # Don't want any sort of reliaible sending
+ wantAck = False # Don't want any sort of reliaible sending
global sendingInterface
sendingInterface = fromInterface
if not asBinary:
@@ -342,11 +346,11 @@ toInterface {[type]} – [description]
else:
fromInterface.sendData((f"Binary {testNumber}").encode(
"utf-8"), toNode, wantAck=wantAck)
- for sec in range(45): # max of 45 secs before we timeout
+ for sec in range(45): # max of 45 secs before we timeout
time.sleep(1)
if (len(receivedPackets) >= 1):
return True
- return False # Failed to send
+ return False # Failed to send
@@ -368,14 +372,15 @@ toInterface {[type]} – [description]
isBroadcast = True
# asBinary=(i % 2 == 0)
success = testSend(
- interfaces[0], interfaces[1], isBroadcast, asBinary = False)
+ interfaces[0], interfaces[1], isBroadcast, asBinary=False)
if not success:
numFail = numFail + 1
logging.error(
f"Test failed, expected packet not received ({numFail} failures so far)")
else:
numSuccess = numSuccess + 1
- logging.info(f"Test succeeded ({numSuccess} successes ({numFail} failures) so far)")
+ logging.info(
+ f"Test succeeded ({numSuccess} successes ({numFail} failures) so far)")
if numFail >= 3:
for i in interfaces:
diff --git a/docs/meshtastic/tunnel.html b/docs/meshtastic/tunnel.html
index 1952857..015a5e7 100644
--- a/docs/meshtastic/tunnel.html
+++ b/docs/meshtastic/tunnel.html
@@ -34,7 +34,7 @@
# sudo bin/run.sh --port /dev/ttyUSB0 --setch-shortfast
# sudo bin/run.sh --port /dev/ttyUSB0 --tunnel --debug
# ssh -Y root@192.168.10.151 (or dietpi), default password p
-# ncat -e /bin/cat -k -u -l 1235
+# ncat -e /bin/cat -k -u -l 1235
# ncat -u 10.115.64.152 1235
# ping -c 1 -W 20 10.115.64.152
# ping -i 30 -W 30 10.115.64.152
@@ -43,7 +43,8 @@
from . import portnums_pb2
from pubsub import pub
-import logging, threading
+import logging
+import threading
# A new non standard log level that is lower level than DEBUG
LOG_TRACE = 5
@@ -54,8 +55,8 @@ tunnelInstance = None
"""A list of chatty UDP services we should never accidentally
forward to our slow network"""
udpBlacklist = {
- 1900, # SSDP
- 5353, # multicast DNS
+ 1900, # SSDP
+ 5353, # multicast DNS
}
"""A list of TCP services to block"""
@@ -63,31 +64,36 @@ tcpBlacklist = {}
"""A list of protocols we ignore"""
protocolBlacklist = {
- 0x02, # IGMP
- 0x80, # Service-Specific Connection-Oriented Protocol in a Multilink and Connectionless Environment
+ 0x02, # IGMP
+ 0x80, # Service-Specific Connection-Oriented Protocol in a Multilink and Connectionless Environment
}
+
def hexstr(barray):
"""Print a string of hex digits"""
return ":".join('{:02x}'.format(x) for x in barray)
+
def ipstr(barray):
"""Print a string of ip digits"""
return ".".join('{}'.format(x) for x in barray)
+
def readnet_u16(p, offset):
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
+
def onTunnelReceive(packet, interface):
"""Callback for received tunneled messages from mesh
-
+
FIXME figure out how to do closures with methods in python"""
tunnelInstance.onReceive(packet)
+
class Tunnel:
"""A TUN based IP tunnel over meshtastic"""
-
+
def __init__(self, iface, subnet=None, netmask="255.255.0.0"):
"""
Constructor
@@ -105,7 +111,8 @@ class Tunnel:
global tunnelInstance
tunnelInstance = self
- logging.info("Starting IP to mesh tunnel (you must be root for this *pre-alpha* feature to work). Mesh members:")
+ logging.info(
+ "Starting IP to mesh tunnel (you must be root for this *pre-alpha* feature to work). Mesh members:")
pub.subscribe(onTunnelReceive, "meshtastic.receive.data.IP_TUNNEL_APP")
myAddr = self._nodeNumToIp(self.iface.myInfo.my_node_num)
@@ -113,24 +120,26 @@ class Tunnel:
for node in self.iface.nodes.values():
nodeId = node["user"]["id"]
ip = self._nodeNumToIp(node["num"])
- logging.info(f"Node { nodeId } has IP address { ip }")
+ logging.info(f"Node { nodeId } has IP address { ip }")
logging.debug("creating TUN device with MTU=200")
# FIXME - figure out real max MTU, it should be 240 - the overhead bytes for SubPacket and Data
from pytap2 import TapDevice
self.tun = TapDevice(name="mesh")
self.tun.up()
- self.tun.ifconfig(address=myAddr,netmask=netmask,mtu=200)
+ self.tun.ifconfig(address=myAddr, netmask=netmask, mtu=200)
logging.debug(f"starting TUN reader, our IP address is {myAddr}")
- self._rxThread = threading.Thread(target=self.__tunReader, args=(), daemon=True)
+ self._rxThread = threading.Thread(
+ target=self.__tunReader, args=(), daemon=True)
self._rxThread.start()
def onReceive(self, packet):
- p = packet["decoded"]["data"]["payload"]
+ p = packet["decoded"]["payload"]
if packet["from"] == self.iface.myInfo.my_node_num:
logging.debug("Ignoring message we sent")
else:
- logging.debug(f"Received mesh tunnel message type={type(p)} len={len(p)}")
+ logging.debug(
+ f"Received mesh tunnel message type={type(p)} len={len(p)}")
# we don't really need to check for filtering here (sender should have checked), but this provides
# useful debug printing on types of packets received
if not self._shouldFilterPacket(p):
@@ -142,36 +151,43 @@ class Tunnel:
srcaddr = p[12:16]
destAddr = p[16:20]
subheader = 20
- ignore = False # Assume we will be forwarding the packet
+ ignore = False # Assume we will be forwarding the packet
if protocol in protocolBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
- elif protocol == 0x01: # ICMP
+ logging.log(
+ LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
+ elif protocol == 0x01: # ICMP
icmpType = p[20]
icmpCode = p[21]
checksum = p[22:24]
- logging.debug(f"forwarding ICMP message src={ipstr(srcaddr)}, dest={ipstr(destAddr)}, type={icmpType}, code={icmpCode}, checksum={checksum}")
+ logging.debug(
+ f"forwarding ICMP message src={ipstr(srcaddr)}, dest={ipstr(destAddr)}, type={icmpType}, code={icmpCode}, checksum={checksum}")
# reply to pings (swap src and dest but keep rest of packet unchanged)
#pingback = p[:12]+p[16:20]+p[12:16]+p[20:]
- #tap.write(pingback)
- elif protocol == 0x11: # UDP
+ # tap.write(pingback)
+ elif protocol == 0x11: # UDP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in udpBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
+ logging.log(
+ LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
else:
- logging.debug(f"forwarding udp srcport={srcport}, destport={destport}")
- elif protocol == 0x06: # TCP
+ logging.debug(
+ f"forwarding udp srcport={srcport}, destport={destport}")
+ elif protocol == 0x06: # TCP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in tcpBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
+ logging.log(
+ LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
else:
- logging.debug(f"forwarding tcp srcport={srcport}, destport={destport}")
+ logging.debug(
+ f"forwarding tcp srcport={srcport}, destport={destport}")
else:
- logging.warning(f"forwarding unexpected protocol 0x{protocol:02x}, src={ipstr(srcaddr)}, dest={ipstr(destAddr)}")
+ logging.warning(
+ f"forwarding unexpected protocol 0x{protocol:02x}, src={ipstr(srcaddr)}, dest={ipstr(destAddr)}")
return ignore
@@ -207,10 +223,13 @@ class Tunnel:
"""Forward the provided IP packet into the mesh"""
nodeId = self._ipToNodeId(destAddr)
if nodeId is not None:
- logging.debug(f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
- self.iface.sendData(p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck = False)
+ logging.debug(
+ f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
+ self.iface.sendData(
+ p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck=False)
else:
- logging.warning(f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
+ logging.warning(
+ f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
def close(self):
self.tun.close()
@@ -279,7 +298,7 @@ forward to our slow network
def onTunnelReceive(packet, interface):
"""Callback for received tunneled messages from mesh
-
+
FIXME figure out how to do closures with methods in python"""
tunnelInstance.onReceive(packet)
@@ -318,7 +337,7 @@ subnet is used to construct our network number (normally 10.115.x.x)
class Tunnel:
"""A TUN based IP tunnel over meshtastic"""
-
+
def __init__(self, iface, subnet=None, netmask="255.255.0.0"):
"""
Constructor
@@ -336,7 +355,8 @@ subnet is used to construct our network number (normally 10.115.x.x)
global tunnelInstance
tunnelInstance = self
- logging.info("Starting IP to mesh tunnel (you must be root for this *pre-alpha* feature to work). Mesh members:")
+ logging.info(
+ "Starting IP to mesh tunnel (you must be root for this *pre-alpha* feature to work). Mesh members:")
pub.subscribe(onTunnelReceive, "meshtastic.receive.data.IP_TUNNEL_APP")
myAddr = self._nodeNumToIp(self.iface.myInfo.my_node_num)
@@ -344,24 +364,26 @@ subnet is used to construct our network number (normally 10.115.x.x)
for node in self.iface.nodes.values():
nodeId = node["user"]["id"]
ip = self._nodeNumToIp(node["num"])
- logging.info(f"Node { nodeId } has IP address { ip }")
+ logging.info(f"Node { nodeId } has IP address { ip }")
logging.debug("creating TUN device with MTU=200")
# FIXME - figure out real max MTU, it should be 240 - the overhead bytes for SubPacket and Data
from pytap2 import TapDevice
self.tun = TapDevice(name="mesh")
self.tun.up()
- self.tun.ifconfig(address=myAddr,netmask=netmask,mtu=200)
+ self.tun.ifconfig(address=myAddr, netmask=netmask, mtu=200)
logging.debug(f"starting TUN reader, our IP address is {myAddr}")
- self._rxThread = threading.Thread(target=self.__tunReader, args=(), daemon=True)
+ self._rxThread = threading.Thread(
+ target=self.__tunReader, args=(), daemon=True)
self._rxThread.start()
def onReceive(self, packet):
- p = packet["decoded"]["data"]["payload"]
+ p = packet["decoded"]["payload"]
if packet["from"] == self.iface.myInfo.my_node_num:
logging.debug("Ignoring message we sent")
else:
- logging.debug(f"Received mesh tunnel message type={type(p)} len={len(p)}")
+ logging.debug(
+ f"Received mesh tunnel message type={type(p)} len={len(p)}")
# we don't really need to check for filtering here (sender should have checked), but this provides
# useful debug printing on types of packets received
if not self._shouldFilterPacket(p):
@@ -373,36 +395,43 @@ subnet is used to construct our network number (normally 10.115.x.x)
srcaddr = p[12:16]
destAddr = p[16:20]
subheader = 20
- ignore = False # Assume we will be forwarding the packet
+ ignore = False # Assume we will be forwarding the packet
if protocol in protocolBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
- elif protocol == 0x01: # ICMP
+ logging.log(
+ LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
+ elif protocol == 0x01: # ICMP
icmpType = p[20]
icmpCode = p[21]
checksum = p[22:24]
- logging.debug(f"forwarding ICMP message src={ipstr(srcaddr)}, dest={ipstr(destAddr)}, type={icmpType}, code={icmpCode}, checksum={checksum}")
+ logging.debug(
+ f"forwarding ICMP message src={ipstr(srcaddr)}, dest={ipstr(destAddr)}, type={icmpType}, code={icmpCode}, checksum={checksum}")
# reply to pings (swap src and dest but keep rest of packet unchanged)
#pingback = p[:12]+p[16:20]+p[12:16]+p[20:]
- #tap.write(pingback)
- elif protocol == 0x11: # UDP
+ # tap.write(pingback)
+ elif protocol == 0x11: # UDP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in udpBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
+ logging.log(
+ LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
else:
- logging.debug(f"forwarding udp srcport={srcport}, destport={destport}")
- elif protocol == 0x06: # TCP
+ logging.debug(
+ f"forwarding udp srcport={srcport}, destport={destport}")
+ elif protocol == 0x06: # TCP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in tcpBlacklist:
ignore = True
- logging.log(LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
+ logging.log(
+ LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
else:
- logging.debug(f"forwarding tcp srcport={srcport}, destport={destport}")
+ logging.debug(
+ f"forwarding tcp srcport={srcport}, destport={destport}")
else:
- logging.warning(f"forwarding unexpected protocol 0x{protocol:02x}, src={ipstr(srcaddr)}, dest={ipstr(destAddr)}")
+ logging.warning(
+ f"forwarding unexpected protocol 0x{protocol:02x}, src={ipstr(srcaddr)}, dest={ipstr(destAddr)}")
return ignore
@@ -438,10 +467,13 @@ subnet is used to construct our network number (normally 10.115.x.x)
"""Forward the provided IP packet into the mesh"""
nodeId = self._ipToNodeId(destAddr)
if nodeId is not None:
- logging.debug(f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
- self.iface.sendData(p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck = False)
+ logging.debug(
+ f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
+ self.iface.sendData(
+ p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck=False)
else:
- logging.warning(f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
+ logging.warning(
+ f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
def close(self):
self.tun.close()
@@ -471,11 +503,12 @@ subnet is used to construct our network number (normally 10.115.x.x)
Expand source code
def onReceive(self, packet):
- p = packet["decoded"]["data"]["payload"]
+ p = packet["decoded"]["payload"]
if packet["from"] == self.iface.myInfo.my_node_num:
logging.debug("Ignoring message we sent")
else:
- logging.debug(f"Received mesh tunnel message type={type(p)} len={len(p)}")
+ logging.debug(
+ f"Received mesh tunnel message type={type(p)} len={len(p)}")
# we don't really need to check for filtering here (sender should have checked), but this provides
# useful debug printing on types of packets received
if not self._shouldFilterPacket(p):
@@ -495,10 +528,13 @@ subnet is used to construct our network number (normally 10.115.x.x)
"""Forward the provided IP packet into the mesh"""
nodeId = self._ipToNodeId(destAddr)
if nodeId is not None:
- logging.debug(f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
- self.iface.sendData(p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck = False)
+ logging.debug(
+ f"Forwarding packet bytelen={len(p)} dest={ipstr(destAddr)}, destNode={nodeId}")
+ self.iface.sendData(
+ p, nodeId, portnums_pb2.IP_TUNNEL_APP, wantAck=False)
else:
- logging.warning(f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
+ logging.warning(
+ f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
diff --git a/docs/meshtastic/util.html b/docs/meshtastic/util.html
index efc4a67..973bd30 100644
--- a/docs/meshtastic/util.html
+++ b/docs/meshtastic/util.html
@@ -34,6 +34,18 @@ import serial.tools.list_ports
blacklistVids = dict.fromkeys([0x1366])
+def fixme(message):
+ raise Exception(f"FIXME: {message}")
+
+
+def catchAndIgnore(reason, closure):
+ """Call a closure but if it throws an excpetion print it and continue"""
+ try:
+ closure()
+ except BaseException as ex:
+ logging.error(f"Exception thrown in {reason}: {ex}")
+
+
def findPorts():
"""Find all ports that might have meshtastic devices
@@ -61,6 +73,23 @@ class dotdict(dict):
Functions
+
+def catchAndIgnore(reason, closure)
+
+
+
Call a closure but if it throws an excpetion print it and continue
+
+
+Expand source code
+
+
def catchAndIgnore(reason, closure):
+ """Call a closure but if it throws an excpetion print it and continue"""
+ try:
+ closure()
+ except BaseException as ex:
+ logging.error(f"Exception thrown in {reason}: {ex}")
+
+
def findPorts()
@@ -85,6 +114,19 @@ class dotdict(dict):
return l
+