Files
python/docs/meshtastic/index.html

2819 lines
130 KiB
HTML
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="generator" content="pdoc 0.9.1" />
<title>meshtastic API documentation</title>
<meta name="description" content="an API for Meshtastic devices …" />
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin>
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin>
<link rel="stylesheet preload" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css" crossorigin>
<style>:root{--highlight-color:#fe9}.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > 
span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
<script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script>
<script>window.addEventListener('DOMContentLoaded', () => hljs.initHighlighting())</script>
</head>
<body>
<main>
<article id="content">
<header>
<h1 class="title">Package <code>meshtastic</code></h1>
</header>
<section id="section-intro">
<h1 id="an-api-for-meshtastic-devices">an API for Meshtastic devices</h1>
<p>Primary class: SerialInterface
Install with pip: "<a href="https://pypi.org/project/meshtastic/">pip3 install meshtastic</a>"
Source code on <a href="https://github.com/meshtastic/Meshtastic-python">github</a></p>
<p>properties of SerialInterface:</p>
<ul>
<li>radioConfig - Current radio configuration and device settings, if you write to this the new settings will be applied to
the device.</li>
<li>nodes - The database of received nodes.
Includes always up-to-date location and username information for each
node in the mesh.
This is a read-only datastructure.</li>
<li>nodesByNum - like "nodes" but keyed by nodeNum instead of nodeId</li>
<li>myInfo - Contains read-only information about the local radio device (software version, hardware version, etc)</li>
</ul>
<h1 id="published-pubsub-topics">Published PubSub topics</h1>
<p>We use a <a href="https://pypubsub.readthedocs.io/en/v4.0.3/">publish-subscribe</a> model to communicate asynchronous events.
Available
topics:</p>
<ul>
<li>meshtastic.connection.established - published once we've successfully connected to the radio and downloaded the node DB</li>
<li>meshtastic.connection.lost - published once we've lost our link to the radio</li>
<li>meshtastic.receive.text(packet) - delivers a received packet as a dictionary. If you only care about a particular
type of packet, you should subscribe to the full topic name.
If you want to see all packets, simply subscribe to "meshtastic.receive".</li>
<li>meshtastic.receive.position(packet)</li>
<li>meshtastic.receive.user(packet)</li>
<li>meshtastic.receive.data.portnum(packet) (where portnum is an integer or well known PortNum enum)</li>
<li>meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc&hellip;)</li>
</ul>
<p>We receive position, user, or data packets from the mesh.
You probably only care about meshtastic.receive.data.
The first argument for
that publish will be the packet.
Text or binary data packets (from sendData or sendText) will both arrive this way.
If you print packet
you'll see the fields in the dictionary.
decoded.data.payload will contain the raw bytes that were sent.
If the packet was sent with
sendText, decoded.data.text will <strong>also</strong> be populated with the decoded string.
For ASCII these two strings will be the same, but for
unicode scripts they can be different.</p>
<h1 id="example-usage">Example Usage</h1>
<pre><code>import meshtastic
from pubsub import pub
def onReceive(packet, interface): # called when a packet arrives
    print(f&quot;Received: {packet}&quot;)

def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio
    # defaults to broadcast, specify a destination ID if you wish
    interface.sendText(&quot;hello mesh&quot;)

pub.subscribe(onReceive, &quot;meshtastic.receive&quot;)
pub.subscribe(onConnection, &quot;meshtastic.connection.established&quot;)
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.SerialInterface()
</code></pre>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">&#34;&#34;&#34;
# an API for Meshtastic devices
Primary class: SerialInterface
Install with pip: &#34;[pip3 install meshtastic](https://pypi.org/project/meshtastic/)&#34;
Source code on [github](https://github.com/meshtastic/Meshtastic-python)
properties of SerialInterface:
- radioConfig - Current radio configuration and device settings, if you write to this the new settings will be applied to
the device.
- nodes - The database of received nodes. Includes always up-to-date location and username information for each
node in the mesh. This is a read-only datastructure.
- nodesByNum - like &#34;nodes&#34; but keyed by nodeNum instead of nodeId
- myInfo - Contains read-only information about the local radio device (software version, hardware version, etc)
# Published PubSub topics
We use a [publish-subscribe](https://pypubsub.readthedocs.io/en/v4.0.3/) model to communicate asynchronous events. Available
topics:
- meshtastic.connection.established - published once we&#39;ve successfully connected to the radio and downloaded the node DB
- meshtastic.connection.lost - published once we&#39;ve lost our link to the radio
- meshtastic.receive.text(packet) - delivers a received packet as a dictionary, if you only care about a particular
type of packet, you should subscribe to the full topic name. If you want to see all packets, simply subscribe to &#34;meshtastic.receive&#34;.
- meshtastic.receive.position(packet)
- meshtastic.receive.user(packet)
- meshtastic.receive.data.portnum(packet) (where portnum is an integer or well known PortNum enum)
- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc...)
We receive position, user, or data packets from the mesh. You probably only care about meshtastic.receive.data. The first argument for
that publish will be the packet. Text or binary data packets (from sendData or sendText) will both arrive this way. If you print packet
you&#39;ll see the fields in the dictionary. decoded.data.payload will contain the raw bytes that were sent. If the packet was sent with
sendText, decoded.data.text will **also** be populated with the decoded string. For ASCII these two strings will be the same, but for
unicode scripts they can be different.
# Example Usage
```
import meshtastic
from pubsub import pub
def onReceive(packet, interface): # called when a packet arrives
print(f&#34;Received: {packet}&#34;)
def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio
# defaults to broadcast, specify a destination ID if you wish
interface.sendText(&#34;hello mesh&#34;)
pub.subscribe(onReceive, &#34;meshtastic.receive&#34;)
pub.subscribe(onConnection, &#34;meshtastic.connection.established&#34;)
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.SerialInterface()
```
&#34;&#34;&#34;
import pygatt
import google.protobuf.json_format
import serial
import threading
import logging
import sys
import random
import traceback
import time
import base64
import platform
import socket
from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util
from .util import fixme, catchAndIgnore, stripnl
from pubsub import pub
from dotmap import DotMap
from typing import *
START1 = 0x94
START2 = 0xc3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512
defaultHopLimit = 3
BROADCAST_ADDR = &#34;^all&#34; # A special ID that means broadcast
# if using 8 bit nodenums this will be shortened on the target
BROADCAST_NUM = 0xffffffff
MY_CONFIG_ID = 42
&#34;&#34;&#34;The numeric buildnumber (shared with android apps) specifying the level of device code we are guaranteed to understand
format is Mmmss (where M is 1+the numeric major number), i.e. 20120 means 1.1.20
&#34;&#34;&#34;
OUR_APP_VERSION = 20200
class ResponseHandler(NamedTuple):
&#34;&#34;&#34;A pending response callback, waiting for a response to one of our messages&#34;&#34;&#34;
# requestId: int - used only as a key
callback: Callable
# FIXME, add timestamp and age out old requests
class KnownProtocol(NamedTuple):
&#34;&#34;&#34;Used to automatically decode known protocol payloads&#34;&#34;&#34;
name: str
# portnum: int, now a key
# If set, will be called to parse as a protocol buffer
protobufFactory: Callable = None
# If set, invoked as onReceive(interface, packet)
onReceive: Callable = None
class MeshInterface:
&#34;&#34;&#34;Interface class for meshtastic devices
Properties:
isConnected
nodes
debugOut
&#34;&#34;&#34;
def __init__(self, debugOut=None, noProto=False):
&#34;&#34;&#34;Constructor
Keyword Arguments:
noProto -- If True, don&#39;t try to run our protocol on the link - just be a dumb serial client.
&#34;&#34;&#34;
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.myInfo = None # We don&#39;t have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we&#39;ve encountered a fatal exception it will be kept here
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
self._startConfig()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f&#39;An exception of type {exc_type} with value {exc_value} has occurred&#39;)
if traceback is not None:
logging.error(f&#39;Traceback: {traceback}&#39;)
self.close()
def sendText(self, text: AnyStr,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
wantResponse=wantResponse,
hopLimit=hopLimit,
onResponse=onResponse)
def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a data packet to some other node
Keyword Arguments:
data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
if getattr(data, &#34;SerializeToString&#34;, None):
logging.debug(f&#34;Serializing protobuf as data: {stripnl(data)}&#34;)
data = data.SerializeToString()
if len(data) &gt; mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception(&#34;Data payload too big&#34;)
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
raise Exception(&#34;A non-zero port number must be specified&#34;)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
&#34;&#34;&#34;
Send a position packet to some other node (normally a broadcast)
Also, the device software will notice this packet and use it to automatically set its notion of
the local position.
If timeSec is not specified (recommended), we will use the local machine time.
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
p = mesh_pb2.Position()
if(latitude != 0.0):
p.latitude_i = int(latitude / 1e-7)
if(longitude != 0.0):
p.longitude_i = int(longitude / 1e-7)
if(altitude != 0):
p.altitude = int(altitude)
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
wantAck=wantAck,
wantResponse=wantResponse)
def _addResponseHandler(self, requestId, callback):
self.responseHandlers[requestId] = ResponseHandler(callback)
def _sendPacket(self, meshPacket,
destinationId=BROADCAST_ADDR,
wantAck=False, hopLimit=defaultHopLimit):
&#34;&#34;&#34;Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don&#39;t want this - use sendData instead.
Returns the sent packet. The id field will be populated in this packet and
can be used to track future message acks/naks.
&#34;&#34;&#34;
# We allow users to talk to the local node before we&#39;ve completed the full connection flow...
if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
self._waitConnected()
toRadio = mesh_pb2.ToRadio()
if destinationId is None:
raise Exception(&#34;destinationId must not be None&#34;)
elif isinstance(destinationId, int):
nodeNum = destinationId
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
else:
nodeNum = self.nodes[destinationId][&#39;num&#39;]
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
meshPacket.hop_limit = hopLimit
# if the user hasn&#39;t set an ID for this packet (likely and recommended), we should pick a new unique ID
# so the message can be tracked.
if meshPacket.id == 0:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f&#34;Sending packet: {stripnl(meshPacket)}&#34;)
self._sendToRadio(toRadio)
return meshPacket
def waitForConfig(self, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig', 'channels')):
    """Poll until every attribute named in attrs is truthy on this interface.

    Keyword Arguments:
        sleep -- seconds to sleep between polls
        maxsecs -- polling budget in seconds; int(maxsecs/sleep) polls are made
        attrs -- names of the attributes that must all be set (a missing attribute counts as unset)

    Returns True as soon as all attributes are truthy, False if the polls run out first.
    """
    attempts = int(maxsecs/sleep)
    for _attempt in range(attempts):
        if all(getattr(self, name, None) for name in attrs):
            return True
        time.sleep(sleep)
    return False
def writeConfig(self):
    """Write the current (edited) radioConfig to the device

    The config is wrapped in an AdminMessage (set_radio) and sent to the local
    node on the ADMIN_APP port with wantAck=True.

    Raises:
        Exception -- if self.radioConfig is None (no RadioConfig has been read)
    """
    # Identity test: "== None" would dispatch to the stored object's __eq__
    if self.radioConfig is None:
        raise Exception("No RadioConfig has been read")

    p = admin_pb2.AdminMessage()
    p.set_radio.CopyFrom(self.radioConfig)

    self.sendData(p, self.myInfo.my_node_num,
                  portNum=portnums_pb2.PortNum.ADMIN_APP,
                  wantAck=True)
    logging.debug("Wrote config")
def writeChannel(self, channelIndex):
    """Write the current (edited) channel to the device

    Arguments:
        channelIndex -- index into self.channels of the channel to send

    The channel is wrapped in an AdminMessage (set_channel) and sent to the
    local node on the ADMIN_APP port with wantAck=True.
    """
    p = admin_pb2.AdminMessage()
    p.set_channel.CopyFrom(self.channels[channelIndex])

    self.sendData(p, self.myInfo.my_node_num,
                  portNum=portnums_pb2.PortNum.ADMIN_APP,
                  wantAck=True)
    # The f prefix was missing, so the literal text "{channelIndex}" was logged
    logging.debug(f"Wrote channel {channelIndex}")
def getMyNodeInfo(self):
    """Return the nodesByNum entry for the local node.

    Returns None when myInfo has not been received yet, or when nodesByNum
    has no entry for myInfo.my_node_num.
    """
    info = self.myInfo
    return None if info is None else self.nodesByNum.get(info.my_node_num)
def getMyUser(self):
    """Return the 'user' entry of the local node's info.

    Returns None when getMyNodeInfo() returns None or the entry has no 'user' key.
    """
    localNode = self.getMyNodeInfo()
    if localNode is None:
        return None
    return localNode.get('user')
def getLongName(self):
    """Return the 'longName' of the local node's user.

    Returns None when getMyUser() returns None or the user has no 'longName' key.
    """
    owner = self.getMyUser()
    return owner.get('longName', None) if owner is not None else None
def getShortName(self):
    """Return the 'shortName' of the local node's user.

    Returns None when getMyUser() returns None or the user has no 'shortName' key.
    """
    owner = self.getMyUser()
    return owner.get('shortName', None) if owner is not None else None
def setOwner(self, long_name, short_name=None):
&#34;&#34;&#34;Set device owner name&#34;&#34;&#34;
nChars = 3
minChars = 2
if long_name is not None:
long_name = long_name.strip()
if short_name is None:
words = long_name.split()
if len(long_name) &lt;= nChars:
short_name = long_name
elif len(words) &gt;= minChars:
short_name = &#39;&#39;.join(map(lambda word: word[0], words))
else:
trans = str.maketrans(dict.fromkeys(&#39;aeiouAEIOU&#39;))
short_name = long_name[0] + long_name[1:].translate(trans)
if len(short_name) &lt; nChars:
short_name = long_name[:nChars]
p = admin_pb2.AdminMessage()
if long_name is not None:
p.set_owner.long_name = long_name
if short_name is not None:
short_name = short_name.strip()
if len(short_name) &gt; nChars:
short_name = short_name[:nChars]
p.set_owner.short_name = short_name
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
@property
def channelURL(self):
&#34;&#34;&#34;The sharable URL that describes the current channel
&#34;&#34;&#34;
# Only keep the primary/secondary channels, assume primary is first
channelSet = apponly_pb2.ChannelSet()
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
channelSet.settings.append(c.settings)
bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode(&#39;ascii&#39;)
return f&#34;https://www.meshtastic.org/d/#{s}&#34;.replace(&#34;=&#34;, &#34;&#34;)
def setURL(self, url):
&#34;&#34;&#34;Set mesh network URL&#34;&#34;&#34;
if self.radioConfig == None:
raise Exception(&#34;No RadioConfig has been read&#34;)
# URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on &#39;/#&#39; to find the base64 encoded channel settings
splitURL = url.split(&#34;/#&#34;)
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
channelSet = apponly_pb2.ChannelSet()
channelSet.ParseFromString(decodedURL)
i = 0
for chs in channelSet.settings:
ch = channel_pb2.Channel()
ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY
ch.index = i
ch.settings.CopyFrom(chs)
self.channels[ch.index] = ch
self.writeChannel(ch.index)
i = i + 1
def _waitConnected(self):
&#34;&#34;&#34;Block until the initial node db download is complete, or timeout
and raise an exception&#34;&#34;&#34;
if not self.isConnected.wait(5.0): # timeout after 5 seconds
raise Exception(&#34;Timed out waiting for connection completion&#34;)
# If we failed while connecting, raise the connection to the client
if self.failure:
raise self.failure
def _generatePacketId(self):
&#34;&#34;&#34;Get a new unique packet ID&#34;&#34;&#34;
if self.currentPacketId is None:
raise Exception(&#34;Not connected yet, can not generate packet&#34;)
else:
self.currentPacketId = (self.currentPacketId + 1) &amp; 0xffffffff
return self.currentPacketId
def _disconnected(self):
&#34;&#34;&#34;Called by subclasses to tell clients this interface has disconnected&#34;&#34;&#34;
self.isConnected.clear()
catchAndIgnore(&#34;disconnection publish&#34;, lambda: pub.sendMessage(
&#34;meshtastic.connection.lost&#34;, interface=self))
def _connected(self):
&#34;&#34;&#34;Called by this class to tell clients we are now fully connected to a node
&#34;&#34;&#34;
self.isConnected.set()
catchAndIgnore(&#34;connection publish&#34;, lambda: pub.sendMessage(
&#34;meshtastic.connection.established&#34;, interface=self))
def _startConfig(self):
&#34;&#34;&#34;Start device packets flowing&#34;&#34;&#34;
self.myInfo = None
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
self.radioConfig = None
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished
startConfig = mesh_pb2.ToRadio()
startConfig.want_config_id = MY_CONFIG_ID # we don&#39;t use this value
self._sendToRadio(startConfig)
def _sendToRadio(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
if self.noProto:
logging.warn(
f&#34;Not sending packet because protocol use is disabled by noProto&#34;)
else:
#logging.debug(f&#34;Sending toRadio: {stripnl(toRadio)}&#34;)
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
logging.error(f&#34;Subclass must provide toradio: {toRadio}&#34;)
def _handleConfigComplete(self):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for settings and channels
&#34;&#34;&#34;
self._requestSettings()
self._requestChannel(0)
def _requestSettings(self):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for settings
&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.get_radio_request = True
def onResponse(p):
&#34;&#34;&#34;A closure to handle the response packet&#34;&#34;&#34;
self.radioConfig = p[&#34;decoded&#34;][&#34;admin&#34;][&#34;raw&#34;].get_radio_response
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=True,
onResponse=onResponse)
def _requestChannel(self, channelNum: int):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for a channel
&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.get_channel_request = channelNum + 1
logging.debug(f&#34;Requesting channel {channelNum}&#34;)
def onResponse(p):
&#34;&#34;&#34;A closure to handle the response packet&#34;&#34;&#34;
c = p[&#34;decoded&#34;][&#34;admin&#34;][&#34;raw&#34;].get_channel_response
self.partialChannels.append(c)
logging.debug(f&#34;Received channel {stripnl(c)}&#34;)
index = c.index
# for stress testing, we can always download all channels
fastChannelDownload = True
# Once we see a response that has NO settings, assume we are at the end of channels and stop fetching
quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
if quitEarly or index &gt;= self.myInfo.max_channels - 1:
self.channels = self.partialChannels
# FIXME, the following should only be called after we have settings and channels
self._connected() # Tell everyone else we are ready to go
else:
self._requestChannel(index + 1)
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=True,
onResponse=onResponse)
def _handleFromRadio(self, fromRadioBytes):
&#34;&#34;&#34;
Handle a packet that arrived from the radio(update model and publish events)
Called by subclasses.&#34;&#34;&#34;
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
if fromRadio.HasField(&#34;my_info&#34;):
self.myInfo = fromRadio.my_info
logging.debug(f&#34;Received myinfo: {stripnl(fromRadio.my_info)}&#34;)
failmsg = None
# Check for app too old
if self.myInfo.min_app_version &gt; OUR_APP_VERSION:
failmsg = &#34;This device needs a newer python client, please \&#34;pip install --upgrade meshtastic\&#34;. For more information see https://tinyurl.com/5bjsxu32&#34;
# check for firmware too old
if self.myInfo.max_channels == 0:
failmsg = &#34;This version of meshtastic-python requires device firmware version 1.2 or later. For more information see https://tinyurl.com/5bjsxu32&#34;
if failmsg:
self.failure = Exception(failmsg)
self.isConnected.set() # let waitConnected return this exception
self.close()
elif fromRadio.HasField(&#34;node_info&#34;):
node = asDict[&#34;nodeInfo&#34;]
try:
self._fixupPosition(node[&#34;position&#34;])
except:
logging.debug(&#34;Node without position&#34;)
logging.debug(f&#34;Received nodeinfo: {node}&#34;)
self.nodesByNum[node[&#34;num&#34;]] = node
if &#34;user&#34; in node: # Some nodes might not have user/ids assigned yet
self.nodes[node[&#34;user&#34;][&#34;id&#34;]] = node
pub.sendMessage(&#34;meshtastic.node.updated&#34;,
node=node, interface=self)
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
self._handleConfigComplete()
elif fromRadio.HasField(&#34;packet&#34;):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
# Tell clients the device went away. Careful not to call the overridden subclass version that closes the serial port
MeshInterface._disconnected(self)
self._startConfig() # redownload the node db etc...
else:
logging.debug(&#34;Unexpected FromRadio payload&#34;)
def _fixupPosition(self, position):
&#34;&#34;&#34;Convert integer lat/lon into floats
Arguments:
position {Position dictionary} -- object ot fix up
&#34;&#34;&#34;
if &#34;latitudeI&#34; in position:
position[&#34;latitude&#34;] = position[&#34;latitudeI&#34;] * 1e-7
if &#34;longitudeI&#34; in position:
position[&#34;longitude&#34;] = position[&#34;longitudeI&#34;] * 1e-7
def _nodeNumToId(self, num):
    """Map a node number to a node ID

    Arguments:
        num {int} -- Node number

    Returns:
        string -- Node ID: BROADCAST_ADDR for BROADCAST_NUM, otherwise the
        ["user"]["id"] of the nodesByNum entry, or None if that lookup fails
    """
    if num == BROADCAST_NUM:
        return BROADCAST_ADDR

    try:
        return self.nodesByNum[num]["user"]["id"]
    except Exception:
        # Still best-effort for any lookup failure, but a bare "except:" would
        # also swallow KeyboardInterrupt and SystemExit
        logging.warning("Node not found for fromId")
        return None
def _getOrCreateByNum(self, nodeNum):
    """Return the nodesByNum entry for nodeNum, adding a minimal one when absent.

    Raises an Exception when nodeNum is BROADCAST_NUM, which has no entry.
    """
    if nodeNum == BROADCAST_NUM:
        raise Exception("Can not create/find nodenum by the broadcast num")

    if nodeNum not in self.nodesByNum:
        # Create a minimal node db entry
        self.nodesByNum[nodeNum] = {"num": nodeNum}
    return self.nodesByNum[nodeNum]
def _handlePacketFromRadio(self, meshPacket):
&#34;&#34;&#34;Handle a MeshPacket that just arrived from the radio
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
&#34;&#34;&#34;
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
# We normally decompose the payload into a dictionary so that the client
# doesn&#39;t need to understand protobufs. But advanced clients might
# want the raw protobuf, so we provide it in &#34;raw&#34;
asDict[&#34;raw&#34;] = meshPacket
# from might be missing if the nodenum was zero.
if not &#34;from&#34; in asDict:
asDict[&#34;from&#34;] = 0
logging.error(f&#34;Device returned a packet we sent, ignoring: {stripnl(asDict)}&#34;)
return
if not &#34;to&#34; in asDict:
asDict[&#34;to&#34;] = 0
# add fromId and toId fields based on the node ID
asDict[&#34;fromId&#34;] = self._nodeNumToId(asDict[&#34;from&#34;])
asDict[&#34;toId&#34;] = self._nodeNumToId(asDict[&#34;to&#34;])
# We could provide our objects as DotMaps - which work with . notation or as dictionaries
# asObj = DotMap(asDict)
topic = &#34;meshtastic.receive&#34; # Generic unknown packet type
decoded = asDict[&#34;decoded&#34;]
# The default MessageToDict converts byte arrays into base64 strings.
# We don&#39;t want that - it messes up data payload. So slam in the correct
# byte array.
decoded[&#34;payload&#34;] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
# to make API usage easier, set it to prevent confusion
if not &#34;portnum&#34; in decoded:
decoded[&#34;portnum&#34;] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
portnum = decoded[&#34;portnum&#34;]
topic = f&#34;meshtastic.receive.data.{portnum}&#34;
# decode position protobufs and update nodedb, provide decoded version as &#34;position&#34; in the published msg
# move the following into a &#39;decoders&#39; API that clients could register?
portNumInt = meshPacket.decoded.portnum # we want portnum as an int
handler = protocols.get(portNumInt)
# The decoded protobuf as a dictionary (if we understand this message)
p = None
if handler is not None:
topic = f&#34;meshtastic.receive.{handler.name}&#34;
# Convert to protobuf if possible
if handler.protobufFactory is not None:
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict[&#34;decoded&#34;][handler.name] = p
# Also provide the protobuf raw
asDict[&#34;decoded&#34;][handler.name][&#34;raw&#34;] = pb
# Call specialized onReceive if necessary
if handler.onReceive is not None:
handler.onReceive(self, asDict)
# Is this message in response to a request, if so, look for a handler
requestId = decoded.get(&#34;requestId&#34;)
if requestId is not None:
# We ignore ACK packets, but send NAKs and data responses to the handlers
routing = decoded.get(&#34;routing&#34;)
isAck = routing is not None and (&#34;errorReason&#34; not in routing)
if not isAck:
# we keep the responseHandler in dict until we get a non ack
handler = self.responseHandlers.pop(requestId, None)
if handler is not None:
handler.callback(asDict)
logging.debug(f&#34;Publishing {topic}: packet={stripnl(asDict)} &#34;)
catchAndIgnore(f&#34;publishing {topic}&#34;, lambda: pub.sendMessage(
topic, packet=asDict, interface=self))
# Our standard BLE characteristics
TORADIO_UUID = &#34;f75c76d2-129e-4dad-a1dd-7866124401e7&#34;
FROMRADIO_UUID = &#34;8ba2bcc2-ee02-4a55-a531-c525c5e454d5&#34;
FROMNUM_UUID = &#34;ed9da18c-a800-4f66-a670-aa7547e34453&#34;
class BLEInterface(MeshInterface):
&#34;&#34;&#34;A not quite ready - FIXME - BLE interface to devices&#34;&#34;&#34;
def __init__(self, address, debugOut=None):
self.address = address
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f&#34;Connecting to {self.address}&#34;)
self.device = self.adapter.connect(address)
logging.debug(&#34;Connected to device&#34;)
# fromradio = self.device.char_read(FROMRADIO_UUID)
MeshInterface.__init__(self, debugOut=debugOut)
self._readFromRadio() # read the initial responses
def handle_data(handle, data):
self._handleFromRadio(data)
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
# logging.debug(f&#34;Sending: {stripnl(toRadio)}&#34;)
b = toRadio.SerializeToString()
self.device.char_write(TORADIO_UUID, b)
def close(self):
self.adapter.stop()
def _readFromRadio(self):
wasEmpty = False
while not wasEmpty:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)
class StreamInterface(MeshInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a stream link (serial, TCP, etc)&#34;&#34;&#34;
def __init__(self, debugOut=None, noProto=False, connectNow=True):
&#34;&#34;&#34;Constructor, opens a connection to self.stream
Keyword Arguments:
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
noProto {bool} -- Passed through to the MeshInterface constructor (default: {False})
connectNow {bool} -- If True, connect() is called at the end of the constructor (default: {True})
Raises:
Exception: if self.stream has not been set before this constructor is called
&#34;&#34;&#34;
if not hasattr(self, &#39;stream&#39;):
raise Exception(
&#34;StreamInterface is now abstract (to update existing code create SerialInterface instead)&#34;)
self._rxBuf = bytes() # empty
self._wantExit = False
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(target=self.__reader, args=())
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
def connect(self):
&#34;&#34;&#34;Connect to our radio
Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.
&#34;&#34;&#34;
# Send some bogus UART characters to force a sleeping device to wake
self._writeBytes(bytes([START1, START1, START1, START1]))
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
&#34;&#34;&#34;We override the superclass implementation to close our port&#34;&#34;&#34;
MeshInterface._disconnected(self)
logging.debug(&#34;Closing our port&#34;)
if not self.stream is None:
self.stream.close()
def _writeBytes(self, b):
&#34;&#34;&#34;Write an array of bytes to our stream and flush&#34;&#34;&#34;
self.stream.write(b)
self.stream.flush()
def _readBytes(self, len):
&#34;&#34;&#34;Read an array of bytes from our stream&#34;&#34;&#34;
return self.stream.read(len)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
logging.debug(f&#34;Sending: {stripnl(toRadio)}&#34;)
b = toRadio.SerializeToString()
bufLen = len(b)
# We convert into a string, because the TCP code doesn&#39;t work with byte arrays
header = bytes([START1, START2, (bufLen &gt;&gt; 8) &amp; 0xff, bufLen &amp; 0xff])
self._writeBytes(header + b)
def close(self):
&#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
logging.debug(&#34;Closing stream&#34;)
# pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
def __reader(self):
&#34;&#34;&#34;The reader thread that reads bytes from our stream&#34;&#34;&#34;
empty = bytes()
try:
while not self._wantExit:
# logging.debug(&#34;reading character&#34;)
b = self._readBytes(1)
# logging.debug(&#34;In reader loop&#34;)
if len(b) &gt; 0:
# logging.debug(f&#34;read returned {b}&#34;)
c = b[0]
ptr = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b
if ptr == 0: # looking for START1
if c != START1:
self._rxBuf = empty # failed to find start
if self.debugOut != None:
try:
self.debugOut.write(b.decode(&#34;utf-8&#34;))
except:
self.debugOut.write(&#39;?&#39;)
elif ptr == 1: # looking for START2
if c != START2:
self._rxBuf = empty # failed to find start2
elif ptr &gt;= HEADER_LEN: # we&#39;ve at least got a header
# big endian length follows header
packetlen = (self._rxBuf[2] &lt;&lt; 8) + self._rxBuf[3]
if ptr == HEADER_LEN: # we _just_ finished reading the header, validate length
if packetlen &gt; MAX_TO_FROM_RADIO_SIZE:
self._rxBuf = empty # length was out of bounds, restart
if len(self._rxBuf) != 0 and ptr + 1 == packetlen + HEADER_LEN:
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
logging.error(
f&#34;Error while handling message from radio {ex}&#34;)
traceback.print_exc()
self._rxBuf = empty
else:
# logging.debug(f&#34;timeout&#34;)
pass
except serial.SerialException as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.warn(
f&#34;Meshtastic serial port disconnected, disconnecting... {ex}&#34;)
except OSError as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.error(
f&#34;Unexpected OSError, terminating meshtastic reader... {ex}&#34;)
except Exception as ex:
logging.error(
f&#34;Unexpected exception, terminating meshtastic reader... {ex}&#34;)
finally:
logging.debug(&#34;reader is exiting&#34;)
self._disconnected()
class SerialInterface(StreamInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a serial link&#34;&#34;&#34;
def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
&#34;&#34;&#34;Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
&#34;&#34;&#34;
if devPath is None:
ports = util.findPorts()
if len(ports) == 0:
raise Exception(&#34;No Meshtastic devices detected&#34;)
elif len(ports) &gt; 1:
raise Exception(
f&#34;Multiple ports detected, you must specify a device, such as {ports[0]}&#34;)
else:
devPath = ports[0]
logging.debug(f&#34;Connecting to {devPath}&#34;)
# Note: we provide None for port here, because we will be opening it later
self.stream = serial.Serial(
None, 921600, exclusive=True, timeout=0.5)
# rts=False Needed to prevent TBEAMs resetting on OSX, because rts is connected to reset
self.stream.port = devPath
# OS-X/Windows seems to have a bug in its serial driver. It ignores that we asked for no RTSCTS
# control and will always drive RTS either high or low (rather than letting the CP102 leave
# it as an open-collector floating pin). Since it is going to drive it anyways we want to make
# sure it is driven low, so that the TBEAM won&#39;t reset
# Linux does this properly, so don&#39;t apply this hack (because it makes the reset button not work)
if platform.system() != &#39;Linux&#39;:
self.stream.rts = False
self.stream.open()
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
class TCPInterface(StreamInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a TCP link&#34;&#34;&#34;
def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
&#34;&#34;&#34;Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
hostname {string} -- Hostname/IP address of the device to connect to
&#34;&#34;&#34;
logging.debug(f&#34;Connecting to {hostname}&#34;)
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
# Instead of wrapping as a stream, we use the native socket API
# self.stream = sock.makefile(&#39;rw&#39;)
self.stream = None
self.socket = sock
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
&#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
logging.debug(&#34;Closing TCP stream&#34;)
# Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
# the socket here
self._wantExit = True
if not self.socket is None:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
StreamInterface.close(self)
def _writeBytes(self, b):
&#34;&#34;&#34;Write an array of bytes to our stream and flush&#34;&#34;&#34;
self.socket.send(b)
def _readBytes(self, len):
&#34;&#34;&#34;Read an array of bytes from our stream&#34;&#34;&#34;
return self.socket.recv(len)
def _onTextReceive(iface, asDict):
&#34;&#34;&#34;Special text auto parsing for received messages&#34;&#34;&#34;
# We don&#39;t throw if the utf8 is invalid in the text message. Instead we just don&#39;t populate
# the decoded.data.text and we log an error message. This at least allows some delivery to
# the app and the app can deal with the missing decoded representation.
#
# Usually btw this problem is caused by apps sending binary data but setting the payload type to
# text.
try:
asBytes = asDict[&#34;decoded&#34;][&#34;payload&#34;]
asDict[&#34;decoded&#34;][&#34;text&#34;] = asBytes.decode(&#34;utf-8&#34;)
except Exception as ex:
logging.error(f&#34;Malformatted utf8 in text message: {ex}&#34;)
def _onPositionReceive(iface, asDict):
&#34;&#34;&#34;Special auto parsing for received messages&#34;&#34;&#34;
p = asDict[&#34;decoded&#34;][&#34;position&#34;]
iface._fixupPosition(p)
# update node DB as needed
iface._getOrCreateByNum(asDict[&#34;from&#34;])[&#34;position&#34;] = p
def _onNodeInfoReceive(iface, asDict):
&#34;&#34;&#34;Special auto parsing for received messages&#34;&#34;&#34;
p = asDict[&#34;decoded&#34;][&#34;user&#34;]
# decode user protobufs and update nodedb, provide decoded version as &#34;user&#34; in the published msg
# update node DB as needed
n = iface._getOrCreateByNum(asDict[&#34;from&#34;])
n[&#34;user&#34;] = p
# We now have a node ID, make sure it is uptodate in that table
iface.nodes[p[&#34;id&#34;]] = n
&#34;&#34;&#34;Well known message payloads can register decoders for automatic protobuf parsing&#34;&#34;&#34;
protocols = {
portnums_pb2.PortNum.TEXT_MESSAGE_APP: KnownProtocol(&#34;text&#34;, onReceive=_onTextReceive),
portnums_pb2.PortNum.POSITION_APP: KnownProtocol(&#34;position&#34;, mesh_pb2.Position, _onPositionReceive),
portnums_pb2.PortNum.NODEINFO_APP: KnownProtocol(&#34;user&#34;, mesh_pb2.User, _onNodeInfoReceive),
portnums_pb2.PortNum.ADMIN_APP: KnownProtocol(&#34;admin&#34;, admin_pb2.AdminMessage),
portnums_pb2.PortNum.ROUTING_APP: KnownProtocol(&#34;routing&#34;, mesh_pb2.Routing),
portnums_pb2.PortNum.ENVIRONMENTAL_MEASUREMENT_APP: KnownProtocol(&#34;environmental&#34;, environmental_measurement_pb2.EnvironmentalMeasurement),
portnums_pb2.PortNum.REMOTE_HARDWARE_APP: KnownProtocol(
&#34;remotehw&#34;, remote_hardware_pb2.HardwareMessage)
}</code></pre>
</details>
</section>
<section>
<h2 class="section-title" id="header-submodules">Sub-modules</h2>
<dl>
<dt><code class="name"><a title="meshtastic.admin_pb2" href="admin_pb2.html">meshtastic.admin_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.apponly_pb2" href="apponly_pb2.html">meshtastic.apponly_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.ble" href="ble.html">meshtastic.ble</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.channel_pb2" href="channel_pb2.html">meshtastic.channel_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.deviceonly_pb2" href="deviceonly_pb2.html">meshtastic.deviceonly_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.environmental_measurement_pb2" href="environmental_measurement_pb2.html">meshtastic.environmental_measurement_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.mesh_pb2" href="mesh_pb2.html">meshtastic.mesh_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.portnums_pb2" href="portnums_pb2.html">meshtastic.portnums_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.radioconfig_pb2" href="radioconfig_pb2.html">meshtastic.radioconfig_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.remote_hardware" href="remote_hardware.html">meshtastic.remote_hardware</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.remote_hardware_pb2" href="remote_hardware_pb2.html">meshtastic.remote_hardware_pb2</a></code></dt>
<dd>
<div class="desc"><p>Generated protocol buffer code.</p></div>
</dd>
<dt><code class="name"><a title="meshtastic.test" href="test.html">meshtastic.test</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.tunnel" href="tunnel.html">meshtastic.tunnel</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.util" href="util.html">meshtastic.util</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
</dl>
</section>
<section>
<h2 class="section-title" id="header-variables">Global variables</h2>
<dl>
<dt id="meshtastic.MY_CONFIG_ID"><code class="name">var <span class="ident">MY_CONFIG_ID</span></code></dt>
<dd>
<div class="desc"><p>The numeric buildnumber (shared with android apps) specifying the level of device code we are guaranteed to understand</p>
<p>The format is Mmmss, where M is 1 + the numeric major number (e.g. 20120 means 1.1.20).</p></div>
</dd>
</dl>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="meshtastic.BLEInterface"><code class="flex name class">
<span>class <span class="ident">BLEInterface</span></span>
<span>(</span><span>address, debugOut=None)</span>
</code></dt>
<dd>
<div class="desc"><p>A not quite ready - FIXME - BLE interface to devices</p>
<p>Constructor</p>
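<p>Keyword Arguments (inherited from the MeshInterface constructor):
noProto &ndash; If True, don't try to run our protocol on the link - just be a dumb serial client. Note that this constructor only accepts address and debugOut, so noProto cannot be set here.</p></div>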
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class BLEInterface(MeshInterface):
&#34;&#34;&#34;A not quite ready - FIXME - BLE interface to devices&#34;&#34;&#34;
def __init__(self, address, debugOut=None):
self.address = address
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f&#34;Connecting to {self.address}&#34;)
self.device = self.adapter.connect(address)
logging.debug(&#34;Connected to device&#34;)
# fromradio = self.device.char_read(FROMRADIO_UUID)
MeshInterface.__init__(self, debugOut=debugOut)
self._readFromRadio() # read the initial responses
def handle_data(handle, data):
self._handleFromRadio(data)
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
# logging.debug(f&#34;Sending: {stripnl(toRadio)}&#34;)
b = toRadio.SerializeToString()
self.device.char_write(TORADIO_UUID, b)
def close(self):
self.adapter.stop()
def _readFromRadio(self):
wasEmpty = False
while not wasEmpty:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.BLEInterface.close"><code class="name flex">
<span>def <span class="ident">close</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def close(self):
self.adapter.stop()</code></pre>
</details>
</dd>
</dl>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.MeshInterface.channelURL" href="#meshtastic.MeshInterface.channelURL">channelURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setOwner" href="#meshtastic.MeshInterface.setOwner">setOwner</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setURL" href="#meshtastic.MeshInterface.setURL">setURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.waitForConfig" href="#meshtastic.MeshInterface.waitForConfig">waitForConfig</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeChannel" href="#meshtastic.MeshInterface.writeChannel">writeChannel</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
<dt id="meshtastic.KnownProtocol"><code class="flex name class">
<span>class <span class="ident">KnownProtocol</span></span>
<span>(</span><span>name: str, protobufFactory: Callable = None, onReceive: Callable = None)</span>
</code></dt>
<dd>
<div class="desc"><p>Used to automatically decode known protocol payloads</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class KnownProtocol(NamedTuple):
&#34;&#34;&#34;Used to automatically decode known protocol payloads&#34;&#34;&#34;
name: str
# portnum: int, now a key
# If set, will be called to parse as a protocol buffer
protobufFactory: Callable = None
# If set, invoked as onReceive(interface, packet)
onReceive: Callable = None</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li>builtins.tuple</li>
</ul>
<h3>Instance variables</h3>
<dl>
<dt id="meshtastic.KnownProtocol.name"><code class="name">var <span class="ident">name</span> : str</code></dt>
<dd>
<div class="desc"><p>Alias for field number 0</p></div>
</dd>
<dt id="meshtastic.KnownProtocol.onReceive"><code class="name">var <span class="ident">onReceive</span> : Callable</code></dt>
<dd>
<div class="desc"><p>Alias for field number 2</p></div>
</dd>
<dt id="meshtastic.KnownProtocol.protobufFactory"><code class="name">var <span class="ident">protobufFactory</span> : Callable</code></dt>
<dd>
<div class="desc"><p>Alias for field number 1</p></div>
</dd>
</dl>
</dd>
<dt id="meshtastic.MeshInterface"><code class="flex name class">
<span>class <span class="ident">MeshInterface</span></span>
<span>(</span><span>debugOut=None, noProto=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices</p>
<p>Properties:</p>
<p>isConnected
nodes
debugOut</p>
<p>Constructor</p>
<p>Keyword Arguments:
noProto &ndash; If True, don't try to run our protocol on the link - just be a dumb serial client.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class MeshInterface:
&#34;&#34;&#34;Interface class for meshtastic devices
Properties:
isConnected
nodes
debugOut
&#34;&#34;&#34;
def __init__(self, debugOut=None, noProto=False):
&#34;&#34;&#34;Constructor
Keyword Arguments:
noProto -- If True, don&#39;t try to run our protocol on the link - just be a dumb serial client.
&#34;&#34;&#34;
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.myInfo = None # We don&#39;t have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we&#39;ve encountered a fatal exception it will be kept here
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
self.currentPacketId = random.randint(0, 0xffffffff)
self._startConfig()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f&#39;An exception of type {exc_type} with value {exc_value} has occurred&#39;)
if traceback is not None:
logging.error(f&#39;Traceback: {traceback}&#39;)
self.close()
def sendText(self, text: AnyStr,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
wantResponse=wantResponse,
hopLimit=hopLimit,
onResponse=onResponse)
def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a data packet to some other node
Keyword Arguments:
data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
if getattr(data, &#34;SerializeToString&#34;, None):
logging.debug(f&#34;Serializing protobuf as data: {stripnl(data)}&#34;)
data = data.SerializeToString()
if len(data) &gt; mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception(&#34;Data payload too big&#34;)
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
raise Exception(&#34;A non-zero port number must be specified&#34;)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
&#34;&#34;&#34;
Send a position packet to some other node (normally a broadcast)
Also, the device software will notice this packet and use it to automatically set its notion of
the local position.
If timeSec is not specified (recommended), we will use the local machine time.
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
p = mesh_pb2.Position()
if(latitude != 0.0):
p.latitude_i = int(latitude / 1e-7)
if(longitude != 0.0):
p.longitude_i = int(longitude / 1e-7)
if(altitude != 0):
p.altitude = int(altitude)
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
wantAck=wantAck,
wantResponse=wantResponse)
def _addResponseHandler(self, requestId, callback):
self.responseHandlers[requestId] = ResponseHandler(callback)
def _sendPacket(self, meshPacket,
destinationId=BROADCAST_ADDR,
wantAck=False, hopLimit=defaultHopLimit):
&#34;&#34;&#34;Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don&#39;t want this - use sendData instead.
Returns the sent packet. The id field will be populated in this packet and
can be used to track future message acks/naks.
&#34;&#34;&#34;
# We allow users to talk to the local node before we&#39;ve completed the full connection flow...
if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
self._waitConnected()
toRadio = mesh_pb2.ToRadio()
if destinationId is None:
raise Exception(&#34;destinationId must not be None&#34;)
elif isinstance(destinationId, int):
nodeNum = destinationId
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
else:
nodeNum = self.nodes[destinationId][&#39;num&#39;]
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
meshPacket.hop_limit = hopLimit
# if the user hasn&#39;t set an ID for this packet (likely and recommended), we should pick a new unique ID
# so the message can be tracked.
if meshPacket.id == 0:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f&#34;Sending packet: {stripnl(meshPacket)}&#34;)
self._sendToRadio(toRadio)
return meshPacket
def waitForConfig(self, sleep=.1, maxsecs=20, attrs=(&#39;myInfo&#39;, &#39;nodes&#39;, &#39;radioConfig&#39;, &#39;channels&#39;)):
&#34;&#34;&#34;Block until radio config is received. Returns True if config has been received.&#34;&#34;&#34;
for _ in range(int(maxsecs/sleep)):
if all(map(lambda a: getattr(self, a, None), attrs)):
return True
time.sleep(sleep)
return False
def writeConfig(self):
&#34;&#34;&#34;Write the current (edited) radioConfig to the device&#34;&#34;&#34;
if self.radioConfig == None:
raise Exception(&#34;No RadioConfig has been read&#34;)
p = admin_pb2.AdminMessage()
p.set_radio.CopyFrom(self.radioConfig)
self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
logging.debug(&#34;Wrote config&#34;)
def writeChannel(self, channelIndex):
&#34;&#34;&#34;Write the current (edited) channel to the device&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.set_channel.CopyFrom(self.channels[channelIndex])
self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
logging.debug(&#34;Wrote channel {channelIndex}&#34;)
def getMyNodeInfo(self):
if self.myInfo is None:
return None
return self.nodesByNum.get(self.myInfo.my_node_num)
def getMyUser(self):
nodeInfo = self.getMyNodeInfo()
if nodeInfo is not None:
return nodeInfo.get(&#39;user&#39;)
return None
def getLongName(self):
user = self.getMyUser()
if user is not None:
return user.get(&#39;longName&#39;, None)
return None
def getShortName(self):
user = self.getMyUser()
if user is not None:
return user.get(&#39;shortName&#39;, None)
return None
def setOwner(self, long_name, short_name=None):
&#34;&#34;&#34;Set device owner name&#34;&#34;&#34;
nChars = 3
minChars = 2
if long_name is not None:
long_name = long_name.strip()
if short_name is None:
words = long_name.split()
if len(long_name) &lt;= nChars:
short_name = long_name
elif len(words) &gt;= minChars:
short_name = &#39;&#39;.join(map(lambda word: word[0], words))
else:
trans = str.maketrans(dict.fromkeys(&#39;aeiouAEIOU&#39;))
short_name = long_name[0] + long_name[1:].translate(trans)
if len(short_name) &lt; nChars:
short_name = long_name[:nChars]
p = admin_pb2.AdminMessage()
if long_name is not None:
p.set_owner.long_name = long_name
if short_name is not None:
short_name = short_name.strip()
if len(short_name) &gt; nChars:
short_name = short_name[:nChars]
p.set_owner.short_name = short_name
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
@property
def channelURL(self):
&#34;&#34;&#34;The sharable URL that describes the current channel
&#34;&#34;&#34;
# Only keep the primary/secondary channels, assume primary is first
channelSet = apponly_pb2.ChannelSet()
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
channelSet.settings.append(c.settings)
bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode(&#39;ascii&#39;)
return f&#34;https://www.meshtastic.org/d/#{s}&#34;.replace(&#34;=&#34;, &#34;&#34;)
def setURL(self, url):
&#34;&#34;&#34;Set mesh network URL&#34;&#34;&#34;
if self.radioConfig == None:
raise Exception(&#34;No RadioConfig has been read&#34;)
# URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on &#39;/#&#39; to find the base64 encoded channel settings
splitURL = url.split(&#34;/#&#34;)
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
channelSet = apponly_pb2.ChannelSet()
channelSet.ParseFromString(decodedURL)
i = 0
for chs in channelSet.settings:
ch = channel_pb2.Channel()
ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY
ch.index = i
ch.settings.CopyFrom(chs)
self.channels[ch.index] = ch
self.writeChannel(ch.index)
i = i + 1
def _waitConnected(self):
&#34;&#34;&#34;Block until the initial node db download is complete, or timeout
and raise an exception&#34;&#34;&#34;
if not self.isConnected.wait(5.0): # timeout after 5 seconds
raise Exception(&#34;Timed out waiting for connection completion&#34;)
# If we failed while connecting, raise the failure to the client
if self.failure:
raise self.failure
def _generatePacketId(self):
&#34;&#34;&#34;Get a new unique packet ID&#34;&#34;&#34;
if self.currentPacketId is None:
raise Exception(&#34;Not connected yet, can not generate packet&#34;)
else:
self.currentPacketId = (self.currentPacketId + 1) &amp; 0xffffffff
return self.currentPacketId
def _disconnected(self):
&#34;&#34;&#34;Called by subclasses to tell clients this interface has disconnected&#34;&#34;&#34;
self.isConnected.clear()
catchAndIgnore(&#34;disconnection publish&#34;, lambda: pub.sendMessage(
&#34;meshtastic.connection.lost&#34;, interface=self))
def _connected(self):
&#34;&#34;&#34;Called by this class to tell clients we are now fully connected to a node
&#34;&#34;&#34;
self.isConnected.set()
catchAndIgnore(&#34;connection publish&#34;, lambda: pub.sendMessage(
&#34;meshtastic.connection.established&#34;, interface=self))
def _startConfig(self):
&#34;&#34;&#34;Start device packets flowing&#34;&#34;&#34;
self.myInfo = None
self.nodes = {} # nodes keyed by ID
self.nodesByNum = {} # nodes keyed by nodenum
self.radioConfig = None
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished
startConfig = mesh_pb2.ToRadio()
startConfig.want_config_id = MY_CONFIG_ID # we don&#39;t use this value
self._sendToRadio(startConfig)
def _sendToRadio(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
if self.noProto:
logging.warn(
f&#34;Not sending packet because protocol use is disabled by noProto&#34;)
else:
#logging.debug(f&#34;Sending toRadio: {stripnl(toRadio)}&#34;)
self._sendToRadioImpl(toRadio)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
logging.error(f&#34;Subclass must provide toradio: {toRadio}&#34;)
def _handleConfigComplete(self):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for settings and channels
&#34;&#34;&#34;
self._requestSettings()
self._requestChannel(0)
def _requestSettings(self):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for settings
&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.get_radio_request = True
def onResponse(p):
&#34;&#34;&#34;A closure to handle the response packet&#34;&#34;&#34;
self.radioConfig = p[&#34;decoded&#34;][&#34;admin&#34;][&#34;raw&#34;].get_radio_response
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=True,
onResponse=onResponse)
def _requestChannel(self, channelNum: int):
&#34;&#34;&#34;
Done with initial config messages, now send regular MeshPackets to ask for the given channel
&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.get_channel_request = channelNum + 1
logging.debug(f&#34;Requesting channel {channelNum}&#34;)
def onResponse(p):
&#34;&#34;&#34;A closure to handle the response packet&#34;&#34;&#34;
c = p[&#34;decoded&#34;][&#34;admin&#34;][&#34;raw&#34;].get_channel_response
self.partialChannels.append(c)
logging.debug(f&#34;Received channel {stripnl(c)}&#34;)
index = c.index
# for stress testing, we can always download all channels
fastChannelDownload = True
# Once we see a response that has NO settings, assume we are at the end of channels and stop fetching
quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
if quitEarly or index &gt;= self.myInfo.max_channels - 1:
self.channels = self.partialChannels
# FIXME, the following should only be called after we have settings and channels
self._connected() # Tell everyone else we are ready to go
else:
self._requestChannel(index + 1)
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=True,
onResponse=onResponse)
def _handleFromRadio(self, fromRadioBytes):
&#34;&#34;&#34;
Handle a packet that arrived from the radio (update model and publish events)
Called by subclasses.&#34;&#34;&#34;
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
if fromRadio.HasField(&#34;my_info&#34;):
self.myInfo = fromRadio.my_info
logging.debug(f&#34;Received myinfo: {stripnl(fromRadio.my_info)}&#34;)
failmsg = None
# Check for app too old
if self.myInfo.min_app_version &gt; OUR_APP_VERSION:
failmsg = &#34;This device needs a newer python client, please \&#34;pip install --upgrade meshtastic\&#34;. For more information see https://tinyurl.com/5bjsxu32&#34;
# check for firmware too old
if self.myInfo.max_channels == 0:
failmsg = &#34;This version of meshtastic-python requires device firmware version 1.2 or later. For more information see https://tinyurl.com/5bjsxu32&#34;
if failmsg:
self.failure = Exception(failmsg)
self.isConnected.set() # let waitConnected return this exception
self.close()
elif fromRadio.HasField(&#34;node_info&#34;):
node = asDict[&#34;nodeInfo&#34;]
try:
self._fixupPosition(node[&#34;position&#34;])
except:
logging.debug(&#34;Node without position&#34;)
logging.debug(f&#34;Received nodeinfo: {node}&#34;)
self.nodesByNum[node[&#34;num&#34;]] = node
if &#34;user&#34; in node: # Some nodes might not have user/ids assigned yet
self.nodes[node[&#34;user&#34;][&#34;id&#34;]] = node
pub.sendMessage(&#34;meshtastic.node.updated&#34;,
node=node, interface=self)
elif fromRadio.config_complete_id == MY_CONFIG_ID:
# we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
self._handleConfigComplete()
elif fromRadio.HasField(&#34;packet&#34;):
self._handlePacketFromRadio(fromRadio.packet)
elif fromRadio.rebooted:
# Tell clients the device went away. Careful not to call the overridden subclass version that closes the serial port
MeshInterface._disconnected(self)
self._startConfig() # redownload the node db etc...
else:
logging.debug(&#34;Unexpected FromRadio payload&#34;)
def _fixupPosition(self, position):
&#34;&#34;&#34;Convert integer lat/lon into floats
Arguments:
position {Position dictionary} -- object to fix up
&#34;&#34;&#34;
if &#34;latitudeI&#34; in position:
position[&#34;latitude&#34;] = position[&#34;latitudeI&#34;] * 1e-7
if &#34;longitudeI&#34; in position:
position[&#34;longitude&#34;] = position[&#34;longitudeI&#34;] * 1e-7
def _nodeNumToId(self, num):
&#34;&#34;&#34;Map a node number to a node ID
Arguments:
num {int} -- Node number
Returns:
string -- Node ID
&#34;&#34;&#34;
if num == BROADCAST_NUM:
return BROADCAST_ADDR
try:
return self.nodesByNum[num][&#34;user&#34;][&#34;id&#34;]
except:
logging.warn(&#34;Node not found for fromId&#34;)
return None
def _getOrCreateByNum(self, nodeNum):
&#34;&#34;&#34;Given a nodenum find the NodeInfo in the DB (or create if necessary)&#34;&#34;&#34;
if nodeNum == BROADCAST_NUM:
raise Exception(&#34;Can not create/find nodenum by the broadcast num&#34;)
if nodeNum in self.nodesByNum:
return self.nodesByNum[nodeNum]
else:
n = {&#34;num&#34;: nodeNum} # Create a minimal node db entry
self.nodesByNum[nodeNum] = n
return n
def _handlePacketFromRadio(self, meshPacket):
&#34;&#34;&#34;Handle a MeshPacket that just arrived from the radio
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
&#34;&#34;&#34;
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
# We normally decompose the payload into a dictionary so that the client
# doesn&#39;t need to understand protobufs. But advanced clients might
# want the raw protobuf, so we provide it in &#34;raw&#34;
asDict[&#34;raw&#34;] = meshPacket
# from might be missing if the nodenum was zero.
if not &#34;from&#34; in asDict:
asDict[&#34;from&#34;] = 0
logging.error(f&#34;Device returned a packet we sent, ignoring: {stripnl(asDict)}&#34;)
return
if not &#34;to&#34; in asDict:
asDict[&#34;to&#34;] = 0
# Add fromId and toId fields based on the node ID
asDict[&#34;fromId&#34;] = self._nodeNumToId(asDict[&#34;from&#34;])
asDict[&#34;toId&#34;] = self._nodeNumToId(asDict[&#34;to&#34;])
# We could provide our objects as DotMaps - which work with . notation or as dictionaries
# asObj = DotMap(asDict)
topic = &#34;meshtastic.receive&#34; # Generic unknown packet type
decoded = asDict[&#34;decoded&#34;]
# The default MessageToDict converts byte arrays into base64 strings.
# We don&#39;t want that - it messes up data payload. So slam in the correct
# byte array.
decoded[&#34;payload&#34;] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not set it will not be populated at all
# to make API usage easier, set it to prevent confusion
if not &#34;portnum&#34; in decoded:
decoded[&#34;portnum&#34;] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
portnum = decoded[&#34;portnum&#34;]
topic = f&#34;meshtastic.receive.data.{portnum}&#34;
# decode position protobufs and update nodedb, provide decoded version as &#34;position&#34; in the published msg
# move the following into a &#39;decoders&#39; API that clients could register?
portNumInt = meshPacket.decoded.portnum # we want portnum as an int
handler = protocols.get(portNumInt)
# The decoded protobuf as a dictionary (if we understand this message)
p = None
if handler is not None:
topic = f&#34;meshtastic.receive.{handler.name}&#34;
# Convert to protobuf if possible
if handler.protobufFactory is not None:
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict[&#34;decoded&#34;][handler.name] = p
# Also provide the protobuf raw
asDict[&#34;decoded&#34;][handler.name][&#34;raw&#34;] = pb
# Call specialized onReceive if necessary
if handler.onReceive is not None:
handler.onReceive(self, asDict)
# Is this message in response to a request, if so, look for a handler
requestId = decoded.get(&#34;requestId&#34;)
if requestId is not None:
# We ignore ACK packets, but send NAKs and data responses to the handlers
routing = decoded.get(&#34;routing&#34;)
isAck = routing is not None and (&#34;errorReason&#34; not in routing)
if not isAck:
# we keep the responseHandler in dict until we get a non ack
handler = self.responseHandlers.pop(requestId, None)
if handler is not None:
handler.callback(asDict)
logging.debug(f&#34;Publishing {topic}: packet={stripnl(asDict)} &#34;)
catchAndIgnore(f&#34;publishing {topic}&#34;, lambda: pub.sendMessage(
topic, packet=asDict, interface=self))</code></pre>
</details>
<h3>Subclasses</h3>
<ul class="hlist">
<li><a title="meshtastic.BLEInterface" href="#meshtastic.BLEInterface">BLEInterface</a></li>
<li><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></li>
</ul>
<h3>Instance variables</h3>
<dl>
<dt id="meshtastic.MeshInterface.channelURL"><code class="name">var <span class="ident">channelURL</span></code></dt>
<dd>
<div class="desc"><p>The sharable URL that describes the current channel</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">@property
def channelURL(self):
&#34;&#34;&#34;The sharable URL that describes the current channel
&#34;&#34;&#34;
# Only keep the primary/secondary channels, assume primary is first
channelSet = apponly_pb2.ChannelSet()
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
channelSet.settings.append(c.settings)
bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode(&#39;ascii&#39;)
return f&#34;https://www.meshtastic.org/d/#{s}&#34;.replace(&#34;=&#34;, &#34;&#34;)</code></pre>
</details>
</dd>
</dl>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.MeshInterface.getLongName"><code class="name flex">
<span>def <span class="ident">getLongName</span></span>(<span>self)</span>
</code></dt>
<dd>
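<div class="desc"><p>Return the <code>longName</code> from the local node's user record, or <code>None</code> if no user record is available.</p></div>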
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def getLongName(self):
user = self.getMyUser()
if user is not None:
return user.get(&#39;longName&#39;, None)
return None</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.getMyNodeInfo"><code class="name flex">
<span>def <span class="ident">getMyNodeInfo</span></span>(<span>self)</span>
</code></dt>
<dd>
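<div class="desc"><p>Return the node database entry whose number is <code>myInfo.my_node_num</code>, or <code>None</code> if <code>myInfo</code> has not been received or the node is not in the database.</p></div>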
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def getMyNodeInfo(self):
if self.myInfo is None:
return None
return self.nodesByNum.get(self.myInfo.my_node_num)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.getMyUser"><code class="name flex">
<span>def <span class="ident">getMyUser</span></span>(<span>self)</span>
</code></dt>
<dd>
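<div class="desc"><p>Return the <code>user</code> dictionary from the local node's database entry, or <code>None</code> if it is not available.</p></div>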
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def getMyUser(self):
nodeInfo = self.getMyNodeInfo()
if nodeInfo is not None:
return nodeInfo.get(&#39;user&#39;)
return None</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.getShortName"><code class="name flex">
<span>def <span class="ident">getShortName</span></span>(<span>self)</span>
</code></dt>
<dd>
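<div class="desc"><p>Return the <code>shortName</code> from the local node's user record, or <code>None</code> if no user record is available.</p></div>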
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def getShortName(self):
user = self.getMyUser()
if user is not None:
return user.get(&#39;shortName&#39;, None)
return None</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.sendData"><code class="name flex">
<span>def <span class="ident">sendData</span></span>(<span>self, data, destinationId='^all', portNum=256, wantAck=False, wantResponse=False, hopLimit=3, onResponse=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a data packet to some other node</p>
<p>Keyword Arguments:
data &ndash; the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} &ndash; where to send this message (default: {BROADCAST_ADDR})
portNum &ndash; the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck &ndash; True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse &ndash; True if you want the service on the other side to send an application layer response
onResponse &ndash; A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a data packet to some other node
Keyword Arguments:
data -- the data to send, either as an array of bytes or as a protobuf (which will be automatically serialized to bytes)
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- the application portnum (similar to IP port numbers) of the destination, see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
onResponse -- A closure of the form funct(packet), that will be called when a response packet arrives (or the transaction is NAKed due to non receipt)
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
if getattr(data, &#34;SerializeToString&#34;, None):
logging.debug(f&#34;Serializing protobuf as data: {stripnl(data)}&#34;)
data = data.SerializeToString()
if len(data) &gt; mesh_pb2.Constants.DATA_PAYLOAD_LEN:
raise Exception(&#34;Data payload too big&#34;)
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
raise Exception(&#34;A non-zero port number must be specified&#34;)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.sendPosition"><code class="name flex">
<span>def <span class="ident">sendPosition</span></span>(<span>self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId='^all', wantAck=False, wantResponse=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a position packet to some other node (normally a broadcast)</p>
<p>Also, the device software will notice this packet and use it to automatically set its notion of
the local position.</p>
<p>If timeSec is not specified (recommended), we will use the local machine time.</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
&#34;&#34;&#34;
Send a position packet to some other node (normally a broadcast)
Also, the device software will notice this packet and use it to automatically set its notion of
the local position.
If timeSec is not specified (recommended), we will use the local machine time.
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
p = mesh_pb2.Position()
if(latitude != 0.0):
p.latitude_i = int(latitude / 1e-7)
if(longitude != 0.0):
p.longitude_i = int(longitude / 1e-7)
if(altitude != 0):
p.altitude = int(altitude)
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
wantAck=wantAck,
wantResponse=wantResponse)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.sendText"><code class="name flex">
<span>def <span class="ident">sendText</span></span>(<span>self, text: ~AnyStr, destinationId='^all', wantAck=False, wantResponse=False, hopLimit=3, onResponse=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a utf8 string to some other node, if the node has a display it will also be shown on the device.</p>
<h2 id="arguments">Arguments</h2>
<p>text {string} &ndash; The text to send</p>
<p>Keyword Arguments:
destinationId {nodeId or nodeNum} &ndash; where to send this message (default: {BROADCAST_ADDR})
portNum &ndash; not accepted by sendText; text is always sent on the TEXT_MESSAGE_APP port (similar to IP port numbers), see portnums.proto for a list
wantAck &ndash; True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse &ndash; True if you want the service on the other side to send an application layer response</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendText(self, text: AnyStr,
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
onResponse=None):
&#34;&#34;&#34;Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
portNum -- not accepted by sendText; text is always sent on the TEXT_MESSAGE_APP port (similar to IP port numbers), see portnums.proto for a list
wantAck -- True if you want the message sent in a reliable manner (with retries and ack/nak provided for delivery)
wantResponse -- True if you want the service on the other side to send an application layer response
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
wantResponse=wantResponse,
hopLimit=hopLimit,
onResponse=onResponse)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.setOwner"><code class="name flex">
<span>def <span class="ident">setOwner</span></span>(<span>self, long_name, short_name=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Set device owner name</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def setOwner(self, long_name, short_name=None):
&#34;&#34;&#34;Set device owner name&#34;&#34;&#34;
nChars = 3
minChars = 2
if long_name is not None:
long_name = long_name.strip()
if short_name is None:
words = long_name.split()
if len(long_name) &lt;= nChars:
short_name = long_name
elif len(words) &gt;= minChars:
short_name = &#39;&#39;.join(map(lambda word: word[0], words))
else:
trans = str.maketrans(dict.fromkeys(&#39;aeiouAEIOU&#39;))
short_name = long_name[0] + long_name[1:].translate(trans)
if len(short_name) &lt; nChars:
short_name = long_name[:nChars]
p = admin_pb2.AdminMessage()
if long_name is not None:
p.set_owner.long_name = long_name
if short_name is not None:
short_name = short_name.strip()
if len(short_name) &gt; nChars:
short_name = short_name[:nChars]
p.set_owner.short_name = short_name
return self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.setURL"><code class="name flex">
<span>def <span class="ident">setURL</span></span>(<span>self, url)</span>
</code></dt>
<dd>
<div class="desc"><p>Set mesh network URL</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def setURL(self, url):
&#34;&#34;&#34;Set mesh network URL&#34;&#34;&#34;
if self.radioConfig == None:
raise Exception(&#34;No RadioConfig has been read&#34;)
# URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
# Split on &#39;/#&#39; to find the base64 encoded channel settings
splitURL = url.split(&#34;/#&#34;)
decodedURL = base64.urlsafe_b64decode(splitURL[-1])
channelSet = apponly_pb2.ChannelSet()
channelSet.ParseFromString(decodedURL)
i = 0
for chs in channelSet.settings:
ch = channel_pb2.Channel()
ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY
ch.index = i
ch.settings.CopyFrom(chs)
self.channels[ch.index] = ch
self.writeChannel(ch.index)
i = i + 1</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.waitForConfig"><code class="name flex">
<span>def <span class="ident">waitForConfig</span></span>(<span>self, sleep=0.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig', 'channels'))</span>
</code></dt>
<dd>
<div class="desc"><p>Block until radio config is received. Returns True if config has been received.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def waitForConfig(self, sleep=.1, maxsecs=20, attrs=(&#39;myInfo&#39;, &#39;nodes&#39;, &#39;radioConfig&#39;, &#39;channels&#39;)):
&#34;&#34;&#34;Block until radio config is received. Returns True if config has been received.&#34;&#34;&#34;
for _ in range(int(maxsecs/sleep)):
if all(map(lambda a: getattr(self, a, None), attrs)):
return True
time.sleep(sleep)
return False</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.writeChannel"><code class="name flex">
<span>def <span class="ident">writeChannel</span></span>(<span>self, channelIndex)</span>
</code></dt>
<dd>
<div class="desc"><p>Write the current (edited) channel to the device</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def writeChannel(self, channelIndex):
&#34;&#34;&#34;Write the current (edited) channel to the device&#34;&#34;&#34;
p = admin_pb2.AdminMessage()
p.set_channel.CopyFrom(self.channels[channelIndex])
self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
logging.debug(f&#34;Wrote channel {channelIndex}&#34;)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.writeConfig"><code class="name flex">
<span>def <span class="ident">writeConfig</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Write the current (edited) radioConfig to the device</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def writeConfig(self):
&#34;&#34;&#34;Write the current (edited) radioConfig to the device&#34;&#34;&#34;
if self.radioConfig == None:
raise Exception(&#34;No RadioConfig has been read&#34;)
p = admin_pb2.AdminMessage()
p.set_radio.CopyFrom(self.radioConfig)
self.sendData(p, self.myInfo.my_node_num,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True)
logging.debug(&#34;Wrote config&#34;)</code></pre>
</details>
</dd>
</dl>
</dd>
<dt id="meshtastic.ResponseHandler"><code class="flex name class">
<span>class <span class="ident">ResponseHandler</span></span>
<span>(</span><span>callback: Callable)</span>
</code></dt>
<dd>
<div class="desc"><p>A pending response callback, waiting for a response to one of our messages</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class ResponseHandler(NamedTuple):
&#34;&#34;&#34;A pending response callback, waiting for a response to one of our messages&#34;&#34;&#34;
# requestId: int - used only as a key
callback: Callable</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li>builtins.tuple</li>
</ul>
<h3>Instance variables</h3>
<dl>
<dt id="meshtastic.ResponseHandler.callback"><code class="name">var <span class="ident">callback</span> : Callable</code></dt>
<dd>
<div class="desc"><p>Alias for field number 0</p></div>
</dd>
</dl>
</dd>
<dt id="meshtastic.SerialInterface"><code class="flex name class">
<span>class <span class="ident">SerialInterface</span></span>
<span>(</span><span>devPath=None, debugOut=None, noProto=False, connectNow=True)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices over a serial link</p>
<p>Constructor: opens a connection to the specified serial port or, if none is specified, tries to
find a single Meshtastic device by probing.</p>
<p>Keyword Arguments:
devPath {string} &ndash; A filepath to a device, e.g. /dev/ttyUSB0 (default: {None})
debugOut {stream} &ndash; If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class SerialInterface(StreamInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a serial link&#34;&#34;&#34;
def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
&#34;&#34;&#34;Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
&#34;&#34;&#34;
if devPath is None:
ports = util.findPorts()
if len(ports) == 0:
raise Exception(&#34;No Meshtastic devices detected&#34;)
elif len(ports) &gt; 1:
raise Exception(
f&#34;Multiple ports detected, you must specify a device, such as {ports[0]}&#34;)
else:
devPath = ports[0]
logging.debug(f&#34;Connecting to {devPath}&#34;)
# Note: we provide None for port here, because we will be opening it later
self.stream = serial.Serial(
None, 921600, exclusive=True, timeout=0.5)
# rts=False Needed to prevent TBEAMs resetting on OSX, because rts is connected to reset
self.stream.port = devPath
# OS-X/Windows seems to have a bug in its serial driver. It ignores that we asked for no RTSCTS
# control and will always drive RTS either high or low (rather than letting the CP102 leave
# it as an open-collector floating pin). Since it is going to drive it anyways we want to make
# sure it is driven low, so that the TBEAM won&#39;t reset
# Linux does this properly, so don&#39;t apply this hack (because it makes the reset button not work)
if platform.system() != &#39;Linux&#39;:
self.stream.rts = False
self.stream.open()
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></li>
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.StreamInterface.channelURL" href="#meshtastic.MeshInterface.channelURL">channelURL</a></code></li>
<li><code><a title="meshtastic.StreamInterface.close" href="#meshtastic.StreamInterface.close">close</a></code></li>
<li><code><a title="meshtastic.StreamInterface.connect" href="#meshtastic.StreamInterface.connect">connect</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.StreamInterface.setOwner" href="#meshtastic.MeshInterface.setOwner">setOwner</a></code></li>
<li><code><a title="meshtastic.StreamInterface.setURL" href="#meshtastic.MeshInterface.setURL">setURL</a></code></li>
<li><code><a title="meshtastic.StreamInterface.waitForConfig" href="#meshtastic.MeshInterface.waitForConfig">waitForConfig</a></code></li>
<li><code><a title="meshtastic.StreamInterface.writeChannel" href="#meshtastic.MeshInterface.writeChannel">writeChannel</a></code></li>
<li><code><a title="meshtastic.StreamInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
<dt id="meshtastic.StreamInterface"><code class="flex name class">
<span>class <span class="ident">StreamInterface</span></span>
<span>(</span><span>debugOut=None, noProto=False, connectNow=True)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices over a stream link (serial, TCP, etc)</p>
<p>Constructor, opens a connection to self.stream </p>
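<p>Keyword Arguments:
debugOut {stream} &ndash; If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
noProto {bool} &ndash; If True, connect() does not wait for the device database download to complete. (default: {False})
connectNow {bool} &ndash; If True, connect() is called by the constructor; pass False to call connect() manually later. (default: {True})</p>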
<h2 id="raises">Raises</h2>
<dl>
<dt><code>Exception</code></dt>
<dd>If no <code>stream</code> attribute has been set on the instance before this constructor runs (StreamInterface is abstract; create a SerialInterface instead).</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class StreamInterface(MeshInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a stream link (serial, TCP, etc)&#34;&#34;&#34;
def __init__(self, debugOut=None, noProto=False, connectNow=True):
&#34;&#34;&#34;Constructor, opens a connection to self.stream
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
Raises:
Exception: [description]
Exception: [description]
&#34;&#34;&#34;
if not hasattr(self, &#39;stream&#39;):
raise Exception(
&#34;StreamInterface is now abstract (to update existing code create SerialInterface instead)&#34;)
self._rxBuf = bytes() # empty
self._wantExit = False
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(target=self.__reader, args=())
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
def connect(self):
&#34;&#34;&#34;Connect to our radio
Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.
&#34;&#34;&#34;
# Send some bogus UART characters to force a sleeping device to wake
self._writeBytes(bytes([START1, START1, START1, START1]))
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
&#34;&#34;&#34;We override the superclass implementation to close our port&#34;&#34;&#34;
MeshInterface._disconnected(self)
logging.debug(&#34;Closing our port&#34;)
if not self.stream is None:
self.stream.close()
def _writeBytes(self, b):
&#34;&#34;&#34;Write an array of bytes to our stream and flush&#34;&#34;&#34;
self.stream.write(b)
self.stream.flush()
def _readBytes(self, len):
&#34;&#34;&#34;Read an array of bytes from our stream&#34;&#34;&#34;
return self.stream.read(len)
def _sendToRadioImpl(self, toRadio):
&#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
logging.debug(f&#34;Sending: {stripnl(toRadio)}&#34;)
b = toRadio.SerializeToString()
bufLen = len(b)
# We convert into a string, because the TCP code doesn&#39;t work with byte arrays
header = bytes([START1, START2, (bufLen &gt;&gt; 8) &amp; 0xff, bufLen &amp; 0xff])
self._writeBytes(header + b)
def close(self):
&#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
logging.debug(&#34;Closing stream&#34;)
# pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
def __reader(self):
&#34;&#34;&#34;The reader thread that reads bytes from our stream&#34;&#34;&#34;
empty = bytes()
try:
while not self._wantExit:
# logging.debug(&#34;reading character&#34;)
b = self._readBytes(1)
# logging.debug(&#34;In reader loop&#34;)
if len(b) &gt; 0:
# logging.debug(f&#34;read returned {b}&#34;)
c = b[0]
ptr = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b
if ptr == 0: # looking for START1
if c != START1:
self._rxBuf = empty # failed to find start
if self.debugOut != None:
try:
self.debugOut.write(b.decode(&#34;utf-8&#34;))
except:
self.debugOut.write(&#39;?&#39;)
elif ptr == 1: # looking for START2
if c != START2:
self._rxBuf = empty # failed to find start2
elif ptr &gt;= HEADER_LEN: # we&#39;ve at least got a header
# big endian length follows header
packetlen = (self._rxBuf[2] &lt;&lt; 8) + self._rxBuf[3]
if ptr == HEADER_LEN: # we _just_ finished reading the header, validate length
if packetlen &gt; MAX_TO_FROM_RADIO_SIZE:
self._rxBuf = empty # length was out of bounds, restart
if len(self._rxBuf) != 0 and ptr + 1 == packetlen + HEADER_LEN:
try:
self._handleFromRadio(self._rxBuf[HEADER_LEN:])
except Exception as ex:
logging.error(
f&#34;Error while handling message from radio {ex}&#34;)
traceback.print_exc()
self._rxBuf = empty
else:
# logging.debug(f&#34;timeout&#34;)
pass
except serial.SerialException as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.warn(
f&#34;Meshtastic serial port disconnected, disconnecting... {ex}&#34;)
except OSError as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.error(
f&#34;Unexpected OSError, terminating meshtastic reader... {ex}&#34;)
except Exception as ex:
logging.error(
f&#34;Unexpected exception, terminating meshtastic reader... {ex}&#34;)
finally:
logging.debug(&#34;reader is exiting&#34;)
self._disconnected()</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Subclasses</h3>
<ul class="hlist">
<li><a title="meshtastic.SerialInterface" href="#meshtastic.SerialInterface">SerialInterface</a></li>
<li><a title="meshtastic.TCPInterface" href="#meshtastic.TCPInterface">TCPInterface</a></li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.StreamInterface.close"><code class="name flex">
<span>def <span class="ident">close</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Close a connection to the device</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def close(self):
&#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
logging.debug(&#34;Closing stream&#34;)
# pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit</code></pre>
</details>
</dd>
<dt id="meshtastic.StreamInterface.connect"><code class="name flex">
<span>def <span class="ident">connect</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Connect to our radio</p>
<p>Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def connect(self):
&#34;&#34;&#34;Connect to our radio
Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.
&#34;&#34;&#34;
# Send some bogus UART characters to force a sleeping device to wake
self._writeBytes(bytes([START1, START1, START1, START1]))
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()</code></pre>
</details>
</dd>
</dl>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.MeshInterface.channelURL" href="#meshtastic.MeshInterface.channelURL">channelURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setOwner" href="#meshtastic.MeshInterface.setOwner">setOwner</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setURL" href="#meshtastic.MeshInterface.setURL">setURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.waitForConfig" href="#meshtastic.MeshInterface.waitForConfig">waitForConfig</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeChannel" href="#meshtastic.MeshInterface.writeChannel">writeChannel</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
<dt id="meshtastic.TCPInterface"><code class="flex name class">
<span>class <span class="ident">TCPInterface</span></span>
<span>(</span><span>hostname: ~AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices over a TCP link</p>
<p>Constructor, opens a connection to a specified IP address/hostname</p>
<p>Keyword Arguments:
hostname {string} &ndash; Hostname/IP address of the device to connect to
portNumber {int} &ndash; TCP port on the device to connect to (default: {4403})</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class TCPInterface(StreamInterface):
&#34;&#34;&#34;Interface class for meshtastic devices over a TCP link&#34;&#34;&#34;
def __init__(self, hostname: AnyStr, debugOut=None, noProto=False, connectNow=True, portNumber=4403):
&#34;&#34;&#34;Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
hostname {string} -- Hostname/IP address of the device to connect to
&#34;&#34;&#34;
logging.debug(f&#34;Connecting to {hostname}&#34;)
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
# Instead of wrapping as a stream, we use the native socket API
# self.stream = sock.makefile(&#39;rw&#39;)
self.stream = None
self.socket = sock
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
&#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
logging.debug(&#34;Closing TCP stream&#34;)
# Sometimes the socket read might be blocked in the reader thread. Therefore we force the shutdown by closing
# the socket here
self._wantExit = True
if not self.socket is None:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
StreamInterface.close(self)
def _writeBytes(self, b):
&#34;&#34;&#34;Write an array of bytes to our stream and flush&#34;&#34;&#34;
self.socket.send(b)
def _readBytes(self, len):
&#34;&#34;&#34;Read an array of bytes from our stream&#34;&#34;&#34;
return self.socket.recv(len)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></li>
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.StreamInterface.channelURL" href="#meshtastic.MeshInterface.channelURL">channelURL</a></code></li>
<li><code><a title="meshtastic.StreamInterface.close" href="#meshtastic.StreamInterface.close">close</a></code></li>
<li><code><a title="meshtastic.StreamInterface.connect" href="#meshtastic.StreamInterface.connect">connect</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.StreamInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.StreamInterface.setOwner" href="#meshtastic.MeshInterface.setOwner">setOwner</a></code></li>
<li><code><a title="meshtastic.StreamInterface.setURL" href="#meshtastic.MeshInterface.setURL">setURL</a></code></li>
<li><code><a title="meshtastic.StreamInterface.waitForConfig" href="#meshtastic.MeshInterface.waitForConfig">waitForConfig</a></code></li>
<li><code><a title="meshtastic.StreamInterface.writeChannel" href="#meshtastic.MeshInterface.writeChannel">writeChannel</a></code></li>
<li><code><a title="meshtastic.StreamInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
</dl>
</section>
</article>
<nav id="sidebar">
<h1>Index</h1>
<div class="toc">
<ul>
<li><a href="#an-api-for-meshtastic-devices">an API for Meshtastic devices</a></li>
<li><a href="#published-pubsub-topics">Published PubSub topics</a></li>
<li><a href="#example-usage">Example Usage</a></li>
</ul>
</div>
<ul id="index">
<li><h3><a href="#header-submodules">Sub-modules</a></h3>
<ul>
<li><code><a title="meshtastic.admin_pb2" href="admin_pb2.html">meshtastic.admin_pb2</a></code></li>
<li><code><a title="meshtastic.apponly_pb2" href="apponly_pb2.html">meshtastic.apponly_pb2</a></code></li>
<li><code><a title="meshtastic.ble" href="ble.html">meshtastic.ble</a></code></li>
<li><code><a title="meshtastic.channel_pb2" href="channel_pb2.html">meshtastic.channel_pb2</a></code></li>
<li><code><a title="meshtastic.deviceonly_pb2" href="deviceonly_pb2.html">meshtastic.deviceonly_pb2</a></code></li>
<li><code><a title="meshtastic.environmental_measurement_pb2" href="environmental_measurement_pb2.html">meshtastic.environmental_measurement_pb2</a></code></li>
<li><code><a title="meshtastic.mesh_pb2" href="mesh_pb2.html">meshtastic.mesh_pb2</a></code></li>
<li><code><a title="meshtastic.portnums_pb2" href="portnums_pb2.html">meshtastic.portnums_pb2</a></code></li>
<li><code><a title="meshtastic.radioconfig_pb2" href="radioconfig_pb2.html">meshtastic.radioconfig_pb2</a></code></li>
<li><code><a title="meshtastic.remote_hardware" href="remote_hardware.html">meshtastic.remote_hardware</a></code></li>
<li><code><a title="meshtastic.remote_hardware_pb2" href="remote_hardware_pb2.html">meshtastic.remote_hardware_pb2</a></code></li>
<li><code><a title="meshtastic.test" href="test.html">meshtastic.test</a></code></li>
<li><code><a title="meshtastic.tunnel" href="tunnel.html">meshtastic.tunnel</a></code></li>
<li><code><a title="meshtastic.util" href="util.html">meshtastic.util</a></code></li>
</ul>
</li>
<li><h3><a href="#header-variables">Global variables</a></h3>
<ul class="">
<li><code><a title="meshtastic.MY_CONFIG_ID" href="#meshtastic.MY_CONFIG_ID">MY_CONFIG_ID</a></code></li>
</ul>
</li>
<li><h3><a href="#header-classes">Classes</a></h3>
<ul>
<li>
<h4><code><a title="meshtastic.BLEInterface" href="#meshtastic.BLEInterface">BLEInterface</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.BLEInterface.close" href="#meshtastic.BLEInterface.close">close</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.KnownProtocol" href="#meshtastic.KnownProtocol">KnownProtocol</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.KnownProtocol.name" href="#meshtastic.KnownProtocol.name">name</a></code></li>
<li><code><a title="meshtastic.KnownProtocol.onReceive" href="#meshtastic.KnownProtocol.onReceive">onReceive</a></code></li>
<li><code><a title="meshtastic.KnownProtocol.protobufFactory" href="#meshtastic.KnownProtocol.protobufFactory">protobufFactory</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></code></h4>
<ul class="two-column">
<li><code><a title="meshtastic.MeshInterface.channelURL" href="#meshtastic.MeshInterface.channelURL">channelURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.getLongName" href="#meshtastic.MeshInterface.getLongName">getLongName</a></code></li>
<li><code><a title="meshtastic.MeshInterface.getMyNodeInfo" href="#meshtastic.MeshInterface.getMyNodeInfo">getMyNodeInfo</a></code></li>
<li><code><a title="meshtastic.MeshInterface.getMyUser" href="#meshtastic.MeshInterface.getMyUser">getMyUser</a></code></li>
<li><code><a title="meshtastic.MeshInterface.getShortName" href="#meshtastic.MeshInterface.getShortName">getShortName</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setOwner" href="#meshtastic.MeshInterface.setOwner">setOwner</a></code></li>
<li><code><a title="meshtastic.MeshInterface.setURL" href="#meshtastic.MeshInterface.setURL">setURL</a></code></li>
<li><code><a title="meshtastic.MeshInterface.waitForConfig" href="#meshtastic.MeshInterface.waitForConfig">waitForConfig</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeChannel" href="#meshtastic.MeshInterface.writeChannel">writeChannel</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.ResponseHandler" href="#meshtastic.ResponseHandler">ResponseHandler</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.ResponseHandler.callback" href="#meshtastic.ResponseHandler.callback">callback</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.SerialInterface" href="#meshtastic.SerialInterface">SerialInterface</a></code></h4>
</li>
<li>
<h4><code><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.StreamInterface.close" href="#meshtastic.StreamInterface.close">close</a></code></li>
<li><code><a title="meshtastic.StreamInterface.connect" href="#meshtastic.StreamInterface.connect">connect</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.TCPInterface" href="#meshtastic.TCPInterface">TCPInterface</a></code></h4>
</li>
</ul>
</li>
</ul>
</nav>
</main>
<footer id="footer">
<p>Generated by <a href="https://pdoc3.github.io/pdoc"><cite>pdoc</cite> 0.9.1</a>.</p>
</footer>
</body>
</html>