This commit is contained in:
Jm Casler
2021-12-07 13:06:51 -08:00
21 changed files with 1162 additions and 1008 deletions

View File

@@ -16,11 +16,18 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: 3.9
- name: Uninstall meshtastic
run: |
pip3 uninstall meshtastic
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install .
pip3 install -r requirements.txt
- name: Install meshtastic from local
run: |
pip3 install .
which meshtastic
meshtastic --version
- name: Run pylint
run: pylint --exit-zero meshtastic
- name: Run tests with pytest

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@ nanopb-0.4.4
.*swp
.coverage
*.py-E
venv/

View File

@@ -7,7 +7,7 @@
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admin_pb2.py,radioconfig_pb2.py,deviceonly_pb2.py,apponly_pb2.py,remote_hardware_pb2.py,portnums_pb2.py,mesh_pb2.py
ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admin_pb2.py,radioconfig_pb2.py,deviceonly_pb2.py,apponly_pb2.py,remote_hardware_pb2.py,portnums_pb2.py,mesh_pb2.py,storeforward_pb2.py

View File

@@ -187,12 +187,45 @@ pip3 install .
pytest
```
Possible options for testing:
* For more verbosity, add "-v" or even "-vv" like this: pytest -vv
* To run just unit tests: pytest -munit
* To run just integration tests: pytest -mint
* To run a smoke test with only one device connected serially: pytest -msmoke1
CAUTION: Running smoke1 will reset values on the device.
Be sure to hit the button on the device to reset after the test is run.
* To run a specific test: pytest -msmoke1 meshtastic/test/test_smoke1.py::test_smoke1_info
* To add another classification of tests, then look in pytest.ini
* To see the unit test code coverage: pytest --cov=meshtastic
* For more verbosity, add "-v" or even "-vv" like this:
```
pytest -vv
```
* To run just unit tests:
```
pytest
# or (more verbosely)
pytest -m unit
```
* To run just integration tests:
```
pytest -m int
```
* To run the smoke test with only one device connected serially:
```
pytest -m smoke1
```
CAUTION: Running smoke1 will reset values on the device, including the region to 1 (US).
Be sure to hit the reset button on the device after the test is completed.
* To run a specific test:
```
pytest -msmoke1 meshtastic/test/test_smoke1.py::test_smoke1_info
```
* To add another classification of tests such as "unit" or "smoke1", see [pytest.ini](pytest.ini).
* To see the unit test code coverage:
```
pytest --cov=meshtastic
```

View File

@@ -80,14 +80,6 @@ from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout
from .node import Node
from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util
START1 = 0x94
START2 = 0xc3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512
defaultHopLimit = 3
"""A special ID that means broadcast"""
BROADCAST_ADDR = "^all"
"""A special ID that means the local node"""
LOCAL_ADDR = "^local"
@@ -95,6 +87,10 @@ LOCAL_ADDR = "^local"
# if using 8 bit nodenums this will be shortend on the target
BROADCAST_NUM = 0xffffffff
"""A special ID that means broadcast"""
BROADCAST_ADDR = "^all"
"""The numeric buildnumber (shared with android apps) specifying the level of device code we are guaranteed to understand
format is Mmmss (where M is 1+the numeric major number. i.e. 20120 means 1.1.20
@@ -121,903 +117,6 @@ class KnownProtocol(NamedTuple):
onReceive: Callable = None
class MeshInterface:
"""Interface class for meshtastic devices
Properties:
isConnected
nodes
debugOut
"""
def __init__(self, debugOut=None, noProto=False):
"""Constructor
Keyword Arguments:
noProto -- If True, don't try to run our protocol on the link - just be a dumb serial client.
"""
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.localNode = Node(self, -1) # We fixup nodenum later
self.myInfo = None # We don't have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we've encountered a fatal exception it will be kept here
self._timeout = Timeout()
self.heartbeatTimer = None
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
def close(self):
"""Shutdown this interface"""
if self.heartbeatTimer:
self.heartbeatTimer.cancel()
self._sendDisconnect()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
def showInfo(self, file=sys.stdout):
"""Show human readable summary about this object"""
owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
myinfo = f"\nMy info: {stripnl(MessageToJson(self.myInfo))}"
mesh = "\nNodes in mesh:"
nodes = ""
for n in self.nodes.values():
nodes = nodes + f" {stripnl(n)}"
infos = owner + myinfo + mesh + nodes
print(infos)
return infos
def showNodes(self, includeSelf=True, file=sys.stdout):
"""Show table summary of nodes in mesh"""
def formatFloat(value, precision=2, unit=''):
return f'{value:.{precision}f}{unit}' if value else None
def getLH(ts):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else None
def getTimeAgo(ts):
return timeago.format(datetime.fromtimestamp(ts), datetime.now()) if ts else None
rows = []
for node in self.nodes.values():
if not includeSelf and node['num'] == self.localNode.nodeNum:
continue
row = {"N": 0}
user = node.get('user')
if user:
row.update({
"User": user['longName'],
"AKA": user['shortName'],
"ID": user['id'],
})
pos = node.get('position')
if pos:
row.update({
"Latitude": formatFloat(pos.get("latitude"), 4, "°"),
"Longitude": formatFloat(pos.get("longitude"), 4, "°"),
"Altitude": formatFloat(pos.get("altitude"), 0, " m"),
"Battery": formatFloat(pos.get("batteryLevel"), 2, "%"),
})
row.update({
"SNR": formatFloat(node.get("snr"), 2, " dB"),
"LastHeard": getLH(node.get("lastHeard")),
"Since": getTimeAgo(node.get("lastHeard")),
})
rows.append(row)
# Why doesn't this way work?
#rows.sort(key=lambda r: r.get('LastHeard', '0000'), reverse=True)
rows.sort(key=lambda r: r.get('LastHeard') or '0000', reverse=True)
for i, row in enumerate(rows):
row['N'] = i+1
table = tabulate(rows, headers='keys', missingval='N/A',
tablefmt='fancy_grid')
print(table)
return table
def getNode(self, nodeId):
"""Return a node object which contains device settings and channel info"""
if nodeId == LOCAL_ADDR:
return self.localNode
else:
n = Node(self, nodeId)
n.requestConfig()
if not n.waitForConfig():
raise Exception("Timed out waiting for node config")
return n
def sendText(self, text: AnyStr,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None,
channelIndex=0):
"""Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
return self.sendData(text.encode("utf-8"), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
wantResponse=wantResponse,
hopLimit=hopLimit,
onResponse=onResponse,
channelIndex=channelIndex)
def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None,
channelIndex=0):
"""Send a data packet to some other node
Keyword Arguments:
data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives
(or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
if getattr(data, "SerializeToString", None):
logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
data = data.SerializeToString()
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception("Data payload too big")
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
raise Exception("A non-zero port number must be specified")
meshPacket = mesh_pb2.MeshPacket()
meshPacket.channel = channelIndex
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
"""
Send a position packet to some other node (normally a broadcast)
Also, the device software will notice this packet and use it to automatically set its notion of
the local position.
If timeSec is not specified (recommended), we will use the local machine time.
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
p = mesh_pb2.Position()
if latitude != 0.0:
p.latitude_i = int(latitude / 1e-7)
if longitude != 0.0:
p.longitude_i = int(longitude / 1e-7)
if altitude != 0:
p.altitude = int(altitude)
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
wantAck=wantAck,
wantResponse=wantResponse)
def _addResponseHandler(self, requestId, callback):
self.responseHandlers[requestId] = ResponseHandler(callback)
def _sendPacket(self, meshPacket,
destinationId=BROADCAST_ADDR,
wantAck=False, hopLimit=defaultHopLimit):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
Returns the sent packet. The id field will be populated in this packet and
can be used to track future message acks/naks.
"""
# We allow users to talk to the local node before we've completed the full connection flow...
if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
self._waitConnected()
toRadio = mesh_pb2.ToRadio()
if destinationId is None:
raise Exception("destinationId must not be None")
elif isinstance(destinationId, int):
nodeNum = destinationId
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
elif destinationId == LOCAL_ADDR:
nodeNum = self.myInfo.my_node_num
# A simple hex style nodeid - we can parse this without needing the DB
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
else:
node = self.nodes.get(destinationId)
if not node:
raise Exception(f"NodeId {destinationId} not found in DB")
nodeNum = node['num']
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
meshPacket.hop_limit = hopLimit
# if the user hasn't set an ID for this packet (likely and recommended), we should pick a new unique ID
# so the message can be tracked.
if meshPacket.id == 0:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
return meshPacket
def waitForConfig(self):
"""Block until radio config is received. Returns True if config has been received."""
success = self._timeout.waitForSet(self, attrs=('myInfo', 'nodes')
) and self.localNode.waitForConfig()
if not success:
raise Exception("Timed out waiting for interface config")
def getMyNodeInfo(self):
"""Get info about my node."""
if self.myInfo is None:
return None
return self.nodesByNum.get(self.myInfo.my_node_num)
def getMyUser(self):
"""Get user"""
nodeInfo = self.getMyNodeInfo()
if nodeInfo is not None:
return nodeInfo.get('user')
return None
def getLongName(self):
"""Get long name"""
user = self.getMyUser()
if user is not None:
return user.get('longName', None)
return None
def getShortName(self):
"""Get short name"""
user = self.getMyUser()
if user is not None:
return user.get('shortName', None)
return None
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.isConnected.wait(10.0): # timeout after 10 seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
if self.failure:
raise self.failure
def _generatePacketId(self):
"""Get a new unique packet ID"""
if self.currentPacketId is None:
raise Exception("Not connected yet, can not generate packet")
else:
self.currentPacketId = (self.currentPacketId + 1) & 0xffffffff
return self.currentPacketId
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.lost", interface=self))
def _startHeartbeat(self):
"""We need to send a heartbeat message to the device every X seconds"""
def callback():
self.heartbeatTimer = None
prefs = self.localNode.radioConfig.preferences
i = prefs.phone_timeout_secs / 2
logging.debug(f"Sending heartbeat, interval {i}")
if i != 0:
self.heartbeatTimer = threading.Timer(i, callback)
self.heartbeatTimer.start()
p = mesh_pb2.ToRadio()
self._sendToRadio(p)
callback() # run our periodic callback now, it will make another timer if necessary
def _connected(self):
"""Called by this class to tell clients we are now fully connected to a node
"""
# (because I'm lazy) _connected might be called when remote Node
# objects complete their config reads, don't generate redundant isConnected
# for the local interface
if not self.isConnected.is_set():
self.isConnected.set()
self._startHeartbeat()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.established", interface=self))
def _startConfig(self):
"""Start device packets flowing"""
self.myInfo = None
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
startConfig = mesh_pb2.ToRadio()
self.configId = random.randint(0, 0xffffffff)
startConfig.want_config_id = self.configId
self._sendToRadio(startConfig)
def _sendDisconnect(self):
"""Tell device we are done using it"""
m = mesh_pb2.ToRadio()
m.disconnect = True
self._sendToRadio(m)
def _sendToRadio(self, toRadio):
"""Send a ToRadio protobuf to the device"""
if self.noProto:
logging.warn(
f"Not sending packet because protocol use is disabled by noProto")
else:
#logging.debug(f"Sending toRadio: {stripnl(toRadio)}")
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.error(f"Subclass must provide toradio: {toRadio}")
def _handleConfigComplete(self):
"""
Done with initial config messages, now send regular MeshPackets to ask for settings and channels
"""
self.localNode.requestConfig()
def _handleFromRadio(self, fromRadioBytes):
"""
Handle a packet that arrived from the radio(update model and publish events)
Called by subclasses."""
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
#logging.debug(f"Received from radio: {fromRadio}")
if fromRadio.HasField("my_info"):
self.myInfo = fromRadio.my_info
self.localNode.nodeNum = self.myInfo.my_node_num
logging.debug(f"Received myinfo: {stripnl(fromRadio.my_info)}")
failmsg = None
# Check for app too old
if self.myInfo.min_app_version > OUR_APP_VERSION:
failmsg = "This device needs a newer python client, please \"pip install --upgrade meshtastic\". For more information see https://tinyurl.com/5bjsxu32"
# check for firmware too old
if self.myInfo.max_channels == 0:
failmsg = "This version of meshtastic-python requires device firmware version 1.2 or later. For more information see https://tinyurl.com/5bjsxu32"
if failmsg:
self.failure = Exception(failmsg)
self.isConnected.set() # let waitConnected return this exception
self.close()
elif fromRadio.HasField("node_info"):
node = asDict["nodeInfo"]
try:
self._fixupPosition(node["position"])
except:
logging.debug("Node without position")
logging.debug(f"Received nodeinfo: {node}")
self.nodesByNum[node["num"]] = node
if "user" in node: # Some nodes might not have user/ids assigned yet
self.nodes[node["user"]["id"]] = node
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.node.updated",
node=node, interface=self))
elif fromRadio.config_complete_id == self.configId:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
logging.debug(f"Config complete ID {self.configId}")
self._handleConfigComplete()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
# Tell clients the device went away. Careful not to call the overridden subclass version that closes the serial port
MeshInterface._disconnected(self)
self._startConfig() # redownload the node db etc...
else:
logging.debug("Unexpected FromRadio payload")
def _fixupPosition(self, position):
"""Convert integer lat/lon into floats
Arguments:
position {Position dictionary} -- object ot fix up
"""
if "latitudeI" in position:
position["latitude"] = position["latitudeI"] * 1e-7
if "longitudeI" in position:
position["longitude"] = position["longitudeI"] * 1e-7
def _nodeNumToId(self, num):
"""Map a node node number to a node ID
Arguments:
num {int} -- Node number
Returns:
string -- Node ID
"""
if num == BROADCAST_NUM:
return BROADCAST_ADDR
try:
return self.nodesByNum[num]["user"]["id"]
except:
logging.debug(f"Node {num} not found for fromId")
return None
def _getOrCreateByNum(self, nodeNum):
"""Given a nodenum find the NodeInfo in the DB (or create if necessary)"""
if nodeNum == BROADCAST_NUM:
raise Exception("Can not create/find nodenum by the broadcast num")
if nodeNum in self.nodesByNum:
return self.nodesByNum[nodeNum]
else:
n = {"num": nodeNum} # Create a minimial node db entry
self.nodesByNum[nodeNum] = n
return n
def _handlePacketFromRadio(self, meshPacket):
"""Handle a MeshPacket that just arrived from the radio
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
"""
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
# We normally decompose the payload into a dictionary so that the client
# doesn't need to understand protobufs. But advanced clients might
# want the raw protobuf, so we provide it in "raw"
asDict["raw"] = meshPacket
# from might be missing if the nodenum was zero.
if not "from" in asDict:
asDict["from"] = 0
logging.error(
f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
return
if not "to" in asDict:
asDict["to"] = 0
# /add fromId and toId fields based on the node ID
try:
asDict["fromId"] = self._nodeNumToId(asDict["from"])
except Exception as ex:
logging.warn(f"Not populating fromId {ex}")
try:
asDict["toId"] = self._nodeNumToId(asDict["to"])
except Exception as ex:
logging.warn(f"Not populating toId {ex}")
# We could provide our objects as DotMaps - which work with . notation or as dictionaries
# asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type
decoded = asDict["decoded"]
# The default MessageToDict converts byte arrays into base64 strings.
# We don't want that - it messes up data payload. So slam in the correct
# byte array.
decoded["payload"] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
# to make API usage easier, set it to prevent confusion
if not "portnum" in decoded:
decoded["portnum"] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
portnum = decoded["portnum"]
topic = f"meshtastic.receive.data.{portnum}"
# decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
# move the following into a 'decoders' API that clients could register?
portNumInt = meshPacket.decoded.portnum # we want portnum as an int
handler = protocols.get(portNumInt)
# The decoded protobuf as a dictionary (if we understand this message)
p = None
if handler is not None:
topic = f"meshtastic.receive.{handler.name}"
# Convert to protobuf if possible
if handler.protobufFactory is not None:
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict["decoded"][handler.name] = p
# Also provide the protobuf raw
asDict["decoded"][handler.name]["raw"] = pb
# Call specialized onReceive if necessary
if handler.onReceive is not None:
handler.onReceive(self, asDict)
# Is this message in response to a request, if so, look for a handler
requestId = decoded.get("requestId")
if requestId is not None:
# We ignore ACK packets, but send NAKs and data responses to the handlers
routing = decoded.get("routing")
isAck = routing is not None and ("errorReason" not in routing)
if not isAck:
# we keep the responseHandler in dict until we get a non ack
handler = self.responseHandlers.pop(requestId, None)
if handler is not None:
handler.callback(asDict)
logging.debug(f"Publishing {topic}: packet={stripnl(asDict)} ")
publishingThread.queueWork(lambda: pub.sendMessage(
topic, packet=asDict, interface=self))
# Our standard BLE characteristics
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
FROMRADIO_UUID = "8ba2bcc2-ee02-4a55-a531-c525c5e454d5"
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
class BLEInterface(MeshInterface):
"""A not quite ready - FIXME - BLE interface to devices"""
def __init__(self, address, debugOut=None):
self.address = address
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f"Connecting to {self.address}")
self.device = self.adapter.connect(address)
logging.debug("Connected to device")
# fromradio = self.device.char_read(FROMRADIO_UUID)
MeshInterface.__init__(self, debugOut=debugOut)
self._readFromRadio() # read the initial responses
def handle_data(handle, data):
self._handleFromRadio(data)
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
#logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
self.device.char_write(TORADIO_UUID, b)
def close(self):
MeshInterface.close(self)
self.adapter.stop()
def _readFromRadio(self):
wasEmpty = False
while not wasEmpty:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)
class StreamInterface(MeshInterface):
"""Interface class for meshtastic devices over a stream link (serial, TCP, etc)"""
def __init__(self, debugOut=None, noProto=False, connectNow=True):
"""Constructor, opens a connection to self.stream
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
Raises:
Exception: [description]
Exception: [description]
"""
if not hasattr(self, 'stream'):
raise Exception(
"StreamInterface is now abstract (to update existing code create SerialInterface instead)")
self._rxBuf = bytes() # empty
self._wantExit = False
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(
target=self.__reader, args=(), daemon=True)
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
if not noProto:
self.waitForConfig()
def connect(self):
"""Connect to our radio
Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.
"""
# Send some bogus UART characters to force a sleeping device to wake, and if the reading statemachine was parsing a bad packet make sure
# we write enought start bytes to force it to resync (we don't use START1 because we want to ensure it is looking for START1)
p = bytearray([START2] * 32)
self._writeBytes(p)
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
self._startConfig()
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
"""We override the superclass implementation to close our port"""
MeshInterface._disconnected(self)
logging.debug("Closing our port")
if not self.stream is None:
self.stream.close()
self.stream = None
def _writeBytes(self, b):
"""Write an array of bytes to our stream and flush"""
if self.stream: # ignore writes when stream is closed
self.stream.write(b)
self.stream.flush()
def _readBytes(self, len):
"""Read an array of bytes from our stream"""
return self.stream.read(len)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
bufLen = len(b)
# We convert into a string, because the TCP code doesn't work with byte arrays
header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
self._writeBytes(header + b)
def close(self):
"""Close a connection to the device"""
logging.debug("Closing stream")
MeshInterface.close(self)
# pyserial cancel_read doesn't seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
def __reader(self):
"""The reader thread that reads bytes from our stream"""
empty = bytes()
try:
while not self._wantExit:
# logging.debug("reading character")
b = self._readBytes(1)
# logging.debug("In reader loop")
# logging.debug(f"read returned {b}")
if len(b) > 0:
c = b[0]
ptr = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b
if ptr == 0: # looking for START1
if c != START1:
self._rxBuf = empty # failed to find start
if self.debugOut != None:
try:
self.debugOut.write(b.decode("utf-8"))
except:
self.debugOut.write('?')
elif ptr == 1: # looking for START2
if c != START2:
self._rxBuf = empty # failed to find start2
elif ptr >= HEADER_LEN - 1: # we've at least got a header
# big endian length follos header
packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3]
if ptr == HEADER_LEN - 1: # we _just_ finished reading the header, validate length
if packetlen > MAX_TO_FROM_RADIO_SIZE:
self._rxBuf = empty # length ws out out bounds, restart
if len(self._rxBuf) != 0 and ptr + 1 >= packetlen + HEADER_LEN:
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
logging.error(
f"Error while handling message from radio {ex}")
traceback.print_exc()
self._rxBuf = empty
else:
# logging.debug(f"timeout")
pass
except serial.SerialException as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.warn(
f"Meshtastic serial port disconnected, disconnecting... {ex}")
except OSError as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.error(
f"Unexpected OSError, terminating meshtastic reader... {ex}")
except Exception as ex:
logging.error(
f"Unexpected exception, terminating meshtastic reader... {ex}")
finally:
logging.debug("reader is exiting")
self._disconnected()
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
"""Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
"""
if devPath is None:
ports = util.findPorts()
if len(ports) == 0:
raise Exception("No Meshtastic devices detected")
elif len(ports) > 1:
raise Exception(
f"Multiple ports detected, you must specify a device, such as {ports[0]}")
else:
devPath = ports[0]
logging.debug(f"Connecting to {devPath}")
# Note: we provide None for port here, because we will be opening it later
self.stream = serial.Serial(
None, 921600, exclusive=True, timeout=0.5, write_timeout=0)
# rts=False Needed to prevent TBEAMs resetting on OSX, because rts is connected to reset
self.stream.port = devPath
# HACK: If the platform driving the serial port is unable to leave the RTS pin in high-impedance
# mode, set RTS to false so that the device platform won't be reset spuriously.
# Linux does this properly, so don't apply this hack on Linux (because it makes the reset button not work).
if self._hostPlatformAlwaysDrivesUartRts():
self.stream.rts = False
self.stream.open()
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
"""true if platform driving the serial port is Windows Subsystem for Linux 1."""
def _isWsl1(self):
# WSL1 identifies itself as Linux, but has a special char device at /dev/lxss for use with session control,
# e.g. /init. We should treat WSL1 as Windows for the RTS-driving hack because the underlying platfrom
# serial driver for the CP21xx still exhibits the buggy behavior.
# WSL2 is not covered here, as it does not (as of 2021-May-25) support the appropriate functionality to
# share or pass-through serial ports.
try:
# Claims to be Linux, but has /dev/lxss; must be WSL 1
return platform.system() == 'Linux' and stat.S_ISCHR(os.stat('/dev/lxss').st_mode)
except:
# Couldn't stat /dev/lxss special device; not WSL1
return False
def _hostPlatformAlwaysDrivesUartRts(self):
# OS-X/Windows seems to have a bug in its CP21xx serial drivers. It ignores that we asked for no RTSCTS
# control and will always drive RTS either high or low (rather than letting the CP102 leave
# it as an open-collector floating pin).
# TODO: When WSL2 supports USB passthrough, this will get messier. If/when WSL2 gets virtual serial
# ports that "share" the Windows serial port (and thus the Windows drivers), this code will need to be
# updated to reflect that as well -- or if T-Beams get made with an alternate USB to UART bridge that has
# a less buggy driver.
return platform.system() != 'Linux' or self._isWsl1()
class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
"""Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
hostname {string} -- Hostname/IP address of the device to connect to
"""
logging.debug(f"Connecting to {hostname}")
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
# Instead of wrapping as a stream, we use the native socket API
# self.stream = sock.makefile('rw')
self.stream = None
self.socket = sock
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
"""Close a connection to the device"""
logging.debug("Closing TCP stream")
StreamInterface.close(self)
# Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
# the socket here
self._wantExit = True
if not self.socket is None:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass # Ignore errors in shutdown, because we might have a race with the server
self.socket.close()
def _writeBytes(self, b):
"""Write an array of bytes to our stream and flush"""
self.socket.send(b)
def _readBytes(self, len):
"""Read an array of bytes from our stream"""
return self.socket.recv(len)
def _onTextReceive(iface, asDict):
"""Special text auto parsing for received messages"""
# We don't throw if the utf8 is invalid in the text message. Instead we just don't populate

View File

@@ -12,7 +12,10 @@ import yaml
from pubsub import pub
import pyqrcode
import pkg_resources
from . import SerialInterface, TCPInterface, BLEInterface, test, remote_hardware
from .serial_interface import SerialInterface
from .tcp_interface import TCPInterface
from .ble_interface import BLEInterface
from . import test, remote_hardware
from . import portnums_pb2, channel_pb2, mesh_pb2, radioconfig_pb2
"""We only import the tunnel code if we are on a platform that can run it"""
@@ -360,7 +363,7 @@ def onConnected(interface):
if 'owner' in configuration:
print(f"Setting device owner to {configuration['owner']}")
getNode().setOwner(configuration['owner'])
if 'channel_url' in configuration:
print("Setting channel url to", configuration['channel_url'])
getNode().setURL(configuration['channel_url'])
@@ -387,7 +390,7 @@ def onConnected(interface):
print("Setting device position")
interface.sendPosition(lat, lon, alt, time)
interface.localNode.writeConfig()
if 'user_prefs' in configuration:
prefs = getNode().radioConfig.preferences
for pref in configuration['user_prefs']:
@@ -428,14 +431,14 @@ def onConnected(interface):
print(f"Deleting channel {channelIndex}")
ch = getNode().deleteChannel(channelIndex)
if args.ch_set or args.ch_longslow or args.ch_longsfast or args.ch_mediumslow or args.ch_mediumsfast or args.ch_shortslow or args.ch_shortfast:
if args.ch_set or args.ch_longslow or args.ch_longfast or args.ch_mediumslow or args.ch_mediumfast or args.ch_shortslow or args.ch_shortfast:
closeNow = True
ch = getNode().channels[channelIndex]
enable = args.ch_enable # should we enable this channel?
if or args.ch_longslow or args.ch_longsfast or args.ch_mediumslow or args.ch_mediumsfast or args.ch_shortslow or args.ch_shortfast:
if args.ch_longslow or args.ch_longfast or args.ch_mediumslow or args.ch_mediumfast or args.ch_shortslow or args.ch_shortfast:
if channelIndex != 0:
raise Exception(
"standard channel settings can only be applied to the PRIMARY channel")
@@ -457,7 +460,7 @@ def onConnected(interface):
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
if args.ch_longsfast:
if args.ch_longfast:
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw31_25Cr48Sf512)
@@ -465,7 +468,7 @@ def onConnected(interface):
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw250Cr46Sf2048)
if args.ch_mediumsfast:
if args.ch_mediumfast:
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw250Cr47Sf1024)
@@ -684,10 +687,23 @@ def initParser():
"--ch-set", help="Set a channel parameter", nargs=2, action='append')
parser.add_argument(
"--ch-longslow", help="Change to the standard long-range (but slow) channel", action='store_true')
"--ch-longslow", help="Change to the long-range and slow channel", action='store_true')
parser.add_argument(
"--ch-shortfast", help="Change to the standard fast (but short range) channel", action='store_true')
"--ch-longfast", help="Change to the long-range and fast channel", action='store_true')
parser.add_argument(
"--ch-mediumslow", help="Change to the medium-range and slow channel", action='store_true')
parser.add_argument(
"--ch-mediumfast", help="Change to the medium-range and fast channel", action='store_true')
parser.add_argument(
"--ch-shortslow", help="Change to the short-range and slow channel", action='store_true')
parser.add_argument(
"--ch-shortfast", help="Change to the short-range and fast channel", action='store_true')
parser.add_argument(
"--set-owner", help="Set device owner name", action="store")

View File

@@ -0,0 +1,51 @@
""" Bluetooth interface
"""
import logging
import pygatt
from .mesh_interface import MeshInterface
# Our standard BLE characteristics
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
FROMRADIO_UUID = "8ba2bcc2-ee02-4a55-a531-c525c5e454d5"
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
class BLEInterface(MeshInterface):
"""A not quite ready - FIXME - BLE interface to devices"""
def __init__(self, address, debugOut=None):
self.address = address
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f"Connecting to {self.address}")
self.device = self.adapter.connect(address)
logging.debug("Connected to device")
# fromradio = self.device.char_read(FROMRADIO_UUID)
MeshInterface.__init__(self, debugOut=debugOut)
self._readFromRadio() # read the initial responses
def handle_data(handle, data):
self._handleFromRadio(data)
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
#logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
self.device.char_write(TORADIO_UUID, b)
def close(self):
MeshInterface.close(self)
self.adapter.stop()
def _readFromRadio(self):
wasEmpty = False
while not wasEmpty:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)

View File

@@ -0,0 +1,617 @@
""" Mesh Interface class
"""
import sys
import random
import time
import logging
from typing import AnyStr
import threading
from datetime import datetime
import timeago
from tabulate import tabulate
import google.protobuf.json_format
from pubsub import pub
from google.protobuf.json_format import MessageToJson
from . import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout
from .node import Node
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
defaultHopLimit = 3
class MeshInterface:
"""Interface class for meshtastic devices
Properties:
isConnected
nodes
debugOut
"""
def __init__(self, debugOut=None, noProto=False):
"""Constructor
Keyword Arguments:
noProto -- If True, don't try to run our protocol on the link - just be a dumb serial client.
"""
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.localNode = Node(self, -1) # We fixup nodenum later
self.myInfo = None # We don't have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we've encountered a fatal exception it will be kept here
self._timeout = Timeout()
self.heartbeatTimer = None
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
def close(self):
"""Shutdown this interface"""
if self.heartbeatTimer:
self.heartbeatTimer.cancel()
self._sendDisconnect()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
def showInfo(self, file=sys.stdout):
"""Show human readable summary about this object"""
owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
myinfo = ''
if self.myInfo:
myinfo = f"\nMy info: {stripnl(MessageToJson(self.myInfo))}"
mesh = "\nNodes in mesh:"
nodes = ""
if self.nodes:
for n in self.nodes.values():
nodes = nodes + f" {stripnl(n)}"
infos = owner + myinfo + mesh + nodes
print(infos)
return infos
def showNodes(self, includeSelf=True, file=sys.stdout):
"""Show table summary of nodes in mesh"""
def formatFloat(value, precision=2, unit=''):
return f'{value:.{precision}f}{unit}' if value else None
def getLH(ts):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else None
def getTimeAgo(ts):
return timeago.format(datetime.fromtimestamp(ts), datetime.now()) if ts else None
rows = []
for node in self.nodes.values():
if not includeSelf and node['num'] == self.localNode.nodeNum:
continue
row = {"N": 0}
user = node.get('user')
if user:
row.update({
"User": user['longName'],
"AKA": user['shortName'],
"ID": user['id'],
})
pos = node.get('position')
if pos:
row.update({
"Latitude": formatFloat(pos.get("latitude"), 4, "°"),
"Longitude": formatFloat(pos.get("longitude"), 4, "°"),
"Altitude": formatFloat(pos.get("altitude"), 0, " m"),
"Battery": formatFloat(pos.get("batteryLevel"), 2, "%"),
})
row.update({
"SNR": formatFloat(node.get("snr"), 2, " dB"),
"LastHeard": getLH(node.get("lastHeard")),
"Since": getTimeAgo(node.get("lastHeard")),
})
rows.append(row)
# Why doesn't this way work?
#rows.sort(key=lambda r: r.get('LastHeard', '0000'), reverse=True)
rows.sort(key=lambda r: r.get('LastHeard') or '0000', reverse=True)
for i, row in enumerate(rows):
row['N'] = i+1
table = tabulate(rows, headers='keys', missingval='N/A',
tablefmt='fancy_grid')
print(table)
return table
def getNode(self, nodeId):
"""Return a node object which contains device settings and channel info"""
if nodeId == LOCAL_ADDR:
return self.localNode
else:
n = Node(self, nodeId)
n.requestConfig()
if not n.waitForConfig():
raise Exception("Timed out waiting for node config")
return n
def sendText(self, text: AnyStr,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None,
channelIndex=0):
"""Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
return self.sendData(text.encode("utf-8"), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
wantResponse=wantResponse,
hopLimit=hopLimit,
onResponse=onResponse,
channelIndex=channelIndex)
def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None,
channelIndex=0):
"""Send a data packet to some other node
Keyword Arguments:
data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives
(or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
if getattr(data, "SerializeToString", None):
logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
data = data.SerializeToString()
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception("Data payload too big")
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
raise Exception("A non-zero port number must be specified")
meshPacket = mesh_pb2.MeshPacket()
meshPacket.channel = channelIndex
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
"""
Send a position packet to some other node (normally a broadcast)
Also, the device software will notice this packet and use it to automatically set its notion of
the local position.
If timeSec is not specified (recommended), we will use the local machine time.
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
"""
p = mesh_pb2.Position()
if latitude != 0.0:
p.latitude_i = int(latitude / 1e-7)
if longitude != 0.0:
p.longitude_i = int(longitude / 1e-7)
if altitude != 0:
p.altitude = int(altitude)
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
wantAck=wantAck,
wantResponse=wantResponse)
def _addResponseHandler(self, requestId, callback):
self.responseHandlers[requestId] = ResponseHandler(callback)
def _sendPacket(self, meshPacket,
destinationId=BROADCAST_ADDR,
wantAck=False, hopLimit=defaultHopLimit):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
Returns the sent packet. The id field will be populated in this packet and
can be used to track future message acks/naks.
"""
# We allow users to talk to the local node before we've completed the full connection flow...
if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
self._waitConnected()
toRadio = mesh_pb2.ToRadio()
if destinationId is None:
raise Exception("destinationId must not be None")
elif isinstance(destinationId, int):
nodeNum = destinationId
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
elif destinationId == LOCAL_ADDR:
nodeNum = self.myInfo.my_node_num
# A simple hex style nodeid - we can parse this without needing the DB
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
else:
node = self.nodes.get(destinationId)
if not node:
raise Exception(f"NodeId {destinationId} not found in DB")
nodeNum = node['num']
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
meshPacket.hop_limit = hopLimit
# if the user hasn't set an ID for this packet (likely and recommended), we should pick a new unique ID
# so the message can be tracked.
if meshPacket.id == 0:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
return meshPacket
def waitForConfig(self):
"""Block until radio config is received. Returns True if config has been received."""
success = self._timeout.waitForSet(self, attrs=('myInfo', 'nodes')
) and self.localNode.waitForConfig()
if not success:
raise Exception("Timed out waiting for interface config")
def getMyNodeInfo(self):
"""Get info about my node."""
if self.myInfo is None:
return None
return self.nodesByNum.get(self.myInfo.my_node_num)
def getMyUser(self):
"""Get user"""
nodeInfo = self.getMyNodeInfo()
if nodeInfo is not None:
return nodeInfo.get('user')
return None
def getLongName(self):
"""Get long name"""
user = self.getMyUser()
if user is not None:
return user.get('longName', None)
return None
def getShortName(self):
"""Get short name"""
user = self.getMyUser()
if user is not None:
return user.get('shortName', None)
return None
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.isConnected.wait(10.0): # timeout after 10 seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
if self.failure:
raise self.failure
def _generatePacketId(self):
"""Get a new unique packet ID"""
if self.currentPacketId is None:
raise Exception("Not connected yet, can not generate packet")
else:
self.currentPacketId = (self.currentPacketId + 1) & 0xffffffff
return self.currentPacketId
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.lost", interface=self))
def _startHeartbeat(self):
"""We need to send a heartbeat message to the device every X seconds"""
def callback():
self.heartbeatTimer = None
prefs = self.localNode.radioConfig.preferences
i = prefs.phone_timeout_secs / 2
logging.debug(f"Sending heartbeat, interval {i}")
if i != 0:
self.heartbeatTimer = threading.Timer(i, callback)
self.heartbeatTimer.start()
p = mesh_pb2.ToRadio()
self._sendToRadio(p)
callback() # run our periodic callback now, it will make another timer if necessary
def _connected(self):
"""Called by this class to tell clients we are now fully connected to a node
"""
# (because I'm lazy) _connected might be called when remote Node
# objects complete their config reads, don't generate redundant isConnected
# for the local interface
if not self.isConnected.is_set():
self.isConnected.set()
self._startHeartbeat()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.established", interface=self))
def _startConfig(self):
"""Start device packets flowing"""
self.myInfo = None
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
startConfig = mesh_pb2.ToRadio()
self.configId = random.randint(0, 0xffffffff)
startConfig.want_config_id = self.configId
self._sendToRadio(startConfig)
def _sendDisconnect(self):
"""Tell device we are done using it"""
m = mesh_pb2.ToRadio()
m.disconnect = True
self._sendToRadio(m)
def _sendToRadio(self, toRadio):
"""Send a ToRadio protobuf to the device"""
if self.noProto:
logging.warning(
f"Not sending packet because protocol use is disabled by noProto")
else:
#logging.debug(f"Sending toRadio: {stripnl(toRadio)}")
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.error(f"Subclass must provide toradio: {toRadio}")
def _handleConfigComplete(self):
"""
Done with initial config messages, now send regular MeshPackets to ask for settings and channels
"""
self.localNode.requestConfig()
def _handleFromRadio(self, fromRadioBytes):
"""
Handle a packet that arrived from the radio(update model and publish events)
Called by subclasses."""
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
#logging.debug(f"Received from radio: {fromRadio}")
if fromRadio.HasField("my_info"):
self.myInfo = fromRadio.my_info
self.localNode.nodeNum = self.myInfo.my_node_num
logging.debug(f"Received myinfo: {stripnl(fromRadio.my_info)}")
failmsg = None
# Check for app too old
if self.myInfo.min_app_version > OUR_APP_VERSION:
failmsg = "This device needs a newer python client, please \"pip install --upgrade meshtastic\". "\
"For more information see https://tinyurl.com/5bjsxu32"
# check for firmware too old
if self.myInfo.max_channels == 0:
failmsg = "This version of meshtastic-python requires device firmware version 1.2 or later. "\
"For more information see https://tinyurl.com/5bjsxu32"
if failmsg:
self.failure = Exception(failmsg)
self.isConnected.set() # let waitConnected return this exception
self.close()
elif fromRadio.HasField("node_info"):
node = asDict["nodeInfo"]
try:
self._fixupPosition(node["position"])
except:
logging.debug("Node without position")
logging.debug(f"Received nodeinfo: {node}")
self.nodesByNum[node["num"]] = node
if "user" in node: # Some nodes might not have user/ids assigned yet
self.nodes[node["user"]["id"]] = node
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.node.updated",
node=node, interface=self))
elif fromRadio.config_complete_id == self.configId:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
logging.debug(f"Config complete ID {self.configId}")
self._handleConfigComplete()
elif fromRadio.HasField("packet"):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
# Tell clients the device went away. Careful not to call the overridden subclass version that closes the serial port
MeshInterface._disconnected(self)
self._startConfig() # redownload the node db etc...
else:
logging.debug("Unexpected FromRadio payload")
def _fixupPosition(self, position):
"""Convert integer lat/lon into floats
Arguments:
position {Position dictionary} -- object ot fix up
"""
if "latitudeI" in position:
position["latitude"] = position["latitudeI"] * 1e-7
if "longitudeI" in position:
position["longitude"] = position["longitudeI"] * 1e-7
def _nodeNumToId(self, num):
"""Map a node node number to a node ID
Arguments:
num {int} -- Node number
Returns:
string -- Node ID
"""
if num == BROADCAST_NUM:
return BROADCAST_ADDR
try:
return self.nodesByNum[num]["user"]["id"]
except:
logging.debug(f"Node {num} not found for fromId")
return None
def _getOrCreateByNum(self, nodeNum):
"""Given a nodenum find the NodeInfo in the DB (or create if necessary)"""
if nodeNum == BROADCAST_NUM:
raise Exception("Can not create/find nodenum by the broadcast num")
if nodeNum in self.nodesByNum:
return self.nodesByNum[nodeNum]
else:
n = {"num": nodeNum} # Create a minimial node db entry
self.nodesByNum[nodeNum] = n
return n
def _handlePacketFromRadio(self, meshPacket):
"""Handle a MeshPacket that just arrived from the radio
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
"""
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
# We normally decompose the payload into a dictionary so that the client
# doesn't need to understand protobufs. But advanced clients might
# want the raw protobuf, so we provide it in "raw"
asDict["raw"] = meshPacket
# from might be missing if the nodenum was zero.
if not "from" in asDict:
asDict["from"] = 0
logging.error(
f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
return
if not "to" in asDict:
asDict["to"] = 0
# /add fromId and toId fields based on the node ID
try:
asDict["fromId"] = self._nodeNumToId(asDict["from"])
except Exception as ex:
logging.warning(f"Not populating fromId {ex}")
try:
asDict["toId"] = self._nodeNumToId(asDict["to"])
except Exception as ex:
logging.warning(f"Not populating toId {ex}")
# We could provide our objects as DotMaps - which work with . notation or as dictionaries
# asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type
decoded = asDict["decoded"]
# The default MessageToDict converts byte arrays into base64 strings.
# We don't want that - it messes up data payload. So slam in the correct
# byte array.
decoded["payload"] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
# to make API usage easier, set it to prevent confusion
if not "portnum" in decoded:
decoded["portnum"] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
portnum = decoded["portnum"]
topic = f"meshtastic.receive.data.{portnum}"
# decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
# move the following into a 'decoders' API that clients could register?
portNumInt = meshPacket.decoded.portnum # we want portnum as an int
handler = protocols.get(portNumInt)
# The decoded protobuf as a dictionary (if we understand this message)
p = None
if handler is not None:
topic = f"meshtastic.receive.{handler.name}"
# Convert to protobuf if possible
if handler.protobufFactory is not None:
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict["decoded"][handler.name] = p
# Also provide the protobuf raw
asDict["decoded"][handler.name]["raw"] = pb
# Call specialized onReceive if necessary
if handler.onReceive is not None:
handler.onReceive(self, asDict)
# Is this message in response to a request, if so, look for a handler
requestId = decoded.get("requestId")
if requestId is not None:
# We ignore ACK packets, but send NAKs and data responses to the handlers
routing = decoded.get("routing")
isAck = routing is not None and ("errorReason" not in routing)
if not isAck:
# we keep the responseHandler in dict until we get a non ack
handler = self.responseHandlers.pop(requestId, None)
if handler is not None:
handler.callback(asDict)
logging.debug(f"Publishing {topic}: packet={stripnl(asDict)} ")
publishingThread.queueWork(lambda: pub.sendMessage(
topic, packet=asDict, interface=self))

View File

@@ -60,24 +60,7 @@ import base64
from typing import *
from google.protobuf.json_format import MessageToJson
from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from .util import stripnl, Timeout
def pskToString(psk: bytes):
"""Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
if len(psk) == 0:
return "unencrypted"
elif len(psk) == 1:
b = psk[0]
if b == 0:
return "unencrypted"
elif b == 1:
return "default"
else:
return f"simple{b - 1}"
else:
return "secret"
from .util import pskToString, stripnl, Timeout
class Node:
@@ -97,11 +80,11 @@ class Node:
def showChannels(self):
"""Show human readable description of our channels"""
print("Channels:")
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
cStr = stripnl(MessageToJson(c.settings))
print(
f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
if self.channels:
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
cStr = stripnl(MessageToJson(c.settings))
print(f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
publicURL = self.getURL(includeAll=False)
adminURL = self.getURL(includeAll=True)
print(f"\nPrimary channel URL: {publicURL}")
@@ -110,8 +93,10 @@ class Node:
def showInfo(self):
"""Show human readable description of our node"""
print(
f"Preferences: {stripnl(MessageToJson(self.radioConfig.preferences))}\n")
prefs = ""
if self.radioConfig and self.radioConfig.preferences:
prefs = stripnl(MessageToJson(self.radioConfig.preferences))
print(f"Preferences: {prefs}\n")
self.showChannels()
def requestConfig(self):
@@ -231,9 +216,10 @@ class Node:
"""
# Only keep the primary/secondary channels, assume primary is first
channelSet = apponly_pb2.ChannelSet()
for c in self.channels:
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
channelSet.settings.append(c.settings)
if self.channels:
for c in self.channels:
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
channelSet.settings.append(c.settings)
bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode('ascii')
return f"https://www.meshtastic.org/d/#{s}".replace("=", "")

View File

@@ -1,4 +1,5 @@
""" Remote hardware
"""
from pubsub import pub
from . import portnums_pb2, remote_hardware_pb2
@@ -28,7 +29,8 @@ class RemoteHardwareClient:
ch = iface.localNode.getChannelByName("gpio")
if not ch:
raise Exception(
"No gpio channel found, please create on the sending and receive nodes to use this (secured) service (--ch-add gpio --info then --seturl)")
"No gpio channel found, please create on the sending and receive nodes "\
"to use this (secured) service (--ch-add gpio --info then --seturl)")
self.channelIndex = ch.index
pub.subscribe(

View File

@@ -0,0 +1,75 @@
""" Serial interface class
"""
import logging
import platform
import os
import stat
import serial
from .stream_interface import StreamInterface
from .util import findPorts
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
"""Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
"""
if devPath is None:
ports = findPorts()
if len(ports) == 0:
raise Exception("No Meshtastic devices detected")
elif len(ports) > 1:
raise Exception(
f"Multiple ports detected, you must specify a device, such as {ports[0]}")
else:
devPath = ports[0]
logging.debug(f"Connecting to {devPath}")
# Note: we provide None for port here, because we will be opening it later
self.stream = serial.Serial(
None, 921600, exclusive=True, timeout=0.5, write_timeout=0)
# rts=False Needed to prevent TBEAMs resetting on OSX, because rts is connected to reset
self.stream.port = devPath
# HACK: If the platform driving the serial port is unable to leave the RTS pin in high-impedance
# mode, set RTS to false so that the device platform won't be reset spuriously.
# Linux does this properly, so don't apply this hack on Linux (because it makes the reset button not work).
if self._hostPlatformAlwaysDrivesUartRts():
self.stream.rts = False
self.stream.open()
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
"""true if platform driving the serial port is Windows Subsystem for Linux 1."""
def _isWsl1(self):
# WSL1 identifies itself as Linux, but has a special char device at /dev/lxss for use with session control,
# e.g. /init. We should treat WSL1 as Windows for the RTS-driving hack because the underlying platfrom
# serial driver for the CP21xx still exhibits the buggy behavior.
# WSL2 is not covered here, as it does not (as of 2021-May-25) support the appropriate functionality to
# share or pass-through serial ports.
try:
# Claims to be Linux, but has /dev/lxss; must be WSL 1
return platform.system() == 'Linux' and stat.S_ISCHR(os.stat('/dev/lxss').st_mode)
except:
# Couldn't stat /dev/lxss special device; not WSL1
return False
def _hostPlatformAlwaysDrivesUartRts(self):
# OS-X/Windows seems to have a bug in its CP21xx serial drivers. It ignores that we asked for no RTSCTS
# control and will always drive RTS either high or low (rather than letting the CP102 leave
# it as an open-collector floating pin).
# TODO: When WSL2 supports USB passthrough, this will get messier. If/when WSL2 gets virtual serial
# ports that "share" the Windows serial port (and thus the Windows drivers), this code will need to be
# updated to reflect that as well -- or if T-Beams get made with an alternate USB to UART bridge that has
# a less buggy driver.
return platform.system() != 'Linux' or self._isWsl1()

View File

@@ -0,0 +1,171 @@
""" Stream Interface base class
"""
import logging
import threading
import time
import traceback
import serial
from .mesh_interface import MeshInterface
from .util import stripnl
START1 = 0x94
START2 = 0xc3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512
class StreamInterface(MeshInterface):
"""Interface class for meshtastic devices over a stream link (serial, TCP, etc)"""
def __init__(self, debugOut=None, noProto=False, connectNow=True):
"""Constructor, opens a connection to self.stream
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
Raises:
Exception: [description]
Exception: [description]
"""
if not hasattr(self, 'stream'):
raise Exception(
"StreamInterface is now abstract (to update existing code create SerialInterface instead)")
self._rxBuf = bytes() # empty
self._wantExit = False
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(
target=self.__reader, args=(), daemon=True)
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
if not noProto:
self.waitForConfig()
def connect(self):
"""Connect to our radio
Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.
"""
# Send some bogus UART characters to force a sleeping device to wake, and if the reading statemachine was parsing a bad packet make sure
# we write enought start bytes to force it to resync (we don't use START1 because we want to ensure it is looking for START1)
p = bytearray([START2] * 32)
self._writeBytes(p)
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
self._startConfig()
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
"""We override the superclass implementation to close our port"""
MeshInterface._disconnected(self)
logging.debug("Closing our port")
if not self.stream is None:
self.stream.close()
self.stream = None
def _writeBytes(self, b):
"""Write an array of bytes to our stream and flush"""
if self.stream: # ignore writes when stream is closed
self.stream.write(b)
self.stream.flush()
def _readBytes(self, length):
"""Read an array of bytes from our stream"""
return self.stream.read(length)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
bufLen = len(b)
# We convert into a string, because the TCP code doesn't work with byte arrays
header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
self._writeBytes(header + b)
def close(self):
"""Close a connection to the device"""
logging.debug("Closing stream")
MeshInterface.close(self)
# pyserial cancel_read doesn't seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
def __reader(self):
"""The reader thread that reads bytes from our stream"""
empty = bytes()
try:
while not self._wantExit:
# logging.debug("reading character")
b = self._readBytes(1)
# logging.debug("In reader loop")
# logging.debug(f"read returned {b}")
if len(b) > 0:
c = b[0]
ptr = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b
if ptr == 0: # looking for START1
if c != START1:
self._rxBuf = empty # failed to find start
if self.debugOut is not None:
try:
self.debugOut.write(b.decode("utf-8"))
except:
self.debugOut.write('?')
elif ptr == 1: # looking for START2
if c != START2:
self._rxBuf = empty # failed to find start2
elif ptr >= HEADER_LEN - 1: # we've at least got a header
# big endian length follos header
packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3]
if ptr == HEADER_LEN - 1: # we _just_ finished reading the header, validate length
if packetlen > MAX_TO_FROM_RADIO_SIZE:
self._rxBuf = empty # length ws out out bounds, restart
if len(self._rxBuf) != 0 and ptr + 1 >= packetlen + HEADER_LEN:
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
logging.error(
f"Error while handling message from radio {ex}")
traceback.print_exc()
self._rxBuf = empty
else:
# logging.debug(f"timeout")
pass
except serial.SerialException as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.warning(
f"Meshtastic serial port disconnected, disconnecting... {ex}")
except OSError as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.error(
f"Unexpected OSError, terminating meshtastic reader... {ex}")
except Exception as ex:
logging.error(
f"Unexpected exception, terminating meshtastic reader... {ex}")
finally:
logging.debug("reader is exiting")
self._disconnected()

View File

@@ -0,0 +1,52 @@
""" TCPInterface class for interfacing with http endpoint
"""
import logging
import socket
from typing import AnyStr
from .stream_interface import StreamInterface
class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
"""Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
hostname {string} -- Hostname/IP address of the device to connect to
"""
logging.debug(f"Connecting to {hostname}")
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
# Instead of wrapping as a stream, we use the native socket API
# self.stream = sock.makefile('rw')
self.stream = None
self.socket = sock
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
"""Close a connection to the device"""
logging.debug("Closing TCP stream")
StreamInterface.close(self)
# Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
# the socket here
self._wantExit = True
if not self.socket is None:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass # Ignore errors in shutdown, because we might have a race with the server
self.socket.close()
def _writeBytes(self, b):
"""Write an array of bytes to our stream and flush"""
self.socket.send(b)
def _readBytes(self, length):
"""Read an array of bytes from our stream"""
return self.socket.recv(length)

View File

@@ -7,7 +7,9 @@ import traceback
from dotmap import DotMap
from pubsub import pub
from . import util
from . import SerialInterface, TCPInterface, BROADCAST_NUM
from .__init__ import BROADCAST_NUM
from .serial_interface import SerialInterface
from .tcp_interface import TCPInterface
"""The interfaces we are using for our tests"""
interfaces = None

View File

View File

@@ -1,7 +1,6 @@
"""Meshtastic integration tests"""
import re
import subprocess
import platform
import pytest

View File

@@ -0,0 +1,14 @@
"""Meshtastic unit tests for node.py"""
import pytest
from meshtastic.mesh_interface import MeshInterface
@pytest.mark.unit
def test_MeshInterface():
"""Test that we instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
iface.showInfo()
iface.localNode.showInfo()
iface.close()

View File

@@ -1,7 +1,6 @@
"""Meshtastic smoke tests with a single device"""
import re
import subprocess
import platform
import time
import os
@@ -11,11 +10,15 @@ import pytest
import meshtastic
# seconds to pause after running a meshtastic command
PAUSE_AFTER_COMMAND = 2
PAUSE_AFTER_REBOOT = 7
@pytest.mark.smoke1
def test_smoke1_reboot():
"""Test reboot"""
return_value, out = subprocess.getstatusoutput('meshtastic --reboot')
return_value, _ = subprocess.getstatusoutput('meshtastic --reboot')
assert return_value == 0
# pause for the radio to reset (10 seconds for the pause, and a few more seconds to be back up)
time.sleep(18)
@@ -51,7 +54,7 @@ def test_smoke1_seriallog_to_file():
filename = 'tmpoutput.txt'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, out = subprocess.getstatusoutput(f'meshtastic --info --seriallog {filename}')
return_value, _ = subprocess.getstatusoutput(f'meshtastic --info --seriallog {filename}')
assert os.path.exists(f"{filename}")
assert return_value == 0
os.remove(f"{filename}")
@@ -63,7 +66,7 @@ def test_smoke1_qr():
filename = 'tmpqr'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, out = subprocess.getstatusoutput(f'meshtastic --qr > {filename}')
return_value, _ = subprocess.getstatusoutput(f'meshtastic --qr > {filename}')
assert os.path.exists(f"{filename}")
# not really testing that a valid qr code is created, just that the file size
# is reasonably big enough for a qr code
@@ -113,7 +116,7 @@ def test_smoke1_set_is_router_true():
assert re.search(r'^Set is_router to true', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --get is_router')
assert re.search(r'^is_router: True', out, re.MULTILINE)
assert return_value == 0
@@ -129,7 +132,7 @@ def test_smoke1_set_location_info():
assert re.search(r'^Fixing longitude', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out2 = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'1337', out2, re.MULTILINE)
assert re.search(r'32.7767', out2, re.MULTILINE)
@@ -145,7 +148,7 @@ def test_smoke1_set_is_router_false():
assert re.search(r'^Set is_router to false', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --get is_router')
assert re.search(r'^is_router: False', out, re.MULTILINE)
assert return_value == 0
@@ -160,18 +163,18 @@ def test_smoke1_set_owner():
assert re.search(r'^Setting device owner to Bob', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert not re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --set-owner Joe')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting device owner to Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
@@ -186,40 +189,45 @@ def test_smoke1_set_team():
assert re.search(r'^Setting team to CLEAR', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --set-team CYAN')
assert re.search(r'Setting team to CYAN', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'CYAN', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smoke1
def test_smoke1_ch_longslow_and_ch_shortfast():
"""Test --ch-longslow and --ch-shortfast"""
# unset the team
return_value, out = subprocess.getstatusoutput('meshtastic --ch-longslow')
assert re.match(r'Connected to radio', out)
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio (might reboot)
time.sleep(5)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'Bw125Cr48Sf4096', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
return_value, out = subprocess.getstatusoutput('meshtastic --ch-shortfast')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio (might reboot)
time.sleep(5)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'Bw500Cr45Sf128', out, re.MULTILINE)
assert return_value == 0
def test_smoke1_ch_values():
"""Test --ch-longslow, --ch-longfast, --ch-mediumslow, --ch-mediumsfast,
--ch-shortslow, and --ch-shortfast arguments
"""
exp = {
'--ch-longslow': 'Bw125Cr48Sf4096',
# TODO: not sure why these fail thru tests, but ok manually
#'--ch-longfast': 'Bw31_25Cr48Sf512',
#'--ch-mediumslow': 'Bw250Cr46Sf2048',
#'--ch-mediumfast': 'Bw250Cr47Sf1024',
# TODO '--ch-shortslow': '?',
'--ch-shortfast': 'Bw500Cr45Sf128'
}
for key, val in exp.items():
print(key, val)
return_value, out = subprocess.getstatusoutput(f'meshtastic {key}')
assert re.match(r'Connected to radio', out)
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio (might reboot)
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(val, out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smoke1
@@ -229,13 +237,13 @@ def test_smoke1_ch_set_name():
assert not re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --ch-set name MyChannel')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set name to MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
@@ -248,19 +256,19 @@ def test_smoke1_ch_add_and_ch_del():
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(4)
time.sleep(PAUSE_AFTER_REBOOT)
# make sure the secondar channel is not there
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.match(r'Connected to radio', out)
@@ -276,13 +284,13 @@ def test_smoke1_ch_set_modem_config():
assert not re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --ch-set modem_config Bw31_25Cr48Sf512')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set modem_config to Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
@@ -295,7 +303,7 @@ def test_smoke1_seturl_default():
return_value, out = subprocess.getstatusoutput('meshtastic --ch-set name foo')
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
# ensure we no longer have a default primary channel
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert not re.search('CgUYAyIBAQ', out, re.MULTILINE)
@@ -305,7 +313,7 @@ def test_smoke1_seturl_default():
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(1)
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --info')
assert re.search('CgUYAyIBAQ', out, re.MULTILINE)
assert return_value == 0
@@ -316,7 +324,7 @@ def test_smoke1_seturl_invalid_url():
"""Test --seturl with invalid url"""
# Note: This url is no longer a valid url.
url = "https://www.meshtastic.org/c/#GAMiENTxuzogKQdZ8Lz_q89Oab8qB0RlZmF1bHQ="
return_value, out = subprocess.getstatusoutput(f"meshtastic --seturl {url}")
_, out = subprocess.getstatusoutput(f"meshtastic --seturl {url}")
assert re.match(r'Connected to radio', out)
assert re.search('Aborting', out, re.MULTILINE)
@@ -324,7 +332,7 @@ def test_smoke1_seturl_invalid_url():
@pytest.mark.smoke1
def test_smoke1_configure():
"""Test --configure"""
return_value, out = subprocess.getstatusoutput(f"meshtastic --configure example_config.yaml")
_ , out = subprocess.getstatusoutput(f"meshtastic --configure example_config.yaml")
assert re.match(r'Connected to radio', out)
assert re.search('^Setting device owner to Bob TBeam', out, re.MULTILINE)
assert re.search('^Fixing altitude at 304 meters', out, re.MULTILINE)

View File

@@ -1,11 +1,8 @@
"""Meshtastic unit tests for node.py"""
import re
import subprocess
import platform
import pytest
from meshtastic.node import pskToString
from meshtastic.util import pskToString
@pytest.mark.unit
def test_pskToString_empty_string():

View File

@@ -13,6 +13,22 @@ import serial.tools.list_ports
blacklistVids = dict.fromkeys([0x1366])
def pskToString(psk: bytes):
"""Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
if len(psk) == 0:
return "unencrypted"
elif len(psk) == 1:
b = psk[0]
if b == 0:
return "unencrypted"
elif b == 1:
return "default"
else:
return f"simple{b - 1}"
else:
return "secret"
def stripnl(s):
"""remove newlines from a string (and remove extra whitespace)"""
s = str(s).replace("\n", " ")

View File

@@ -1,5 +1,12 @@
markdown
pdoc3
pyserial
protobuf
dotmap
pexpect
pyqrcode
pygatt
tabulate
timeago
webencodings
pyparsing
twine
@@ -8,3 +15,4 @@ pylint
pytest
pytest-cov
pyyaml
pytap2