diff --git a/docs/meshtastic/index.html b/docs/meshtastic/index.html index 7ecc916..e74d5f7 100644 --- a/docs/meshtastic/index.html +++ b/docs/meshtastic/index.html @@ -191,6 +191,16 @@ class MeshInterface: if not noProto: self._startConfig() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None and exc_value is not None: + logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred') + if traceback is not None: + logging.error(f'Traceback: {traceback}') + self.close() + def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False): """Send a utf8 string to some other node, if the node has a display it will also be shown on the device. @@ -277,6 +287,14 @@ class MeshInterface: self._sendToRadio(toRadio) return meshPacket + def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig')): + """Block until radio config is received. Returns True if config has been received.""" + for _ in range(int(maxsecs/sleep)): + if all(map(lambda a: getattr(self, a, None), attrs)): + return True + time.sleep(sleep) + return False + def writeConfig(self): """Write the current (edited) radioConfig to the device""" if self.radioConfig == None: @@ -286,6 +304,55 @@ class MeshInterface: t.set_radio.CopyFrom(self.radioConfig) self._sendToRadio(t) + def getMyNode(self): + if self.myInfo is None: + return None + myId = self.myInfo.my_node_num + for _, nodeDict in self.nodes.items(): + if 'num' in nodeDict and nodeDict['num'] == myId: + if 'user' in nodeDict: + return nodeDict['user'] + return None + + def getLongName(self): + user = self.getMyNode() + if user is not None: + return user.get('longName', None) + return None + + def getShortName(self): + user = self.getMyNode() + if user is not None: + return user.get('shortName', None) + return None + + def setOwner(self, long_name, short_name=None): + """Set device owner name""" + nChars = 3 + minChars = 2 + if long_name is not None: + long_name = long_name.strip() + if short_name is None: + words = long_name.split() + if len(long_name) <= nChars: + short_name = long_name + elif len(words) >= minChars: + short_name = ''.join(map(lambda word: word[0], words)) + else: + trans = str.maketrans(dict.fromkeys('aeiouAEIOU')) + short_name = long_name[0] + long_name[1:].translate(trans) + if len(short_name) < nChars: + short_name = long_name[:nChars] + t = mesh_pb2.ToRadio() + if long_name is not None: + t.set_owner.long_name = long_name + if short_name is not None: + short_name = short_name.strip() + if len(short_name) > nChars: + short_name = short_name[:nChars] + t.set_owner.short_name = short_name + self._sendToRadio(t) + @property def channelURL(self): """The sharable URL that describes the current channel @@ -294,6 +361,19 @@ class MeshInterface: s = base64.urlsafe_b64encode(bytes).decode('ascii') return f"https://www.meshtastic.org/c/#{s}" + def setURL(self, url, write=True): + """Set mesh network URL""" + if self.radioConfig == None: + raise Exception("No RadioConfig has been read") + + # URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings} + # Split on '/#' to find the base64 encoded channel settings + splitURL = url.split("/#") + decodedURL = base64.urlsafe_b64decode(splitURL[-1]) + self.radioConfig.channel_settings.ParseFromString(decodedURL) + if write: + self.writeConfig() + def _generatePacketId(self): """Get a new unique packet ID""" if self.currentPacketId is None: @@ -357,6 +437,7 @@ class MeshInterface: self._nodesByNum[node["num"]] = node if "user" in node: # Some nodes might not have user/ids assigned yet self.nodes[node["user"]["id"]] = node + pub.sendMessage("meshtastic.node.updated", node=node, interface=self) elif fromRadio.config_complete_id == MY_CONFIG_ID: # we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id self._connected() @@ -847,6 +928,9 @@ class TCPInterface(StreamInterface):
sendPacketsendPositionsendTextsetOwnersetURLwaitForConfigwriteConfig
+def getLongName(self)
+def getLongName(self):
+ user = self.getMyNode()
+ if user is not None:
+ return user.get('longName', None)
+ return None
+
+def getMyNode(self)
+def getMyNode(self):
+ if self.myInfo is None:
+ return None
+ myId = self.myInfo.my_node_num
+ for _, nodeDict in self.nodes.items():
+ if 'num' in nodeDict and nodeDict['num'] == myId:
+ if 'user' in nodeDict:
+ return nodeDict['user']
+ return None
+
+def getShortName(self)
+def getShortName(self):
+ user = self.getMyNode()
+ if user is not None:
+ return user.get('shortName', None)
+ return None
+
def sendData(self, byteData, destinationId='^all', dataType=0, wantAck=False, wantResponse=False)
+def setOwner(self, long_name, short_name=None)
+Set device owner name
def setOwner(self, long_name, short_name=None):
+ """Set device owner name"""
+ nChars = 3
+ minChars = 2
+ if long_name is not None:
+ long_name = long_name.strip()
+ if short_name is None:
+ words = long_name.split()
+ if len(long_name) <= nChars:
+ short_name = long_name
+ elif len(words) >= minChars:
+ short_name = ''.join(map(lambda word: word[0], words))
+ else:
+ trans = str.maketrans(dict.fromkeys('aeiouAEIOU'))
+ short_name = long_name[0] + long_name[1:].translate(trans)
+ if len(short_name) < nChars:
+ short_name = long_name[:nChars]
+ t = mesh_pb2.ToRadio()
+ if long_name is not None:
+ t.set_owner.long_name = long_name
+ if short_name is not None:
+ short_name = short_name.strip()
+ if len(short_name) > nChars:
+ short_name = short_name[:nChars]
+ t.set_owner.short_name = short_name
+ self._sendToRadio(t)
+
+def setURL(self, url, write=True)
+Set mesh network URL
def setURL(self, url, write=True):
+ """Set mesh network URL"""
+ if self.radioConfig == None:
+ raise Exception("No RadioConfig has been read")
+
+ # URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings}
+ # Split on '/#' to find the base64 encoded channel settings
+ splitURL = url.split("/#")
+ decodedURL = base64.urlsafe_b64decode(splitURL[-1])
+ self.radioConfig.channel_settings.ParseFromString(decodedURL)
+ if write:
+ self.writeConfig()
+
+def waitForConfig(self, sleep=0.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig'))
+Block until radio config is received. Returns True if config has been received.
def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig')):
+ """Block until radio config is received. Returns True if config has been received."""
+ for _ in range(int(maxsecs/sleep)):
+ if all(map(lambda a: getattr(self, a, None), attrs)):
+ return True
+ time.sleep(sleep)
+ return False
+
def writeConfig(self)
sendPacketsendPositionsendTextsetOwnersetURLwaitForConfigwriteConfigsendPacketsendPositionsendTextsetOwnersetURLwaitForConfigwriteConfigsendPacket
sendPositionsendTextsetOwnersetURLwaitForConfigwriteConfigMeshInterface
diff --git a/setup.py b/setup.py
index 4097478..9fa0407 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work
setup(
name="meshtastic",
- version="1.1.6",
+ version="1.1.7",
description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description,
long_description_content_type="text/markdown",