Module meshtastic.node
An API for Meshtastic devices
Primary class: SerialInterface
Install with pip: "pip3 install meshtastic"
Source code on GitHub
Properties of SerialInterface:
- radioConfig - Current radio configuration and device settings. If you write to this, the new settings will be applied to the device.
- nodes - The database of received nodes. Includes always up-to-date location and username information for each node in the mesh. This is a read-only data structure.
- nodesByNum - like "nodes" but keyed by nodeNum instead of nodeId
- myInfo - Contains read-only information about the local radio device (software version, hardware version, etc.)
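To make the difference between nodes and nodesByNum concrete, here is a minimal sketch that uses plain dictionaries instead of a live SerialInterface. The node IDs, node numbers and field names are made up for illustration and are not taken from a real device.

```python
# Hypothetical stand-in for the `nodes` property: node records keyed by nodeId.
# All IDs, numbers and field names below are illustrative assumptions.
nodes = {
    "!0000000a": {"num": 10, "user": {"longName": "Alice"}},
    "!0000000b": {"num": 11, "user": {"longName": "Bob"}},
}

# The same records re-keyed by node number, mirroring what `nodesByNum` offers.
nodes_by_num = {info["num"]: info for info in nodes.values()}

print(nodes["!0000000a"]["user"]["longName"])  # lookup by nodeId
print(nodes_by_num[11]["user"]["longName"])    # lookup by nodeNum
```

Both mappings point at the same records; only the lookup key differs.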
Published PubSub topics
We use a publish-subscribe model to communicate asynchronous events. Available topics:
- meshtastic.connection.established - published once we've successfully connected to the radio and downloaded the node DB
- meshtastic.connection.lost - published once we've lost our link to the radio
- meshtastic.receive.text(packet) - delivers a received packet as a dictionary. If you only care about a particular type of packet, subscribe to the full topic name. If you want to see all packets, subscribe to "meshtastic.receive".
- meshtastic.receive.position(packet)
- meshtastic.receive.user(packet)
- meshtastic.receive.data.portnum(packet) (where portnum is an integer or well-known PortNum enum)
- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc…)
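The topic names above are hierarchical, which is why subscribing to "meshtastic.receive" also delivers the more specific subtopics. The following is a toy dispatcher written only to illustrate that behaviour; it is not the pypubsub API, and the subscribe and publish helpers are hypothetical names.

```python
from collections import defaultdict

# Toy stand-in for a hierarchical pub/sub bus (NOT the pypubsub API).
_subscribers = defaultdict(list)

def subscribe(listener, topic):
    """Register a listener for a topic and, implicitly, all of its subtopics."""
    _subscribers[topic].append(listener)

def publish(topic, **kwargs):
    """Deliver to listeners of the topic itself and of every parent topic."""
    parts = topic.split(".")
    for depth in range(len(parts), 0, -1):
        for listener in _subscribers[".".join(parts[:depth])]:
            listener(topic=topic, **kwargs)

seen_all, seen_text = [], []
subscribe(lambda topic, packet: seen_all.append(topic), "meshtastic.receive")
subscribe(lambda topic, packet: seen_text.append(topic), "meshtastic.receive.text")

publish("meshtastic.receive.text", packet={"decoded": {}})
publish("meshtastic.receive.position", packet={"decoded": {}})

print(seen_all)   # both packets reach the parent-topic listener
print(seen_text)  # only the text packet reaches the specific listener
```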
We receive position, user, or data packets from the mesh. You probably only care about meshtastic.receive.data. The first argument passed to a listener on that topic is the packet. Both text and binary data packets (sent with sendText or sendData) arrive this way. If you print the packet you'll see the fields in the dictionary. decoded.data.payload contains the raw bytes that were sent. If the packet was sent with sendText, decoded.data.text is also populated with the decoded string. For ASCII text the payload and the string hold the same characters, but for non-ASCII (Unicode) text they can differ.
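The difference between the raw payload and the decoded text can be seen without any radio attached. This sketch assumes the text is encoded as UTF-8 before it is sent, which this page does not state explicitly.

```python
# Assumption: text is encoded as UTF-8 before being sent as a payload.
ascii_text = "hello mesh"
unicode_text = "h\u00e9llo mesh"  # contains one non-ASCII character

ascii_payload = ascii_text.encode("utf-8")
unicode_payload = unicode_text.encode("utf-8")

# ASCII: one byte per character, so payload and text line up exactly.
print(len(ascii_text), len(ascii_payload))
# Non-ASCII: the accented letter takes two bytes, so the payload is longer.
print(len(unicode_text), len(unicode_payload))
# Decoding the payload recovers the original string.
print(unicode_payload.decode("utf-8") == unicode_text)
```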
Example Usage
import meshtastic
from pubsub import pub

def onReceive(packet, interface): # called when a packet arrives
    print(f"Received: {packet}")

def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio
    # defaults to broadcast, specify a destination ID if you wish
    interface.sendText("hello mesh")

pub.subscribe(onReceive, "meshtastic.receive")
pub.subscribe(onConnection, "meshtastic.connection.established")
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.SerialInterface()
"""
# an API for Meshtastic devices
Primary class: SerialInterface
Install with pip: "[pip3 install meshtastic](https://pypi.org/project/meshtastic/)"
Source code on [github](https://github.com/meshtastic/Meshtastic-python)
Properties of SerialInterface:
- radioConfig - Current radio configuration and device settings. If you write to this, the new settings will be applied to
the device.
- nodes - The database of received nodes. Includes always up-to-date location and username information for each
node in the mesh. This is a read-only data structure.
- nodesByNum - like "nodes" but keyed by nodeNum instead of nodeId
- myInfo - Contains read-only information about the local radio device (software version, hardware version, etc.)
# Published PubSub topics
We use a [publish-subscribe](https://pypubsub.readthedocs.io/en/v4.0.3/) model to communicate asynchronous events. Available
topics:
- meshtastic.connection.established - published once we've successfully connected to the radio and downloaded the node DB
- meshtastic.connection.lost - published once we've lost our link to the radio
- meshtastic.receive.text(packet) - delivers a received packet as a dictionary. If you only care about a particular
type of packet, subscribe to the full topic name. If you want to see all packets, subscribe to "meshtastic.receive".
- meshtastic.receive.position(packet)
- meshtastic.receive.user(packet)
- meshtastic.receive.data.portnum(packet) (where portnum is an integer or well-known PortNum enum)
- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc...)
We receive position, user, or data packets from the mesh. You probably only care about meshtastic.receive.data. The first argument
passed to a listener on that topic is the packet. Both text and binary data packets (sent with sendText or sendData) arrive this way.
If you print the packet you'll see the fields in the dictionary. decoded.data.payload contains the raw bytes that were sent. If the
packet was sent with sendText, decoded.data.text is **also** populated with the decoded string. For ASCII text the payload and the
string hold the same characters, but for non-ASCII (Unicode) text they can differ.
# Example Usage
```
import meshtastic
from pubsub import pub

def onReceive(packet, interface): # called when a packet arrives
    print(f"Received: {packet}")

def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio
    # defaults to broadcast, specify a destination ID if you wish
    interface.sendText("hello mesh")

pub.subscribe(onReceive, "meshtastic.receive")
pub.subscribe(onConnection, "meshtastic.connection.established")
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.SerialInterface()
```
"""
import pygatt
import google.protobuf.json_format
import serial
import threading
import logging
import sys
import random
import traceback
import time
import base64
import platform
import socket
from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util
from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout
from pubsub import pub
from dotmap import DotMap
from typing import *
from google.protobuf.json_format import MessageToJson


def pskToString(psk: bytes):
    """Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
    if len(psk) == 0:
        return "unencrypted"
    elif len(psk) == 1:
        b = psk[0]
        if b == 0:
            return "unencrypted"
        elif b == 1:
            return "default"
        else:
            return f"simple{b - 1}"
    else:
        return "secret"


class Node:
    """A model of a (local or remote) node in the mesh

    Includes methods for radioConfig and channels
    """

    def __init__(self, iface, nodeNum):
        """Constructor"""
        self.iface = iface
        self.nodeNum = nodeNum
        self.radioConfig = None
        self.channels = None
        self._timeout = Timeout(maxSecs=60)

    def showChannels(self):
        """Show human readable description of our channels"""
        print("Channels:")
        for c in self.channels:
            if c.role != channel_pb2.Channel.Role.DISABLED:
                cStr = stripnl(MessageToJson(c.settings))
                print(
                    f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
        publicURL = self.getURL(includeAll=False)
        adminURL = self.getURL(includeAll=True)
        print(f"\nPrimary channel URL: {publicURL}")
        if adminURL != publicURL:
            print(f"Complete URL (includes all channels): {adminURL}")

    def showInfo(self):
        """Show human readable description of our node"""
        print(
            f"Preferences: {stripnl(MessageToJson(self.radioConfig.preferences))}\n")
        self.showChannels()

    def requestConfig(self):
        """
        Send regular MeshPackets to ask for settings and channels
        """
        self.radioConfig = None
        self.channels = None
        self.partialChannels = []  # We keep our channels in a temp array until finished
        self._requestSettings()

    def waitForConfig(self):
        """Block until radio config is received. Returns True if config has been received."""
        return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels'))

    def writeConfig(self):
        """Write the current (edited) radioConfig to the device"""
        if self.radioConfig is None:
            raise Exception("No RadioConfig has been read")
        p = admin_pb2.AdminMessage()
        p.set_radio.CopyFrom(self.radioConfig)
        self._sendAdmin(p)
        logging.debug("Wrote config")

    def writeChannel(self, channelIndex, adminIndex=0):
        """Write the current (edited) channel to the device"""
        p = admin_pb2.AdminMessage()
        p.set_channel.CopyFrom(self.channels[channelIndex])
        self._sendAdmin(p, adminIndex=adminIndex)
        logging.debug(f"Wrote channel {channelIndex}")

    def deleteChannel(self, channelIndex):
        """Delete the specified channelIndex and shift other channels up"""
        ch = self.channels[channelIndex]
        if ch.role != channel_pb2.Channel.Role.SECONDARY:
            raise Exception("Only SECONDARY channels can be deleted")
        # we are careful here because if we move the "admin" channel the channelIndex we need to use
        # for sending admin channels will also change
        adminIndex = self.iface.localNode._getAdminChannelIndex()
        self.channels.pop(channelIndex)
        self._fixupChannels()  # expand back to 8 channels
        index = channelIndex
        while index < self.iface.myInfo.max_channels:
            self.writeChannel(index, adminIndex=adminIndex)
            index += 1
            # if we are updating the local node, we might end up *moving* the admin channel index as we are writing
            if (self.iface.localNode == self) and index >= adminIndex:
                # We've now passed the old location for admin index (and written it), so we can start finding it by name again
                adminIndex = 0

    def getChannelByName(self, name):
        """Try to find the named channel or return None"""
        for c in (self.channels or []):
            if c.settings and c.settings.name == name:
                return c
        return None

    def getDisabledChannel(self):
        """Return the first channel that is disabled (i.e. available for some new use)"""
        for c in self.channels:
            if c.role == channel_pb2.Channel.Role.DISABLED:
                return c
        return None

    def _getAdminChannelIndex(self):
        """Return the channel number of the admin channel, or 0 if no reserved channel"""
        c = self.getChannelByName("admin")
        if c:
            return c.index
        else:
            return 0

    def setOwner(self, long_name, short_name=None, is_licensed=False):
        """Set device owner name"""
        nChars = 3
        minChars = 2
        if long_name is not None:
            long_name = long_name.strip()
            if short_name is None:
                words = long_name.split()
                if len(long_name) <= nChars:
                    short_name = long_name
                elif len(words) >= minChars:
                    short_name = ''.join(map(lambda word: word[0], words))
                else:
                    trans = str.maketrans(dict.fromkeys('aeiouAEIOU'))
                    short_name = long_name[0] + long_name[1:].translate(trans)
                    if len(short_name) < nChars:
                        short_name = long_name[:nChars]
        p = admin_pb2.AdminMessage()
        if long_name is not None:
            p.set_owner.long_name = long_name
        if short_name is not None:
            short_name = short_name.strip()
            if len(short_name) > nChars:
                short_name = short_name[:nChars]
            p.set_owner.short_name = short_name
        p.set_owner.is_licensed = is_licensed
        return self._sendAdmin(p)

    def getURL(self, includeAll: bool = True):
        """The shareable URL that describes the current channel
        """
        # Only keep the primary/secondary channels, assume primary is first
        channelSet = apponly_pb2.ChannelSet()
        for c in self.channels:
            if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
                channelSet.settings.append(c.settings)
        # Named "serialized" rather than "bytes" to avoid shadowing the builtin
        serialized = channelSet.SerializeToString()
        s = base64.urlsafe_b64encode(serialized).decode('ascii')
        return f"https://www.meshtastic.org/d/#{s}".replace("=", "")

    def setURL(self, url):
        """Set mesh network URL"""
        if self.radioConfig is None:
            raise Exception("No RadioConfig has been read")
        # URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
        # Split on '/#' to find the base64 encoded channel settings
        splitURL = url.split("/#")
        b64 = splitURL[-1]
        # We normally strip padding to make for a shorter URL, but the python parser doesn't like
        # that. So add back any missing padding
        # per https://stackoverflow.com/a/9807138
        missing_padding = len(b64) % 4
        if missing_padding:
            b64 += '=' * (4 - missing_padding)
        decodedURL = base64.urlsafe_b64decode(b64)
        channelSet = apponly_pb2.ChannelSet()
        channelSet.ParseFromString(decodedURL)
        i = 0
        for chs in channelSet.settings:
            ch = channel_pb2.Channel()
            ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY
            ch.index = i
            ch.settings.CopyFrom(chs)
            self.channels[ch.index] = ch
            self.writeChannel(ch.index)
            i = i + 1

    def _requestSettings(self):
        """
        Done with initial config messages, now send regular MeshPackets to ask for settings
        """
        p = admin_pb2.AdminMessage()
        p.get_radio_request = True

        def onResponse(p):
            """A closure to handle the response packet"""
            self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response
            logging.debug("Received radio config, now fetching channels...")
            self._timeout.reset()  # We made forward progress
            self._requestChannel(0)  # now start fetching channels

        # Show progress message for super slow operations
        if self != self.iface.localNode:
            logging.info(
                "Requesting preferences from remote node (this could take a while)")
        return self._sendAdmin(p,
                               wantResponse=True,
                               onResponse=onResponse)

    def exitSimulator(self):
        """
        Tell a simulator node to exit (this message is ignored for other nodes)
        """
        p = admin_pb2.AdminMessage()
        p.exit_simulator = True
        return self._sendAdmin(p)

    def reboot(self, secs: int = 10):
        """
        Tell the node to reboot
        """
        p = admin_pb2.AdminMessage()
        p.reboot_seconds = secs
        logging.info(f"Telling node to reboot in {secs} seconds")
        return self._sendAdmin(p)

    def _fixupChannels(self):
        """Fixup indexes and add disabled channels as needed"""
        # Renumber the channels so each index matches its list position
        for index, ch in enumerate(self.channels):
            ch.index = index  # fixup indexes
        self._fillChannels()

    def _fillChannels(self):
        """Mark unused channels as disabled"""
        # Add extra disabled channels as needed
        index = len(self.channels)
        while index < self.iface.myInfo.max_channels:
            ch = channel_pb2.Channel()
            ch.role = channel_pb2.Channel.Role.DISABLED
            ch.index = index
            self.channels.append(ch)
            index += 1

    def _requestChannel(self, channelNum: int):
        """
        Send a regular MeshPacket to ask for the settings of one channel
        """
        p = admin_pb2.AdminMessage()
        p.get_channel_request = channelNum + 1
        # Show progress message for super slow operations
        if self != self.iface.localNode:
            logging.info(
                f"Requesting channel {channelNum} info from remote node (this could take a while)")
        else:
            logging.debug(f"Requesting channel {channelNum}")

        def onResponse(p):
            """A closure to handle the response packet"""
            c = p["decoded"]["admin"]["raw"].get_channel_response
            self.partialChannels.append(c)
            self._timeout.reset()  # We made forward progress
            logging.debug(f"Received channel {stripnl(c)}")
            index = c.index
            # for stress testing, we can always download all channels
            fastChannelDownload = True
            # Once we see a response that has NO settings, assume we are at the end of channels and stop fetching
            quitEarly = (
                c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
            if quitEarly or index >= self.iface.myInfo.max_channels - 1:
                logging.debug("Finished downloading channels")
                self.channels = self.partialChannels
                self._fixupChannels()
                # FIXME, the following should only be called after we have settings and channels
                self.iface._connected()  # Tell everyone else we are ready to go
            else:
                self._requestChannel(index + 1)

        return self._sendAdmin(p,
                               wantResponse=True,
                               onResponse=onResponse)

    def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False,
                   onResponse=None,
                   adminIndex=0):
        """Send an admin message to this node (self.nodeNum), using the admin channel unless adminIndex is given"""
        if adminIndex == 0:  # unless a special channel index was used, we want to use the admin index
            adminIndex = self.iface.localNode._getAdminChannelIndex()
        return self.iface.sendData(p, self.nodeNum,
                                   portNum=portnums_pb2.PortNum.ADMIN_APP,
                                   wantAck=True,
                                   wantResponse=wantResponse,
                                   onResponse=onResponse,
                                   channelIndex=adminIndex)
Functions
def pskToString(psk: bytes)
Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string
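As a quick illustration of the mapping, here is a standalone copy of the same logic that runs without the meshtastic package. The name psk_to_string is a hypothetical rename used only for this sketch.

```python
def psk_to_string(psk: bytes) -> str:
    """Standalone copy of the pskToString logic, for illustration only."""
    if len(psk) == 0:
        return "unencrypted"
    if len(psk) == 1:
        b = psk[0]
        if b == 0:
            return "unencrypted"
        if b == 1:
            return "default"
        return f"simple{b - 1}"
    # Any longer key is reported without revealing its contents.
    return "secret"

for psk in (b"", b"\x00", b"\x01", b"\x02", bytes(16)):
    print(psk.hex() or "(empty)", "->", psk_to_string(psk))
```

Only the key length and, for one-byte keys, the preset number are revealed; the actual bytes of a longer key never appear in the output.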
Classes
class Node (iface, nodeNum)
A model of a (local or remote) node in the mesh
Includes methods for radioConfig and channels
Constructor
Methods
def deleteChannel(self, channelIndex)
Delete the specified channelIndex and shift other channels up
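The shift-and-refill behaviour can be sketched with plain dictionaries. This is a simplified model under stated assumptions: MAX_CHANNELS stands in for iface.myInfo.max_channels, the dict fields are invented, and the real method additionally writes every shifted channel back to the device.

```python
MAX_CHANNELS = 8  # assumption: stands in for iface.myInfo.max_channels

def delete_channel(channels, channel_index):
    """Remove one SECONDARY channel, renumber the rest, pad with DISABLED slots."""
    if channels[channel_index]["role"] != "SECONDARY":
        raise ValueError("Only SECONDARY channels can be deleted")
    channels.pop(channel_index)
    # Renumber so that each channel's index matches its list position again.
    for index, ch in enumerate(channels):
        ch["index"] = index
    # Pad back up to the fixed channel count with disabled placeholders.
    while len(channels) < MAX_CHANNELS:
        channels.append({"index": len(channels), "role": "DISABLED", "name": ""})
    return channels

channels = [
    {"index": 0, "role": "PRIMARY", "name": "main"},
    {"index": 1, "role": "SECONDARY", "name": "hiking"},
    {"index": 2, "role": "SECONDARY", "name": "admin"},
]
delete_channel(channels, 1)
print([(c["index"], c["role"], c["name"]) for c in channels[:3]])
```

Note that the channel named "admin" moves from index 2 to index 1 here, which is the situation the comments in the source about a moving admin channel index refer to.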
def exitSimulator(self)
Tell a simulator node to exit (this message is ignored for other nodes)
def getChannelByName(self, name)
Try to find the named channel or return None
def getDisabledChannel(self)
Return the first channel that is disabled (i.e. available for some new use)
def getURL(self, includeAll: bool = True)
The shareable URL that describes the current channel (the primary channel only, or the primary plus all secondary channels when includeAll is True)
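The URL is the channel settings serialized to bytes, encoded as URL-safe base64, with the "=" padding stripped. A minimal sketch of that encoding step follows; the helper name make_channel_url and the placeholder bytes are assumptions, since a real URL carries a serialized ChannelSet protobuf.

```python
import base64

def make_channel_url(serialized: bytes) -> str:
    """Encode already-serialized channel settings into a padding-free URL."""
    s = base64.urlsafe_b64encode(serialized).decode("ascii")
    # Stripping '=' shortens the URL; a decoder has to add the padding back.
    return f"https://www.meshtastic.org/d/#{s}".replace("=", "")

# Placeholder bytes standing in for a serialized ChannelSet.
print(make_channel_url(b"\x01\x02\x03\x04"))
```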
def reboot(self, secs: int = 10)
Tell the node to reboot
def requestConfig(self)
Send regular MeshPackets to ask for settings and channels
def setOwner(self, long_name, short_name=None, is_licensed=False)
Set device owner name
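When no short_name is given, the method derives one from long_name: names of up to three characters are used as-is, multi-word names become initials, and single words lose their vowels; the result is then cut to three characters. The sketch below isolates that derivation as a pure function. derive_short_name is a hypothetical helper, not part of the library, and it sends nothing to a device.

```python
from typing import Optional

N_CHARS = 3    # maximum short-name length
MIN_WORDS = 2  # word count at which initials are used

def derive_short_name(long_name: str, short_name: Optional[str] = None) -> str:
    """Mirror the short-name rules of setOwner without contacting a device."""
    long_name = long_name.strip()
    if short_name is None:
        words = long_name.split()
        if len(long_name) <= N_CHARS:
            short_name = long_name
        elif len(words) >= MIN_WORDS:
            short_name = "".join(word[0] for word in words)
        else:
            # Single long word: keep the first letter, drop later vowels.
            vowels = str.maketrans(dict.fromkeys("aeiouAEIOU"))
            short_name = long_name[0] + long_name[1:].translate(vowels)
            if len(short_name) < N_CHARS:
                short_name = long_name[:N_CHARS]
    # Whatever the origin, the short name is trimmed to N_CHARS characters.
    return short_name.strip()[:N_CHARS]

for name in ("Bob", "Kevin Hester", "Meshtastic", "Aeio"):
    print(repr(name), "->", repr(derive_short_name(name)))
```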
def setURL(self, url)
Set mesh network URL
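Before the channel settings can be parsed, the base64 fragment after "/#" has to get its stripped "=" padding back. The following sketch shows only that decoding step; decode_channel_fragment is a hypothetical helper, and the example URL carries placeholder bytes rather than a real ChannelSet.

```python
import base64

def decode_channel_fragment(url: str) -> bytes:
    """Return the raw bytes carried after '/#' in a channel URL."""
    b64 = url.split("/#")[-1]
    # Base64 input must be a multiple of 4 characters, so restore stripped '='.
    remainder = len(b64) % 4
    if remainder:
        b64 += "=" * (4 - remainder)
    return base64.urlsafe_b64decode(b64)

print(decode_channel_fragment("https://www.meshtastic.org/d/#AQIDBA"))
```

In the real method the resulting bytes are parsed as a ChannelSet, and each entry is written to the device as the primary channel (index 0) or a secondary channel.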
def showChannels(self)
Show human readable description of our channels
def showInfo(self)
Show human readable description of our node
def waitForConfig(self)
Block until radio config is received. Returns True if config has been received.
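The wait is delegated to Timeout.waitForSet from meshtastic.util, whose implementation is not shown on this page. As a rough sketch of how such a blocking wait could work, assuming simple polling, the example below waits until both attributes are set by another thread. wait_for_set and FakeNode are hypothetical names, not library code.

```python
import threading
import time

def wait_for_set(target, attrs, max_secs=5.0, poll_interval=0.05):
    """Poll until every named attribute on target is truthy, or give up."""
    deadline = time.monotonic() + max_secs
    while time.monotonic() < deadline:
        if all(getattr(target, name, None) for name in attrs):
            return True
        time.sleep(poll_interval)
    return False

class FakeNode:
    """Hypothetical stand-in for Node; both attributes start unset."""
    def __init__(self):
        self.radioConfig = None
        self.channels = None

node = FakeNode()

def fake_download():
    # Simulates config responses arriving on another thread.
    time.sleep(0.1)
    node.radioConfig = {"preferences": {}}
    node.channels = ["primary"]

threading.Thread(target=fake_download).start()
print(wait_for_set(node, ("radioConfig", "channels")))
```

A False return means the deadline passed before both attributes were set, so callers should check the result rather than assume the configuration is present.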
def writeChannel(self, channelIndex, adminIndex=0)
Write the current (edited) channel to the device
def writeConfig(self)
Write the current (edited) radioConfig to the device