From 6c532215e423bb98e10eb6793ec2f7fb31314605 Mon Sep 17 00:00:00 2001 From: Kevin Hester Date: Fri, 2 Apr 2021 11:10:20 +0800 Subject: [PATCH] move Node to its own file --- meshtastic/__init__.py | 326 +-------------------------------- meshtastic/node.py | 403 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 404 insertions(+), 325 deletions(-) create mode 100644 meshtastic/node.py diff --git a/meshtastic/__init__.py b/meshtastic/__init__.py index 5af04c5..2e028a1 100644 --- a/meshtastic/__init__.py +++ b/meshtastic/__init__.py @@ -69,6 +69,7 @@ import platform import socket from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout +from .node import Node from pubsub import pub from dotmap import DotMap from typing import * @@ -115,331 +116,6 @@ class KnownProtocol(NamedTuple): onReceive: Callable = None -def pskToString(psk: bytes): - """Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string""" - if len(psk) == 0: - return "unencrypted" - elif len(psk) == 1: - b = psk[0] - if b == 0: - return "unencrypted" - elif b == 1: - return "default" - else: - return f"simple{b - 1}" - else: - return "secret" - - -class Node: - """A model of a (local or remote) node in the mesh - - Includes methods for radioConfig and channels - """ - - def __init__(self, iface, nodeNum): - """Constructor""" - self.iface = iface - self.nodeNum = nodeNum - self.radioConfig = None - self.channels = None - self._timeout = Timeout(maxSecs=60) - - def showChannels(self): - """Show human readable description of our channels""" - print("Channels:") - for c in self.channels: - if c.role != channel_pb2.Channel.Role.DISABLED: - cStr = stripnl(MessageToJson(c.settings)) - print( - f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}") - publicURL = self.getURL(includeAll=False) - adminURL = self.getURL(includeAll=True) - print(f"\nPrimary channel URL: {publicURL}") - if adminURL != publicURL: - print(f"Complete URL (includes all channels): {adminURL}") - - def showInfo(self): - """Show human readable description of our node""" - print( - f"Preferences: {stripnl(MessageToJson(self.radioConfig.preferences))}\n") - self.showChannels() - - def requestConfig(self): - """ - Send regular MeshPackets to ask for settings and channels - """ - self.radioConfig = None - self.channels = None - self.partialChannels = [] # We keep our channels in a temp array until finished - - self._requestSettings() - - def waitForConfig(self): - """Block until radio config is received. Returns True if config has been received.""" - return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels')) - - def writeConfig(self): - """Write the current (edited) radioConfig to the device""" - if self.radioConfig == None: - raise Exception("No RadioConfig has been read") - - p = admin_pb2.AdminMessage() - p.set_radio.CopyFrom(self.radioConfig) - - self._sendAdmin(p) - logging.debug("Wrote config") - - def writeChannel(self, channelIndex, adminIndex=0): - """Write the current (edited) channel to the device""" - - p = admin_pb2.AdminMessage() - p.set_channel.CopyFrom(self.channels[channelIndex]) - - self._sendAdmin(p, adminIndex=adminIndex) - logging.debug(f"Wrote channel {channelIndex}") - - def deleteChannel(self, channelIndex): - """Delete the specifed channelIndex and shift other channels up""" - ch = self.channels[channelIndex] - if ch.role != channel_pb2.Channel.Role.SECONDARY: - raise Exception("Only SECONDARY channels can be deleted") - - # we are careful here because if we move the "admin" channel the channelIndex we need to use - # for sending admin channels will also change - adminIndex = self.iface.localNode._getAdminChannelIndex() - - self.channels.pop(channelIndex) - self._fixupChannels() # expand back to 8 channels - - index = channelIndex - while index < self.iface.myInfo.max_channels: - self.writeChannel(index, adminIndex=adminIndex) - index += 1 - - # if we are updating the local node, we might end up *moving* the admin channel index as we are writing - if (self.iface.localNode == self) and index >= adminIndex: - # We've now passed the old location for admin index (and writen it), so we can start finding it by name again - adminIndex = 0 - - def getChannelByName(self, name): - """Try to find the named channel or return None""" - for c in (self.channels or []): - if c.settings and c.settings.name == name: - return c - return None - - def getDisabledChannel(self): - """Return the first channel that is disabled (i.e. available for some new use)""" - for c in self.channels: - if c.role == channel_pb2.Channel.Role.DISABLED: - return c - return None - - def _getAdminChannelIndex(self): - """Return the channel number of the admin channel, or 0 if no reserved channel""" - c = self.getChannelByName("admin") - if c: - return c.index - else: - return 0 - - def setOwner(self, long_name, short_name=None): - """Set device owner name""" - nChars = 3 - minChars = 2 - if long_name is not None: - long_name = long_name.strip() - if short_name is None: - words = long_name.split() - if len(long_name) <= nChars: - short_name = long_name - elif len(words) >= minChars: - short_name = ''.join(map(lambda word: word[0], words)) - else: - trans = str.maketrans(dict.fromkeys('aeiouAEIOU')) - short_name = long_name[0] + long_name[1:].translate(trans) - if len(short_name) < nChars: - short_name = long_name[:nChars] - - p = admin_pb2.AdminMessage() - - if long_name is not None: - p.set_owner.long_name = long_name - if short_name is not None: - short_name = short_name.strip() - if len(short_name) > nChars: - short_name = short_name[:nChars] - p.set_owner.short_name = short_name - - return self._sendAdmin(p) - - def getURL(self, includeAll: bool = True): - """The sharable URL that describes the current channel - """ - # Only keep the primary/secondary channels, assume primary is first - channelSet = apponly_pb2.ChannelSet() - for c in self.channels: - if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY): - channelSet.settings.append(c.settings) - bytes = channelSet.SerializeToString() - s = base64.urlsafe_b64encode(bytes).decode('ascii') - return f"https://www.meshtastic.org/d/#{s}".replace("=", "") - - def setURL(self, url): - """Set mesh network URL""" - if self.radioConfig == None: - raise Exception("No RadioConfig has been read") - - # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set} - # Split on '/#' to find the base64 encoded channel settings - splitURL = url.split("/#") - b64 = splitURL[-1] - - # We normally strip padding to make for a shorter URL, but the python parser doesn't like - # that. So add back any missing padding - # per https://stackoverflow.com/a/9807138 - missing_padding = len(b64) % 4 - if missing_padding: - b64 += '=' * (4 - missing_padding) - - decodedURL = base64.urlsafe_b64decode(b64) - channelSet = apponly_pb2.ChannelSet() - channelSet.ParseFromString(decodedURL) - - i = 0 - for chs in channelSet.settings: - ch = channel_pb2.Channel() - ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY - ch.index = i - ch.settings.CopyFrom(chs) - self.channels[ch.index] = ch - self.writeChannel(ch.index) - i = i + 1 - - def _requestSettings(self): - """ - Done with initial config messages, now send regular MeshPackets to ask for settings - """ - p = admin_pb2.AdminMessage() - p.get_radio_request = True - - def onResponse(p): - """A closure to handle the response packet""" - self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response - logging.debug("Received radio config, now fetching channels...") - self._timeout.reset() # We made foreward progress - self._requestChannel(0) # now start fetching channels - - # Show progress message for super slow operations - if self != self.iface.localNode: - logging.info( - "Requesting preferences from remote node (this could take a while)") - - return self._sendAdmin(p, - wantResponse=True, - onResponse=onResponse) - - def exitSimulator(self): - """ - Tell a simulator node to exit (this message is ignored for other nodes) - """ - p = admin_pb2.AdminMessage() - p.exit_simulator = True - - return self._sendAdmin(p) - - def reboot(self, secs: int = 10): - """ - Tell the node to reboot - """ - p = admin_pb2.AdminMessage() - p.reboot_seconds = secs - logging.info(f"Telling node to reboot in {secs} seconds") - - return self._sendAdmin(p) - - def _fixupChannels(self): - """Fixup indexes and add disabled channels as needed""" - - # Add extra disabled channels as needed - for index, ch in enumerate(self.channels): - ch.index = index # fixup indexes - - self._fillChannels() - - def _fillChannels(self): - """Mark unused channels as disabled""" - - # Add extra disabled channels as needed - index = len(self.channels) - while index < self.iface.myInfo.max_channels: - ch = channel_pb2.Channel() - ch.role = channel_pb2.Channel.Role.DISABLED - ch.index = index - self.channels.append(ch) - index += 1 - - def _requestChannel(self, channelNum: int): - """ - Done with initial config messages, now send regular MeshPackets to ask for settings - """ - p = admin_pb2.AdminMessage() - p.get_channel_request = channelNum + 1 - - # Show progress message for super slow operations - if self != self.iface.localNode: - logging.info( - f"Requesting channel {channelNum} info from remote node (this could take a while)") - else: - logging.debug(f"Requesting channel {channelNum}") - - def onResponse(p): - """A closure to handle the response packet""" - c = p["decoded"]["admin"]["raw"].get_channel_response - self.partialChannels.append(c) - self._timeout.reset() # We made foreward progress - logging.debug(f"Received channel {stripnl(c)}") - index = c.index - - # for stress testing, we can always download all channels - fastChannelDownload = True - - # Once we see a response that has NO settings, assume we are at the end of channels and stop fetching - quitEarly = ( - c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload - - if quitEarly or index >= self.iface.myInfo.max_channels - 1: - logging.debug("Finished downloading channels") - - self.channels = self.partialChannels - self._fixupChannels() - - # FIXME, the following should only be called after we have settings and channels - self.iface._connected() # Tell everone else we are ready to go - else: - self._requestChannel(index + 1) - - return self._sendAdmin(p, - wantResponse=True, - onResponse=onResponse) - - def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False, - onResponse=None, - adminIndex=0): - """Send an admin message to the specified node (or the local node if destNodeNum is zero)""" - - if adminIndex == 0: # unless a special channel index was used, we want to use the admin index - adminIndex = self.iface.localNode._getAdminChannelIndex() - - return self.iface.sendData(p, self.nodeNum, - portNum=portnums_pb2.PortNum.ADMIN_APP, - wantAck=True, - wantResponse=wantResponse, - onResponse=onResponse, - channelIndex=adminIndex) - - class MeshInterface: """Interface class for meshtastic devices diff --git a/meshtastic/node.py b/meshtastic/node.py new file mode 100644 index 0000000..98285d2 --- /dev/null +++ b/meshtastic/node.py @@ -0,0 +1,403 @@ +""" +# an API for Meshtastic devices + +Primary class: SerialInterface +Install with pip: "[pip3 install meshtastic](https://pypi.org/project/meshtastic/)" +Source code on [github](https://github.com/meshtastic/Meshtastic-python) + +properties of SerialInterface: + +- radioConfig - Current radio configuration and device settings, if you write to this the new settings will be applied to +the device. +- nodes - The database of received nodes. Includes always up-to-date location and username information for each +node in the mesh. This is a read-only datastructure. +- nodesByNum - like "nodes" but keyed by nodeNum instead of nodeId +- myInfo - Contains read-only information about the local radio device (software version, hardware version, etc) + +# Published PubSub topics + +We use a [publish-subscribe](https://pypubsub.readthedocs.io/en/v4.0.3/) model to communicate asynchronous events. Available +topics: + +- meshtastic.connection.established - published once we've successfully connected to the radio and downloaded the node DB +- meshtastic.connection.lost - published once we've lost our link to the radio +- meshtastic.receive.text(packet) - delivers a received packet as a dictionary, if you only care about a particular +type of packet, you should subscribe to the full topic name. If you want to see all packets, simply subscribe to "meshtastic.receive". +- meshtastic.receive.position(packet) +- meshtastic.receive.user(packet) +- meshtastic.receive.data.portnum(packet) (where portnum is an integer or well known PortNum enum) +- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc...) + +We receive position, user, or data packets from the mesh. You probably only care about meshtastic.receive.data. The first argument for +that publish will be the packet. Text or binary data packets (from sendData or sendText) will both arrive this way. If you print packet +you'll see the fields in the dictionary. decoded.data.payload will contain the raw bytes that were sent. If the packet was sent with +sendText, decoded.data.text will **also** be populated with the decoded string. For ASCII these two strings will be the same, but for +unicode scripts they can be different. + +# Example Usage +``` +import meshtastic +from pubsub import pub + +def onReceive(packet, interface): # called when a packet arrives + print(f"Received: {packet}") + +def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio + # defaults to broadcast, specify a destination ID if you wish + interface.sendText("hello mesh") + +pub.subscribe(onReceive, "meshtastic.receive") +pub.subscribe(onConnection, "meshtastic.connection.established") +# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +interface = meshtastic.SerialInterface() + +``` + +""" + +import pygatt +import google.protobuf.json_format +import serial +import threading +import logging +import sys +import random +import traceback +import time +import base64 +import platform +import socket +from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util +from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout +from pubsub import pub +from dotmap import DotMap +from typing import * +from google.protobuf.json_format import MessageToJson + + + +def pskToString(psk: bytes): + """Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string""" + if len(psk) == 0: + return "unencrypted" + elif len(psk) == 1: + b = psk[0] + if b == 0: + return "unencrypted" + elif b == 1: + return "default" + else: + return f"simple{b - 1}" + else: + return "secret" + + +class Node: + """A model of a (local or remote) node in the mesh + + Includes methods for radioConfig and channels + """ + + def __init__(self, iface, nodeNum): + """Constructor""" + self.iface = iface + self.nodeNum = nodeNum + self.radioConfig = None + self.channels = None + self._timeout = Timeout(maxSecs=60) + + def showChannels(self): + """Show human readable description of our channels""" + print("Channels:") + for c in self.channels: + if c.role != channel_pb2.Channel.Role.DISABLED: + cStr = stripnl(MessageToJson(c.settings)) + print( + f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}") + publicURL = self.getURL(includeAll=False) + adminURL = self.getURL(includeAll=True) + print(f"\nPrimary channel URL: {publicURL}") + if adminURL != publicURL: + print(f"Complete URL (includes all channels): {adminURL}") + + def showInfo(self): + """Show human readable description of our node""" + print( + f"Preferences: {stripnl(MessageToJson(self.radioConfig.preferences))}\n") + self.showChannels() + + def requestConfig(self): + """ + Send regular MeshPackets to ask for settings and channels + """ + self.radioConfig = None + self.channels = None + self.partialChannels = [] # We keep our channels in a temp array until finished + + self._requestSettings() + + def waitForConfig(self): + """Block until radio config is received. Returns True if config has been received.""" + return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels')) + + def writeConfig(self): + """Write the current (edited) radioConfig to the device""" + if self.radioConfig == None: + raise Exception("No RadioConfig has been read") + + p = admin_pb2.AdminMessage() + p.set_radio.CopyFrom(self.radioConfig) + + self._sendAdmin(p) + logging.debug("Wrote config") + + def writeChannel(self, channelIndex, adminIndex=0): + """Write the current (edited) channel to the device""" + + p = admin_pb2.AdminMessage() + p.set_channel.CopyFrom(self.channels[channelIndex]) + + self._sendAdmin(p, adminIndex=adminIndex) + logging.debug(f"Wrote channel {channelIndex}") + + def deleteChannel(self, channelIndex): + """Delete the specifed channelIndex and shift other channels up""" + ch = self.channels[channelIndex] + if ch.role != channel_pb2.Channel.Role.SECONDARY: + raise Exception("Only SECONDARY channels can be deleted") + + # we are careful here because if we move the "admin" channel the channelIndex we need to use + # for sending admin channels will also change + adminIndex = self.iface.localNode._getAdminChannelIndex() + + self.channels.pop(channelIndex) + self._fixupChannels() # expand back to 8 channels + + index = channelIndex + while index < self.iface.myInfo.max_channels: + self.writeChannel(index, adminIndex=adminIndex) + index += 1 + + # if we are updating the local node, we might end up *moving* the admin channel index as we are writing + if (self.iface.localNode == self) and index >= adminIndex: + # We've now passed the old location for admin index (and writen it), so we can start finding it by name again + adminIndex = 0 + + def getChannelByName(self, name): + """Try to find the named channel or return None""" + for c in (self.channels or []): + if c.settings and c.settings.name == name: + return c + return None + + def getDisabledChannel(self): + """Return the first channel that is disabled (i.e. available for some new use)""" + for c in self.channels: + if c.role == channel_pb2.Channel.Role.DISABLED: + return c + return None + + def _getAdminChannelIndex(self): + """Return the channel number of the admin channel, or 0 if no reserved channel""" + c = self.getChannelByName("admin") + if c: + return c.index + else: + return 0 + + def setOwner(self, long_name, short_name=None): + """Set device owner name""" + nChars = 3 + minChars = 2 + if long_name is not None: + long_name = long_name.strip() + if short_name is None: + words = long_name.split() + if len(long_name) <= nChars: + short_name = long_name + elif len(words) >= minChars: + short_name = ''.join(map(lambda word: word[0], words)) + else: + trans = str.maketrans(dict.fromkeys('aeiouAEIOU')) + short_name = long_name[0] + long_name[1:].translate(trans) + if len(short_name) < nChars: + short_name = long_name[:nChars] + + p = admin_pb2.AdminMessage() + + if long_name is not None: + p.set_owner.long_name = long_name + if short_name is not None: + short_name = short_name.strip() + if len(short_name) > nChars: + short_name = short_name[:nChars] + p.set_owner.short_name = short_name + + return self._sendAdmin(p) + + def getURL(self, includeAll: bool = True): + """The sharable URL that describes the current channel + """ + # Only keep the primary/secondary channels, assume primary is first + channelSet = apponly_pb2.ChannelSet() + for c in self.channels: + if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY): + channelSet.settings.append(c.settings) + bytes = channelSet.SerializeToString() + s = base64.urlsafe_b64encode(bytes).decode('ascii') + return f"https://www.meshtastic.org/d/#{s}".replace("=", "") + + def setURL(self, url): + """Set mesh network URL""" + if self.radioConfig == None: + raise Exception("No RadioConfig has been read") + + # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set} + # Split on '/#' to find the base64 encoded channel settings + splitURL = url.split("/#") + b64 = splitURL[-1] + + # We normally strip padding to make for a shorter URL, but the python parser doesn't like + # that. So add back any missing padding + # per https://stackoverflow.com/a/9807138 + missing_padding = len(b64) % 4 + if missing_padding: + b64 += '=' * (4 - missing_padding) + + decodedURL = base64.urlsafe_b64decode(b64) + channelSet = apponly_pb2.ChannelSet() + channelSet.ParseFromString(decodedURL) + + i = 0 + for chs in channelSet.settings: + ch = channel_pb2.Channel() + ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY + ch.index = i + ch.settings.CopyFrom(chs) + self.channels[ch.index] = ch + self.writeChannel(ch.index) + i = i + 1 + + def _requestSettings(self): + """ + Done with initial config messages, now send regular MeshPackets to ask for settings + """ + p = admin_pb2.AdminMessage() + p.get_radio_request = True + + def onResponse(p): + """A closure to handle the response packet""" + self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response + logging.debug("Received radio config, now fetching channels...") + self._timeout.reset() # We made foreward progress + self._requestChannel(0) # now start fetching channels + + # Show progress message for super slow operations + if self != self.iface.localNode: + logging.info( + "Requesting preferences from remote node (this could take a while)") + + return self._sendAdmin(p, + wantResponse=True, + onResponse=onResponse) + + def exitSimulator(self): + """ + Tell a simulator node to exit (this message is ignored for other nodes) + """ + p = admin_pb2.AdminMessage() + p.exit_simulator = True + + return self._sendAdmin(p) + + def reboot(self, secs: int = 10): + """ + Tell the node to reboot + """ + p = admin_pb2.AdminMessage() + p.reboot_seconds = secs + logging.info(f"Telling node to reboot in {secs} seconds") + + return self._sendAdmin(p) + + def _fixupChannels(self): + """Fixup indexes and add disabled channels as needed""" + + # Add extra disabled channels as needed + for index, ch in enumerate(self.channels): + ch.index = index # fixup indexes + + self._fillChannels() + + def _fillChannels(self): + """Mark unused channels as disabled""" + + # Add extra disabled channels as needed + index = len(self.channels) + while index < self.iface.myInfo.max_channels: + ch = channel_pb2.Channel() + ch.role = channel_pb2.Channel.Role.DISABLED + ch.index = index + self.channels.append(ch) + index += 1 + + def _requestChannel(self, channelNum: int): + """ + Done with initial config messages, now send regular MeshPackets to ask for settings + """ + p = admin_pb2.AdminMessage() + p.get_channel_request = channelNum + 1 + + # Show progress message for super slow operations + if self != self.iface.localNode: + logging.info( + f"Requesting channel {channelNum} info from remote node (this could take a while)") + else: + logging.debug(f"Requesting channel {channelNum}") + + def onResponse(p): + """A closure to handle the response packet""" + c = p["decoded"]["admin"]["raw"].get_channel_response + self.partialChannels.append(c) + self._timeout.reset() # We made foreward progress + logging.debug(f"Received channel {stripnl(c)}") + index = c.index + + # for stress testing, we can always download all channels + fastChannelDownload = True + + # Once we see a response that has NO settings, assume we are at the end of channels and stop fetching + quitEarly = ( + c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload + + if quitEarly or index >= self.iface.myInfo.max_channels - 1: + logging.debug("Finished downloading channels") + + self.channels = self.partialChannels + self._fixupChannels() + + # FIXME, the following should only be called after we have settings and channels + self.iface._connected() # Tell everone else we are ready to go + else: + self._requestChannel(index + 1) + + return self._sendAdmin(p, + wantResponse=True, + onResponse=onResponse) + + def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False, + onResponse=None, + adminIndex=0): + """Send an admin message to the specified node (or the local node if destNodeNum is zero)""" + + if adminIndex == 0: # unless a special channel index was used, we want to use the admin index + adminIndex = self.iface.localNode._getAdminChannelIndex() + + return self.iface.sendData(p, self.nodeNum, + portNum=portnums_pb2.PortNum.ADMIN_APP, + wantAck=True, + wantResponse=wantResponse, + onResponse=onResponse, + channelIndex=adminIndex) + +