diff --git a/docs/meshtastic/index.html b/docs/meshtastic/index.html
index a59de7f..e44f687 100644
--- a/docs/meshtastic/index.html
+++ b/docs/meshtastic/index.html
@@ -32,7 +32,7 @@ the device.
Includes always up-to-date location and username information for each
node in the mesh.
This is a read-only datastructure.
-
myNodeInfo - You probably don't want this.
+
myNodeInfo - Contains read-only information about the local radio device (software version, hardware version, etc)
Published PubSub topics
We use a publish-subscribe model to communicate asynchronous events.
@@ -80,7 +80,7 @@ properties of StreamInterface:
the device.
- nodes - The database of received nodes. Includes always up-to-date location and username information for each
node in the mesh. This is a read-only datastructure.
-- myNodeInfo - You probably don't want this.
+- myNodeInfo - Contains read-only information about the local radio device (software version, hardware version, etc)
## Published PubSub topics
@@ -116,13 +116,14 @@ interface = meshtastic.StreamInterface() # By default will try to find a meshtas
import google.protobuf.json_format
import serial
-import serial.tools.list_ports
import threading
import logging
import sys
import traceback
from . import mesh_pb2
+from . import util
from pubsub import pub
+from dotmap import DotMap
START1 = 0x94
START2 = 0xc3
@@ -130,19 +131,26 @@ HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512
BROADCAST_ADDR = "all" # A special ID that means broadcast
-
+BROADCAST_NUM = 255
MY_CONFIG_ID = 42
class MeshInterface:
"""Interface class for meshtastic devices
+
+ Properties:
+
+ isConnected
+ nodes
+ debugOut
"""
def __init__(self, debugOut=None):
"""Constructor"""
self.debugOut = debugOut
self.nodes = None # FIXME
+ self.isConnected = False
self._startConfig()
def sendText(self, text, destinationId=BROADCAST_ADDR):
@@ -152,7 +160,7 @@ class MeshInterface:
text {string} -- The text to send
Keyword Arguments:
- destinationId {nodeId} -- where to send this message (default: {BROADCAST_ADDR})
+ destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
"""
self.sendData(text.encode("utf-8"), destinationId,
dataType=mesh_pb2.Data.CLEAR_TEXT)
@@ -169,13 +177,38 @@ class MeshInterface:
You probably don't want this - use sendData instead."""
toRadio = mesh_pb2.ToRadio()
# FIXME add support for non broadcast addresses
- meshPacket.to = 255
+
+ if isinstance(destinationId, int):
+ nodeNum = destinationId
+ elif nodeNum == BROADCAST_ADDR:
+ nodeNum = 255
+ else:
+ raise Exception(
+ "invalid destination addr, we don't yet support nodeid lookup")
+
+ meshPacket.to = nodeNum
toRadio.packet.CopyFrom(meshPacket)
self._sendToRadio(toRadio)
+ def writeConfig(self):
+ """Write the current (edited) radioConfig to the device"""
+ if self.radioConfig == None:
+ raise Exception("No RadioConfig has been read")
+
+ t = mesh_pb2.ToRadio()
+ t.set_radio.CopyFrom(self.radioConfig)
+ self._sendToRadio(t)
+
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
- pub.sendMessage("meshtastic.connection.lost")
+ self.isConnected = False
+ pub.sendMessage("meshtastic.connection.lost", interface=self)
+
+ def _connected(self):
+ """Called by this class to tell clients we are now fully connected to a node
+ """
+ self.isConnected = True
+ pub.sendMessage("meshtastic.connection.established", interface=self)
def _startConfig(self):
"""Start device packets flowing"""
@@ -211,12 +244,25 @@ class MeshInterface:
self.nodes[node.user.id] = node
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
- pub.sendMessage("meshtastic.connection.established")
+ self._connected()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
+ elif fromRadio.rebooted:
+ self._disconnected()
+ self._startConfig() # redownload the node db etc...
else:
logging.warn("Unexpected FromRadio payload")
+ def _nodeNumToId(self, num):
+ if num == BROADCAST_NUM:
+ return BROADCAST_ADDR
+
+ try:
+ return self._nodesByNum[num].user.id
+ except:
+ logging.error("Node not found for fromId")
+ return None
+
def _handlePacketFromRadio(self, meshPacket):
"""Handle a MeshPacket that just arrived from the radio
@@ -225,16 +271,29 @@ class MeshInterface:
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
"""
- # FIXME, update node DB as needed
- json = google.protobuf.json_format.MessageToDict(meshPacket)
+
+ asDict = google.protobuf.json_format.MessageToDict(meshPacket)
+ # /add fromId and toId fields based on the node ID
+ asDict["fromId"] = self._nodeNumToId(asDict["from"])
+ asDict["toId"] = self._nodeNumToId(asDict["to"])
+
+ # We could provide our objects as DotMaps - which work with . notation or as dictionaries
+ #asObj = DotMap(asDict)
+ topic = None
if meshPacket.payload.HasField("position"):
- pub.sendMessage("meshtastic.receive.position", packet=json)
+ topic = "meshtastic.receive.position"
+ # FIXME, update node DB as needed
if meshPacket.payload.HasField("user"):
- pub.sendMessage("meshtastic.receive.user",
- packet=json)
+ topic = "meshtastic.receive.user"
+ # FIXME, update node DB as needed
if meshPacket.payload.HasField("data"):
- pub.sendMessage("meshtastic.receive.data",
- packet=json)
+ topic = "meshtastic.receive.data"
+ # For text messages, we go ahead and decode the text to ascii for our users
+ # if asObj.payload.data.typ == "CLEAR_TEXT":
+ # asObj.payload.data.text = asObj.payload.data.payload.decode(
+ # "utf-8")
+
+ pub.sendMessage(topic, packet=asDict, interface=self)
class StreamInterface(MeshInterface):
@@ -254,17 +313,17 @@ class StreamInterface(MeshInterface):
"""
if devPath is None:
- ports = list(filter(lambda port: port.vid != None,
- serial.tools.list_ports.comports()))
+ ports = util.findPorts()
if len(ports) == 0:
raise Exception("No Meshtastic devices detected")
elif len(ports) > 1:
raise Exception(
f"Multiple ports detected, you must specify a device, such as {ports[0].device}")
else:
- devPath = ports[0].device
+ devPath = ports[0]
logging.debug(f"Connecting to {devPath}")
+ self.devPath = devPath
self._rxBuf = bytes() # empty
self._wantExit = False
self.stream = serial.Serial(
@@ -288,6 +347,8 @@ class StreamInterface(MeshInterface):
logging.debug("Closing serial stream")
# pyserial cancel_read doesn't seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
+ if self._rxThread != threading.current_thread():
+ self._rxThread.join() # wait for it to exit
def __reader(self):
"""The reader thread that reads bytes from our stream"""
@@ -327,10 +388,13 @@ class StreamInterface(MeshInterface):
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
- logging.warn(
+ logging.error(
f"Error handling FromRadio, possibly corrupted? {ex}")
traceback.print_exc()
self._rxBuf = empty
+ else:
+ # logging.debug(f"timeout on {self.devPath}")
+ pass
logging.debug("reader is exiting")
self.stream.close()
self._disconnected()
@@ -343,6 +407,14 @@ class StreamInterface(MeshInterface):
@@ -358,6 +430,10 @@ class StreamInterface(MeshInterface):
Interface class for meshtastic devices
+
Properties:
+
isConnected
+nodes
+debugOut
Constructor
@@ -365,12 +441,19 @@ class StreamInterface(MeshInterface):
class MeshInterface:
"""Interface class for meshtastic devices
+
+ Properties:
+
+ isConnected
+ nodes
+ debugOut
"""
def __init__(self, debugOut=None):
"""Constructor"""
self.debugOut = debugOut
self.nodes = None # FIXME
+ self.isConnected = False
self._startConfig()
def sendText(self, text, destinationId=BROADCAST_ADDR):
@@ -380,7 +463,7 @@ class StreamInterface(MeshInterface):
text {string} -- The text to send
Keyword Arguments:
- destinationId {nodeId} -- where to send this message (default: {BROADCAST_ADDR})
+ destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
"""
self.sendData(text.encode("utf-8"), destinationId,
dataType=mesh_pb2.Data.CLEAR_TEXT)
@@ -397,13 +480,38 @@ class StreamInterface(MeshInterface):
You probably don't want this - use sendData instead."""
toRadio = mesh_pb2.ToRadio()
# FIXME add support for non broadcast addresses
- meshPacket.to = 255
+
+ if isinstance(destinationId, int):
+ nodeNum = destinationId
+ elif nodeNum == BROADCAST_ADDR:
+ nodeNum = 255
+ else:
+ raise Exception(
+ "invalid destination addr, we don't yet support nodeid lookup")
+
+ meshPacket.to = nodeNum
toRadio.packet.CopyFrom(meshPacket)
self._sendToRadio(toRadio)
+ def writeConfig(self):
+ """Write the current (edited) radioConfig to the device"""
+ if self.radioConfig == None:
+ raise Exception("No RadioConfig has been read")
+
+ t = mesh_pb2.ToRadio()
+ t.set_radio.CopyFrom(self.radioConfig)
+ self._sendToRadio(t)
+
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
- pub.sendMessage("meshtastic.connection.lost")
+ self.isConnected = False
+ pub.sendMessage("meshtastic.connection.lost", interface=self)
+
+ def _connected(self):
+ """Called by this class to tell clients we are now fully connected to a node
+ """
+ self.isConnected = True
+ pub.sendMessage("meshtastic.connection.established", interface=self)
def _startConfig(self):
"""Start device packets flowing"""
@@ -439,12 +547,25 @@ class StreamInterface(MeshInterface):
self.nodes[node.user.id] = node
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
- pub.sendMessage("meshtastic.connection.established")
+ self._connected()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
+ elif fromRadio.rebooted:
+ self._disconnected()
+ self._startConfig() # redownload the node db etc...
else:
logging.warn("Unexpected FromRadio payload")
+ def _nodeNumToId(self, num):
+ if num == BROADCAST_NUM:
+ return BROADCAST_ADDR
+
+ try:
+ return self._nodesByNum[num].user.id
+ except:
+ logging.error("Node not found for fromId")
+ return None
+
def _handlePacketFromRadio(self, meshPacket):
"""Handle a MeshPacket that just arrived from the radio
@@ -453,16 +574,29 @@ class StreamInterface(MeshInterface):
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
"""
- # FIXME, update node DB as needed
- json = google.protobuf.json_format.MessageToDict(meshPacket)
+
+ asDict = google.protobuf.json_format.MessageToDict(meshPacket)
+ # /add fromId and toId fields based on the node ID
+ asDict["fromId"] = self._nodeNumToId(asDict["from"])
+ asDict["toId"] = self._nodeNumToId(asDict["to"])
+
+ # We could provide our objects as DotMaps - which work with . notation or as dictionaries
+ #asObj = DotMap(asDict)
+ topic = None
if meshPacket.payload.HasField("position"):
- pub.sendMessage("meshtastic.receive.position", packet=json)
+ topic = "meshtastic.receive.position"
+ # FIXME, update node DB as needed
if meshPacket.payload.HasField("user"):
- pub.sendMessage("meshtastic.receive.user",
- packet=json)
+ topic = "meshtastic.receive.user"
+ # FIXME, update node DB as needed
if meshPacket.payload.HasField("data"):
- pub.sendMessage("meshtastic.receive.data",
- packet=json)
+ topic = "meshtastic.receive.data"
+ # For text messages, we go ahead and decode the text to ascii for our users
+ # if asObj.payload.data.typ == "CLEAR_TEXT":
+ # asObj.payload.data.text = asObj.payload.data.payload.decode(
+ # "utf-8")
+
+ pub.sendMessage(topic, packet=asDict, interface=self)
Subclasses
@@ -502,7 +636,16 @@ You probably don't want this - use sendData instead.
You probably don't want this - use sendData instead."""
toRadio = mesh_pb2.ToRadio()
# FIXME add support for non broadcast addresses
- meshPacket.to = 255
+
+ if isinstance(destinationId, int):
+ nodeNum = destinationId
+ elif nodeNum == BROADCAST_ADDR:
+ nodeNum = 255
+ else:
+ raise Exception(
+ "invalid destination addr, we don't yet support nodeid lookup")
+
+ meshPacket.to = nodeNum
toRadio.packet.CopyFrom(meshPacket)
self._sendToRadio(toRadio)
@@ -515,7 +658,7 @@ You probably don't want this - use sendData instead.
Arguments
text {string} – The text to send
Keyword Arguments:
-destinationId {nodeId} – where to send this message (default: {BROADCAST_ADDR})
+destinationId {nodeId or nodeNum} – where to send this message (default: {BROADCAST_ADDR})
Expand source code
@@ -527,12 +670,31 @@ destinationId {nodeId} – where to send this message (default: {BROADCAST_A
text {string} -- The text to send
Keyword Arguments:
- destinationId {nodeId} -- where to send this message (default: {BROADCAST_ADDR})
+ destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
"""
self.sendData(text.encode("utf-8"), destinationId,
dataType=mesh_pb2.Data.CLEAR_TEXT)
+
+def writeConfig(self)
+
+
+
Write the current (edited) radioConfig to the device
+
+
+Expand source code
+
+
def writeConfig(self):
+ """Write the current (edited) radioConfig to the device"""
+ if self.radioConfig == None:
+ raise Exception("No RadioConfig has been read")
+
+ t = mesh_pb2.ToRadio()
+ t.set_radio.CopyFrom(self.radioConfig)
+ self._sendToRadio(t)
+
+
@@ -574,17 +736,17 @@ debugOut {stream} – If a stream is provided, any debug serial output from
"""
if devPath is None:
- ports = list(filter(lambda port: port.vid != None,
- serial.tools.list_ports.comports()))
+ ports = util.findPorts()
if len(ports) == 0:
raise Exception("No Meshtastic devices detected")
elif len(ports) > 1:
raise Exception(
f"Multiple ports detected, you must specify a device, such as {ports[0].device}")
else:
- devPath = ports[0].device
+ devPath = ports[0]
logging.debug(f"Connecting to {devPath}")
+ self.devPath = devPath
self._rxBuf = bytes() # empty
self._wantExit = False
self.stream = serial.Serial(
@@ -608,6 +770,8 @@ debugOut {stream} – If a stream is provided, any debug serial output from
logging.debug("Closing serial stream")
# pyserial cancel_read doesn't seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
+ if self._rxThread != threading.current_thread():
+ self._rxThread.join() # wait for it to exit
def __reader(self):
"""The reader thread that reads bytes from our stream"""
@@ -647,10 +811,13 @@ debugOut {stream} – If a stream is provided, any debug serial output from
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
- logging.warn(
+ logging.error(
f"Error handling FromRadio, possibly corrupted? {ex}")
traceback.print_exc()
self._rxBuf = empty
+ else:
+ # logging.debug(f"timeout on {self.devPath}")
+ pass
logging.debug("reader is exiting")
self.stream.close()
self._disconnected()
@@ -674,7 +841,9 @@ debugOut {stream} – If a stream is provided, any debug serial output from
"""Close a connection to the device"""
logging.debug("Closing serial stream")
# pyserial cancel_read doesn't seem to work, therefore we ask the reader thread to close things for us
- self._wantExit = True
+ self._wantExit = True
+ if self._rxThread != threading.current_thread():
+ self._rxThread.join() # wait for it to exit
@@ -685,6 +854,7 @@ debugOut {stream} – If a stream is provided, any debug serial output from
import logging
+from . import util
+from . import StreamInterface
+from pubsub import pub
+import time
+import sys
+import threading
+from dotmap import DotMap
+
+"""The interfaces we are using for our tests"""
+interfaces = None
+
+"""A list of all packets we received while the current test was running"""
+receivedPackets = None
+
+testsRunning = False
+
+testNumber = 0
+
+
+def onReceive(packet, interface):
+ """Callback invoked when a packet arrives"""
+ print(f"From {interface.devPath}: {packet}")
+ p = DotMap(packet)
+
+ if p.payload.data.typ == "CLEAR_TEXT":
+ # We only care a about clear text packets
+ receivedPackets.append(p)
+
+
+def onNode(node):
+ """Callback invoked when the node DB changes"""
+ print(f"Node changed: {node}")
+
+
+def subscribe():
+ """Subscribe to the topics the user probably wants to see, prints output to stdout"""
+
+ pub.subscribe(onNode, "meshtastic.node")
+
+
+def testSend(fromInterface, toInterface):
+ """
+ Sends one test packet between two nodes and then returns success or failure
+
+ Arguments:
+ fromInterface {[type]} -- [description]
+ toInterface {[type]} -- [description]
+
+ Returns:
+ boolean -- True for success
+ """
+ global receivedPackets
+ receivedPackets = []
+ fromNode = fromInterface.myInfo.my_node_num
+ toNode = toInterface.myInfo.my_node_num
+
+ # FIXME, hack to test broadcast
+ toNode = 255
+
+ logging.info(f"Sending test packet from {fromNode} to {toNode}")
+ fromInterface.sendText(f"Test {testNumber}", toNode)
+ time.sleep(40)
+ return (len(receivedPackets) >= 1)
+
+
+def testThread():
+ logging.info("Found devices, starting tests...")
+ numFail = 0
+ numSuccess = 0
+ while True:
+ global testNumber
+ testNumber = testNumber + 1
+ success = testSend(interfaces[0], interfaces[1])
+ if not success:
+ numFail = numFail + 1
+ logging.error(
+ f"Test failed, expected packet not received ({numFail} failures so far)")
+ else:
+ numSuccess = numSuccess + 1
+ logging.info(f"Test succeeded ({numSuccess} successes so far)")
+
+ if numFail >= 3:
+ for i in interfaces:
+ i.close()
+ return
+
+ time.sleep(1)
+
+
+def onConnection(topic=pub.AUTO_TOPIC):
+ """Callback invoked when we connect/disconnect from a radio"""
+ print(f"Connection changed: {topic.getName()}")
+
+ global testsRunning
+ if (all(iface.isConnected for iface in interfaces) and not testsRunning):
+ testsRunning = True
+ t = threading.Thread(target=testThread, args=())
+ t.start()
+
+
+def openDebugLog(portName):
+ debugname = "log" + portName.replace("/", "_")
+ logging.info(f"Writing serial debugging to {debugname}")
+ return open(debugname, 'w+', buffering=1)
+
+
+def testAll():
+ """
+ Run a series of tests using devices we can find.
+
+ Raises:
+ Exception: If not enough devices are found
+ """
+ ports = util.findPorts()
+ if (len(ports) < 2):
+ raise Exception("Must have at least two devices connected to USB")
+
+ pub.subscribe(onConnection, "meshtastic.connection")
+ pub.subscribe(onReceive, "meshtastic.receive")
+ global interfaces
+ interfaces = list(map(lambda port: StreamInterface(
+ port, debugOut=openDebugLog(port)), ports))
+ logging.info("Ports opened, waiting for device to complete connection")
+
+
+
+
+
+
Global variables
+
+
var interfaces
+
+
A list of all packets we received while the current test was running
Callback invoked when we connect/disconnect from a radio
+
+
+Expand source code
+
+
def onConnection(topic=pub.AUTO_TOPIC):
+ """Callback invoked when we connect/disconnect from a radio"""
+ print(f"Connection changed: {topic.getName()}")
+
+ global testsRunning
+ if (all(iface.isConnected for iface in interfaces) and not testsRunning):
+ testsRunning = True
+ t = threading.Thread(target=testThread, args=())
+ t.start()
+
+
+
+def onNode(node)
+
+
+
Callback invoked when the node DB changes
+
+
+Expand source code
+
+
def onNode(node):
+ """Callback invoked when the node DB changes"""
+ print(f"Node changed: {node}")
+
+
+
+def onReceive(packet, interface)
+
+
+
Callback invoked when a packet arrives
+
+
+Expand source code
+
+
def onReceive(packet, interface):
+ """Callback invoked when a packet arrives"""
+ print(f"From {interface.devPath}: {packet}")
+ p = DotMap(packet)
+
+ if p.payload.data.typ == "CLEAR_TEXT":
+ # We only care a about clear text packets
+ receivedPackets.append(p)
+
+
+
+def openDebugLog(portName)
+
+
+
+
+
+Expand source code
+
+
def openDebugLog(portName):
+ debugname = "log" + portName.replace("/", "_")
+ logging.info(f"Writing serial debugging to {debugname}")
+ return open(debugname, 'w+', buffering=1)
+
+
+
+def subscribe()
+
+
+
Subscribe to the topics the user probably wants to see, prints output to stdout
+
+
+Expand source code
+
+
def subscribe():
+ """Subscribe to the topics the user probably wants to see, prints output to stdout"""
+
+ pub.subscribe(onNode, "meshtastic.node")
+
+
+
+def testAll()
+
+
+
Run a series of tests using devices we can find.
+
Raises
+
+
Exception
+
If not enough devices are found
+
+
+
+Expand source code
+
+
def testAll():
+ """
+ Run a series of tests using devices we can find.
+
+ Raises:
+ Exception: If not enough devices are found
+ """
+ ports = util.findPorts()
+ if (len(ports) < 2):
+ raise Exception("Must have at least two devices connected to USB")
+
+ pub.subscribe(onConnection, "meshtastic.connection")
+ pub.subscribe(onReceive, "meshtastic.receive")
+ global interfaces
+ interfaces = list(map(lambda port: StreamInterface(
+ port, debugOut=openDebugLog(port)), ports))
+ logging.info("Ports opened, waiting for device to complete connection")
+
+
+
+def testSend(fromInterface, toInterface)
+
+
+
Sends one test packet between two nodes and then returns success or failure
def testSend(fromInterface, toInterface):
+ """
+ Sends one test packet between two nodes and then returns success or failure
+
+ Arguments:
+ fromInterface {[type]} -- [description]
+ toInterface {[type]} -- [description]
+
+ Returns:
+ boolean -- True for success
+ """
+ global receivedPackets
+ receivedPackets = []
+ fromNode = fromInterface.myInfo.my_node_num
+ toNode = toInterface.myInfo.my_node_num
+
+ # FIXME, hack to test broadcast
+ toNode = 255
+
+ logging.info(f"Sending test packet from {fromNode} to {toNode}")
+ fromInterface.sendText(f"Test {testNumber}", toNode)
+ time.sleep(40)
+ return (len(receivedPackets) >= 1)
+
+
+
+def testThread()
+
+
+
+
+
+Expand source code
+
+
def testThread():
+ logging.info("Found devices, starting tests...")
+ numFail = 0
+ numSuccess = 0
+ while True:
+ global testNumber
+ testNumber = testNumber + 1
+ success = testSend(interfaces[0], interfaces[1])
+ if not success:
+ numFail = numFail + 1
+ logging.error(
+ f"Test failed, expected packet not received ({numFail} failures so far)")
+ else:
+ numSuccess = numSuccess + 1
+ logging.info(f"Test succeeded ({numSuccess} successes so far)")
+
+ if numFail >= 3:
+ for i in interfaces:
+ i.close()
+ return
+
+ time.sleep(1)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/meshtastic/util.html b/docs/meshtastic/util.html
new file mode 100644
index 0000000..78fcc66
--- /dev/null
+++ b/docs/meshtastic/util.html
@@ -0,0 +1,147 @@
+
+
+
+
+
+
+meshtastic.util API documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
Module meshtastic.util
+
+
+
+
+Expand source code
+
+
from collections import defaultdict
+import serial
+import serial.tools.list_ports
+
+
+def findPorts():
+ """Find all ports that might have meshtastic devices
+
+ Returns:
+ list -- a list of device paths
+ """
+ l = list(map(lambda port: port.device,
+ filter(lambda port: port.vid != None,
+ serial.tools.list_ports.comports())))
+ l.sort()
+ return l
+
+
+class dotdict(dict):
+ """dot.notation access to dictionary attributes"""
+ __getattr__ = dict.get
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__
+
+
+
+
+
+
+
+
Functions
+
+
+def findPorts()
+
+
+
Find all ports that might have meshtastic devices
+
Returns
+
+
list -- a list of device paths
+
+
+
+
+Expand source code
+
+
def findPorts():
+ """Find all ports that might have meshtastic devices
+
+ Returns:
+ list -- a list of device paths
+ """
+ l = list(map(lambda port: port.device,
+ filter(lambda port: port.vid != None,
+ serial.tools.list_ports.comports())))
+ l.sort()
+ return l
+
+
+
+
+
+
Classes
+
+
+class dotdict
+(...)
+
+
+
dot.notation access to dictionary attributes
+
+
+Expand source code
+
+
class dotdict(dict):
+ """dot.notation access to dictionary attributes"""
+ __getattr__ = dict.get
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__