<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="generator" content="pdoc 0.8.1" />
<title>meshtastic API documentation</title>
<meta name="description" content="an API for Meshtastic devices …" />
<link href='https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.0/normalize.min.css' rel='stylesheet'>
<link href='https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/8.0.0/sanitize.min.css' rel='stylesheet'>
<link href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/github.min.css" rel="stylesheet">
<style>.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}.name > span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance 
em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
</head>
<body>
<main>
<article id="content">
<header>
<h1 class="title">Package <code>meshtastic</code></h1>
</header>
<section id="section-intro">
<h1 id="an-api-for-meshtastic-devices">An API for Meshtastic devices</h1>
<p>Primary class: StreamInterface.
Install with pip: "<a href="https://pypi.org/project/meshtastic/">pip3 install meshtastic</a>".
Source code is on <a href="https://github.com/meshtastic/Meshtastic-python">GitHub</a>.</p>
<p>Properties of StreamInterface:</p>
<ul>
<li>radioConfig - The current radio configuration and device settings. Edit this object and call writeConfig() to apply the
new settings to the device.</li>
<li>nodes - The database of received nodes.
It holds the most recently received location and username information for each
node in the mesh.
This is a read-only data structure.</li>
<li>myNodeInfo - Read-only information about the local radio device (software version, hardware version, etc.).</li>
</ul>
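<p>As a rough illustration of how the <code>nodes</code> database can be read, the sketch below walks a dictionary shaped like the one the source further down builds (keyed by node ID, each entry carrying <code>num</code>, an optional <code>user</code> and an optional <code>position</code> with integer <code>latitudeI</code>/<code>longitudeI</code> fields scaled by 1e-7). The node IDs and coordinates are made-up sample values, and <code>fixup_position</code> and <code>summarize</code> are hypothetical helper names, not part of the package.</p>

```python
# Hypothetical sample data; a real database is filled in by the radio.
nodes = {
    "!28979058": {
        "num": 680890456,
        "user": {"id": "!28979058"},
        "position": {"latitudeI": 374221234, "longitudeI": -1220845678},
    },
    "!2897a1b2": {"num": 680894898, "user": {"id": "!2897a1b2"}},
}


def fixup_position(position):
    """Add float degrees next to the integer fields, which are scaled by 1e-7."""
    if "latitudeI" in position:
        position["latitude"] = position["latitudeI"] * 1e-7
    if "longitudeI" in position:
        position["longitude"] = position["longitudeI"] * 1e-7
    return position


def summarize(nodes):
    """Return one line of text per node, sorted by node ID."""
    lines = []
    for node_id, node in sorted(nodes.items()):
        # Work on a copy so the (read-only) database is left untouched.
        pos = fixup_position(dict(node.get("position", {})))
        if "latitude" in pos and "longitude" in pos:
            where = f"{pos['latitude']:.4f}, {pos['longitude']:.4f}"
        else:
            where = "position unknown"
        lines.append(f"{node_id} (num {node['num']}): {where}")
    return lines


for line in summarize(nodes):
    print(line)
```

<p>With a live connection the same loop would be run over <code>interface.nodes</code> instead of the sample dictionary.</p>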
<h1 id="published-pubsub-topics">Published PubSub topics</h1>
<p>We use a <a href="https://pypubsub.readthedocs.io/en/v4.0.3/">publish-subscribe</a> model to communicate asynchronous events.
Available
topics:</p>
<ul>
<li>meshtastic.connection.established - published once we've successfully connected to the radio and downloaded the node DB</li>
<li>meshtastic.connection.lost - published once we've lost our link to the radio</li>
<li>meshtastic.receive.position(packet) - delivers a received position packet as a dictionary. If you only care about a particular
type of packet, subscribe to its full topic name.
To see all packets, subscribe to "meshtastic.receive".</li>
<li>meshtastic.receive.user(packet) - delivers a received user packet as a dictionary</li>
<li>meshtastic.receive.data(packet) - delivers a received data packet as a dictionary</li>
<li>meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc&hellip;)</li>
</ul>
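<p>A listener for one specific topic might look like the sketch below. It assumes the packet dictionary layout produced by the source further down (<code>fromId</code> at the top level, and <code>decoded.data.typ</code> / <code>decoded.data.text</code> for text messages). <code>format_text</code> and <code>on_data</code> are hypothetical names, and the sample packet is invented for illustration.</p>

```python
def format_text(packet):
    """Return "sender: text" for a CLEAR_TEXT data packet, otherwise None."""
    data = packet.get("decoded", {}).get("data", {})
    if data.get("typ") != "CLEAR_TEXT":
        return None
    return f"{packet.get('fromId')}: {data.get('text')}"


def on_data(packet, interface):
    """Listener for meshtastic.receive.data; prints text messages only."""
    line = format_text(packet)
    if line is not None:
        print(line)


try:
    from pubsub import pub  # PyPubSub

    # Subscribing to the full topic name means only data packets arrive here.
    pub.subscribe(on_data, "meshtastic.receive.data")
except ImportError:
    pass  # PyPubSub is not installed; format_text() can still be called directly


# Hypothetical packet, shaped like the dictionaries the package publishes.
sample = {
    "fromId": "!28979058",
    "toId": "^all",
    "decoded": {"data": {"typ": "CLEAR_TEXT", "text": "hello mesh"}},
}
on_data(sample, interface=None)
```

<p>Keeping the formatting logic in a plain function makes it easy to exercise without a radio attached.</p>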
<h1 id="example-usage">Example Usage</h1>
<pre><code>import meshtastic
from pubsub import pub


def onReceive(packet, interface):  # called when a packet arrives
    print(f&quot;Received: {packet}&quot;)


# topic must be optional: the connection event is published with only the interface argument
def onConnection(interface, topic=pub.AUTO_TOPIC):  # called when we (re)connect to the radio
    # defaults to broadcast, specify a destination ID if you wish
    interface.sendText(&quot;hello mesh&quot;)


pub.subscribe(onReceive, &quot;meshtastic.receive&quot;)
pub.subscribe(onConnection, &quot;meshtastic.connection.established&quot;)
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.StreamInterface()
</code></pre>
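<p>The comment about broadcast versus a destination ID can be made concrete. The sketch below mirrors how <code>sendPacket</code> in the source resolves its <code>destinationId</code> argument: an integer is taken as a node number, <code>"^all"</code> maps to the broadcast number, and anything else is looked up in the node database. <code>resolve_destination</code> is a hypothetical stand-alone helper and the node entry is sample data.</p>

```python
BROADCAST_ADDR = "^all"  # special ID that means broadcast
BROADCAST_NUM = 0xFFFFFFFF


def resolve_destination(destination_id, nodes):
    """Map a node number, the broadcast ID, or a node ID to a node number."""
    if isinstance(destination_id, int):
        return destination_id  # already a node number
    if destination_id == BROADCAST_ADDR:
        return BROADCAST_NUM
    # Any other string must be a known node ID; unknown IDs raise KeyError.
    return nodes[destination_id]["num"]


# Hypothetical node database with a single entry.
nodes = {"!28979058": {"num": 680890456}}

print(resolve_destination("^all", nodes))
print(resolve_destination("!28979058", nodes))
```

<p>In the package itself the same choice is made by passing <code>destinationId</code> to <code>sendText</code>, <code>sendData</code> or <code>sendPosition</code>.</p>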
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">&#34;&#34;&#34;
# An API for Meshtastic devices

Primary class: StreamInterface.
Install with pip: &#34;[pip3 install meshtastic](https://pypi.org/project/meshtastic/)&#34;.
Source code is on [GitHub](https://github.com/meshtastic/Meshtastic-python).

Properties of StreamInterface:

- radioConfig - The current radio configuration and device settings. Edit this object and call writeConfig() to apply the
new settings to the device.
- nodes - The database of received nodes. It holds the most recently received location and username information for each
node in the mesh. This is a read-only data structure.
- myNodeInfo - Read-only information about the local radio device (software version, hardware version, etc.).

# Published PubSub topics

We use a [publish-subscribe](https://pypubsub.readthedocs.io/en/v4.0.3/) model to communicate asynchronous events. Available
topics:

- meshtastic.connection.established - published once we&#39;ve successfully connected to the radio and downloaded the node DB
- meshtastic.connection.lost - published once we&#39;ve lost our link to the radio
- meshtastic.receive.position(packet) - delivers a received position packet as a dictionary. If you only care about a particular
type of packet, subscribe to its full topic name. To see all packets, subscribe to &#34;meshtastic.receive&#34;.
- meshtastic.receive.user(packet) - delivers a received user packet as a dictionary
- meshtastic.receive.data(packet) - delivers a received data packet as a dictionary
- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc...)

# Example Usage

```
import meshtastic
from pubsub import pub


def onReceive(packet, interface):  # called when a packet arrives
    print(f&#34;Received: {packet}&#34;)


# topic must be optional: the connection event is published with only the interface argument
def onConnection(interface, topic=pub.AUTO_TOPIC):  # called when we (re)connect to the radio
    # defaults to broadcast, specify a destination ID if you wish
    interface.sendText(&#34;hello mesh&#34;)


pub.subscribe(onReceive, &#34;meshtastic.receive&#34;)
pub.subscribe(onConnection, &#34;meshtastic.connection.established&#34;)
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.StreamInterface()
```
&#34;&#34;&#34;

import pygatt
import google.protobuf.json_format
import serial
import threading
import logging
import time
import sys
import traceback
from . import mesh_pb2
from . import util
from pubsub import pub
from dotmap import DotMap

START1 = 0x94
START2 = 0xc3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512

BROADCAST_ADDR = &#34;^all&#34;  # A special ID that means broadcast

# if using 8 bit nodenums this will be shortened on the target
BROADCAST_NUM = 0xffffffff

# sent as want_config_id; a matching config_complete_id marks the end of the config download
MY_CONFIG_ID = 42

OUR_APP_VERSION = 172
&#34;&#34;&#34;The numeric buildnumber (shared with android apps) specifying the level of device code we are guaranteed to understand&#34;&#34;&#34;


class MeshInterface:
    &#34;&#34;&#34;Interface class for meshtastic devices

    Properties:

    isConnected
    nodes
    debugOut
    &#34;&#34;&#34;

    def __init__(self, debugOut=None, noProto=False):
        &#34;&#34;&#34;Constructor&#34;&#34;&#34;
        self.debugOut = debugOut
        self.nodes = None  # FIXME
        self.isConnected = False
        if not noProto:
            self._startConfig()

    def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
        &#34;&#34;&#34;Send a UTF-8 string to some other node. If the node has a display, the text will also be shown on the device.

        Arguments:
            text {string} -- The text to send

        Keyword Arguments:
            destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
                             dataType=mesh_pb2.Data.CLEAR_TEXT, wantAck=wantAck, wantResponse=wantResponse)

    def sendData(self, byteData, destinationId=BROADCAST_ADDR, dataType=mesh_pb2.Data.OPAQUE, wantAck=False, wantResponse=False):
        &#34;&#34;&#34;Send a data packet to some other node

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        meshPacket = mesh_pb2.MeshPacket()
        meshPacket.decoded.data.payload = byteData
        meshPacket.decoded.data.typ = dataType
        meshPacket.decoded.want_response = wantResponse
        return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)

    def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
        &#34;&#34;&#34;
        Send a position packet to some other node (normally a broadcast)

        Also, the device software will notice this packet and use it to automatically set its notion of
        the local position.

        If timeSec is not specified (recommended), we will use the local machine time.

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        meshPacket = mesh_pb2.MeshPacket()
        # latitude and longitude are sent as integers in units of 1e-7 degrees
        if latitude != 0.0:
            meshPacket.decoded.position.latitude_i = int(latitude / 1e-7)

        if longitude != 0.0:
            meshPacket.decoded.position.longitude_i = int(longitude / 1e-7)

        if altitude != 0:
            meshPacket.decoded.position.altitude = int(altitude)

        if timeSec == 0:
            timeSec = time.time()  # returns unix timestamp in seconds
        meshPacket.decoded.position.time = int(timeSec)

        meshPacket.decoded.want_response = wantResponse
        return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)

    def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
        &#34;&#34;&#34;Send a MeshPacket to the specified node (or if unspecified, broadcast).
        You probably don&#39;t want this - use sendData instead.

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        toRadio = mesh_pb2.ToRadio()
        # FIXME add support for non broadcast addresses

        if isinstance(destinationId, int):
            nodeNum = destinationId
        elif destinationId == BROADCAST_ADDR:
            nodeNum = BROADCAST_NUM
        else:
            nodeNum = self.nodes[destinationId][&#39;num&#39;]

        meshPacket.to = nodeNum
        meshPacket.want_ack = wantAck

        # if the user hasn&#39;t set an ID for this packet (likely and recommended), we should pick a new unique ID
        # so the message can be tracked.
        if meshPacket.id == 0:
            meshPacket.id = self._generatePacketId()

        toRadio.packet.CopyFrom(meshPacket)
        self._sendToRadio(toRadio)
        return meshPacket

    def writeConfig(self):
        &#34;&#34;&#34;Write the current (edited) radioConfig to the device&#34;&#34;&#34;
        if self.radioConfig is None:
            raise Exception(&#34;No RadioConfig has been read&#34;)

        t = mesh_pb2.ToRadio()
        t.set_radio.CopyFrom(self.radioConfig)
        self._sendToRadio(t)

    def _generatePacketId(self):
        &#34;&#34;&#34;Get a new unique packet ID&#34;&#34;&#34;
        self.currentPacketId = (self.currentPacketId + 1) &amp; 0xffffffff
        return self.currentPacketId

    def _disconnected(self):
        &#34;&#34;&#34;Called by subclasses to tell clients this interface has disconnected&#34;&#34;&#34;
        self.isConnected = False
        pub.sendMessage(&#34;meshtastic.connection.lost&#34;, interface=self)

    def _connected(self):
        &#34;&#34;&#34;Called by this class to tell clients we are now fully connected to a node
        &#34;&#34;&#34;
        self.isConnected = True
        pub.sendMessage(&#34;meshtastic.connection.established&#34;, interface=self)

    def _startConfig(self):
        &#34;&#34;&#34;Start device packets flowing&#34;&#34;&#34;
        self.myInfo = None
        self.nodes = {}  # nodes keyed by ID
        self._nodesByNum = {}  # nodes keyed by nodenum
        self.radioConfig = None
        self.currentPacketId = None

        startConfig = mesh_pb2.ToRadio()
        startConfig.want_config_id = MY_CONFIG_ID  # echoed back in config_complete_id
        self._sendToRadio(startConfig)

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.error(f&#34;Subclass must provide toradio: {toRadio}&#34;)

    def _handleFromRadio(self, fromRadioBytes):
        &#34;&#34;&#34;
        Handle a packet that arrived from the radio (update model and publish events)

        Called by subclasses.&#34;&#34;&#34;
        fromRadio = mesh_pb2.FromRadio()
        fromRadio.ParseFromString(fromRadioBytes)
        asDict = google.protobuf.json_format.MessageToDict(fromRadio)
        logging.debug(f&#34;Received: {asDict}&#34;)
        if fromRadio.HasField(&#34;my_info&#34;):
            self.myInfo = fromRadio.my_info
            if self.myInfo.min_app_version &gt; OUR_APP_VERSION:
                raise Exception(
                    &#34;This device needs a newer python client, please \&#34;pip install --upgrade meshtastic\&#34;&#34;)
            # start assigning our packet IDs from the opposite side of where our local device is assigning them
            self.currentPacketId = (
                self.myInfo.current_packet_id + 0x80000000) &amp; 0xffffffff
        elif fromRadio.HasField(&#34;radio&#34;):
            self.radioConfig = fromRadio.radio
        elif fromRadio.HasField(&#34;node_info&#34;):
            node = asDict[&#34;nodeInfo&#34;]
            try:
                self._fixupPosition(node[&#34;position&#34;])
            except KeyError:
                logging.debug(&#34;Node without position&#34;)
            self._nodesByNum[node[&#34;num&#34;]] = node
            if &#34;user&#34; in node:  # Some nodes might not have user/ids assigned yet
                self.nodes[node[&#34;user&#34;][&#34;id&#34;]] = node
        elif fromRadio.config_complete_id == MY_CONFIG_ID:
            # the device has finished sending the config we requested in _startConfig
            self._connected()
        elif fromRadio.HasField(&#34;packet&#34;):
            self._handlePacketFromRadio(fromRadio.packet)
        elif fromRadio.rebooted:
            self._disconnected()
            self._startConfig()  # redownload the node db etc...
        else:
            logging.warning(&#34;Unexpected FromRadio payload&#34;)

    def _fixupPosition(self, position):
        &#34;&#34;&#34;Convert integer lat/lon into floats

        Arguments:
            position {Position dictionary} -- object to fix up
        &#34;&#34;&#34;
        if &#34;latitudeI&#34; in position:
            position[&#34;latitude&#34;] = position[&#34;latitudeI&#34;] * 1e-7
        if &#34;longitudeI&#34; in position:
            position[&#34;longitude&#34;] = position[&#34;longitudeI&#34;] * 1e-7

    def _nodeNumToId(self, num):
        &#34;&#34;&#34;Map a node number to a node ID

        Arguments:
            num {int} -- Node number

        Returns:
            string -- Node ID
        &#34;&#34;&#34;
        if num == BROADCAST_NUM:
            return BROADCAST_ADDR

        try:
            return self._nodesByNum[num][&#34;user&#34;][&#34;id&#34;]
        except KeyError:
            logging.error(&#34;Node not found for fromId&#34;)
            return None

    def _getOrCreateByNum(self, nodeNum):
        &#34;&#34;&#34;Given a nodenum find the NodeInfo in the DB (or create if necessary)&#34;&#34;&#34;
        if nodeNum == BROADCAST_NUM:
            raise Exception(&#34;Can not create/find nodenum by the broadcast num&#34;)

        if nodeNum in self._nodesByNum:
            return self._nodesByNum[nodeNum]
        else:
            n = {&#34;num&#34;: nodeNum}  # Create a minimal node db entry
            self._nodesByNum[nodeNum] = n
            return n

    def _handlePacketFromRadio(self, meshPacket):
        &#34;&#34;&#34;Handle a MeshPacket that just arrived from the radio

        Will publish one of the following events:
        - meshtastic.receive.position(packet = MeshPacket dictionary)
        - meshtastic.receive.user(packet = MeshPacket dictionary)
        - meshtastic.receive.data(packet = MeshPacket dictionary)
        &#34;&#34;&#34;
        asDict = google.protobuf.json_format.MessageToDict(meshPacket)
        # add fromId and toId fields based on the node ID
        asDict[&#34;fromId&#34;] = self._nodeNumToId(asDict[&#34;from&#34;])
        asDict[&#34;toId&#34;] = self._nodeNumToId(asDict[&#34;to&#34;])

        # We could provide our objects as DotMaps - which work with . notation or as dictionaries
        # asObj = DotMap(asDict)
        topic = &#34;meshtastic.receive&#34;  # Generic unknown packet type
        if meshPacket.decoded.HasField(&#34;position&#34;):
            topic = &#34;meshtastic.receive.position&#34;
            p = asDict[&#34;decoded&#34;][&#34;position&#34;]
            self._fixupPosition(p)
            # update node DB as needed
            self._getOrCreateByNum(asDict[&#34;from&#34;])[&#34;position&#34;] = p

        if meshPacket.decoded.HasField(&#34;user&#34;):
            topic = &#34;meshtastic.receive.user&#34;
            u = asDict[&#34;decoded&#34;][&#34;user&#34;]
            # update node DB as needed
            n = self._getOrCreateByNum(asDict[&#34;from&#34;])
            n[&#34;user&#34;] = u
            # We now have a node ID, make sure it is up to date in that table
            # (store the node entry, not just the user record, so self.nodes[id][&#34;num&#34;] keeps working)
            self.nodes[u[&#34;id&#34;]] = n

        if meshPacket.decoded.HasField(&#34;data&#34;):
            topic = &#34;meshtastic.receive.data&#34;
            # OPAQUE is the default protobuf typ value, and therefore if not set it will not be populated at all
            # to make API usage easier, set it to prevent confusion
            if &#34;typ&#34; not in asDict[&#34;decoded&#34;][&#34;data&#34;]:
                asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;typ&#34;] = &#34;OPAQUE&#34;

            # For text messages, we go ahead and decode the UTF-8 payload to text for our users
            if asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;typ&#34;] == &#34;CLEAR_TEXT&#34;:
                asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;text&#34;] = meshPacket.decoded.data.payload.decode(
                    &#34;utf-8&#34;)

        pub.sendMessage(topic, packet=asDict, interface=self)


# Our standard BLE characteristics
TORADIO_UUID = &#34;f75c76d2-129e-4dad-a1dd-7866124401e7&#34;
FROMRADIO_UUID = &#34;8ba2bcc2-ee02-4a55-a531-c525c5e454d5&#34;
FROMNUM_UUID = &#34;ed9da18c-a800-4f66-a670-aa7547e34453&#34;


class BLEInterface(MeshInterface):
    &#34;&#34;&#34;A not quite ready - FIXME - BLE interface to devices&#34;&#34;&#34;

    def __init__(self, address, debugOut=None):
        self.address = address
        self.adapter = pygatt.GATTToolBackend()  # BGAPIBackend()
        self.adapter.start()
        logging.debug(f&#34;Connecting to {self.address}&#34;)
        self.device = self.adapter.connect(address)
        logging.debug(&#34;Connected to device&#34;)
        # fromradio = self.device.char_read(FROMRADIO_UUID)
        MeshInterface.__init__(self, debugOut=debugOut)

        self._readFromRadio()  # read the initial responses

        def handle_data(handle, data):
            self._handleFromRadio(data)

        self.device.subscribe(FROMNUM_UUID, callback=handle_data)

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.debug(f&#34;Sending: {toRadio}&#34;)
        b = toRadio.SerializeToString()
        self.device.char_write(TORADIO_UUID, b)

    def close(self):
        self.adapter.stop()

    def _readFromRadio(self):
        wasEmpty = False
        while not wasEmpty:
            b = self.device.char_read(FROMRADIO_UUID)
            wasEmpty = len(b) == 0
            if not wasEmpty:
                self._handleFromRadio(b)


class StreamInterface(MeshInterface):
    &#34;&#34;&#34;Interface class for meshtastic devices over a stream link (serial, TCP, etc)&#34;&#34;&#34;

    def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
        &#34;&#34;&#34;Constructor, opens a connection to a specified serial port, or if unspecified try to
        find one Meshtastic device by probing

        Keyword Arguments:
            devPath {string} -- A filepath to a device, e.g. /dev/ttyUSB0 (default: {None})
            debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
            noProto {bool} -- If True, skip the initial configuration request (default: {False})
            connectNow {bool} -- If False, do not start the reader thread; call connect() later (default: {True})

        Raises:
            Exception: No Meshtastic device was detected
            Exception: Multiple ports were detected and no devPath was given
        &#34;&#34;&#34;

        if devPath is None:
            ports = util.findPorts()
            if len(ports) == 0:
                raise Exception(&#34;No Meshtastic devices detected&#34;)
            elif len(ports) &gt; 1:
                raise Exception(
                    f&#34;Multiple ports detected, you must specify a device, such as {ports[0].device}&#34;)
            else:
                devPath = ports[0]

        logging.debug(f&#34;Connecting to {devPath}&#34;)
        self.devPath = devPath
        self._rxBuf = bytes()  # empty
        self._wantExit = False
        self.stream = serial.Serial(
            devPath, 921600, exclusive=True, timeout=0.5)
        self._rxThread = threading.Thread(target=self.__reader, args=())

        MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)

        # Start the reader thread after superclass constructor completes init
        if connectNow:
            self.connect()

    def connect(self):
        &#34;&#34;&#34;Connect to our radio

        Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
        start the reading thread later.
        &#34;&#34;&#34;

        # Send some bogus UART characters to force a sleeping device to wake
        self.stream.write(bytes([START1, START1, START1, START1]))
        self.stream.flush()
        time.sleep(0.1)  # wait 100ms to give device time to start running

        self._rxThread.start()

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.debug(f&#34;Sending: {toRadio}&#34;)
        b = toRadio.SerializeToString()
        bufLen = len(b)
        # 4 byte header: START1, START2, then the payload length as a big endian 16 bit value
        header = bytes([START1, START2, (bufLen &gt;&gt; 8) &amp; 0xff, bufLen &amp; 0xff])
        self.stream.write(header)
        self.stream.write(b)
        self.stream.flush()

    def close(self):
        &#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
        logging.debug(&#34;Closing serial stream&#34;)
        # pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
        self._wantExit = True
        if self._rxThread != threading.current_thread():
            self._rxThread.join()  # wait for it to exit

    def __reader(self):
        &#34;&#34;&#34;The reader thread that reads bytes from our stream&#34;&#34;&#34;
        empty = bytes()

        try:
            while not self._wantExit:
                b = self.stream.read(1)
                if len(b) &gt; 0:
                    # logging.debug(f&#34;read returned {b}&#34;)
                    c = b[0]
                    ptr = len(self._rxBuf)

                    # Assume we want to append this byte, fixme use bytearray instead
                    self._rxBuf = self._rxBuf + b

                    if ptr == 0:  # looking for START1
                        if c != START1:
                            self._rxBuf = empty  # failed to find start
                            if self.debugOut is not None:
                                try:
                                    self.debugOut.write(b.decode(&#34;utf-8&#34;))
                                except UnicodeDecodeError:
                                    self.debugOut.write(&#39;?&#39;)

                    elif ptr == 1:  # looking for START2
                        if c != START2:
                            self._rxBuf = empty  # failed to find start2
                    elif ptr &gt;= HEADER_LEN:  # we&#39;ve at least got a header
                        # big endian length follows header
                        packetlen = (self._rxBuf[2] &lt;&lt; 8) + self._rxBuf[3]

                        if ptr == HEADER_LEN:  # we _just_ finished reading the header, validate length
                            if packetlen &gt; MAX_TO_FROM_RADIO_SIZE:
                                self._rxBuf = empty  # length was out of bounds, restart

                        if len(self._rxBuf) != 0 and ptr + 1 == packetlen + HEADER_LEN:
                            try:
                                self._handleFromRadio(self._rxBuf[HEADER_LEN:])
                            except Exception as ex:
                                logging.error(
                                    f&#34;Error while handling message from radio {ex}&#34;)
                                traceback.print_exc()
                            self._rxBuf = empty
                else:
                    # logging.debug(f&#34;timeout on {self.devPath}&#34;)
                    pass
        except serial.SerialException as ex:
            logging.warning(
                &#34;Meshtastic serial port disconnected, disconnecting...&#34;)
        finally:
            logging.debug(&#34;reader is exiting&#34;)
            self.stream.close()
            self._disconnected()</code></pre>
</details>
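<p>The stream framing used by <code>StreamInterface</code> in the source above (a four byte header of <code>START1</code>, <code>START2</code> and a big-endian 16-bit payload length, followed by the serialized protobuf) can be sketched in isolation. This is a simplified illustration rather than the package's reader: it scans a complete byte string instead of reading one byte at a time from a serial port, and <code>frame</code> and <code>unframe</code> are hypothetical names.</p>

```python
START1 = 0x94
START2 = 0xC3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512


def frame(payload: bytes) -> bytes:
    """Prefix a payload with START1, START2 and its big-endian 16-bit length."""
    return bytes([START1, START2]) + len(payload).to_bytes(2, "big") + payload


def unframe(stream: bytes):
    """Yield each payload found in stream, skipping bytes outside frames."""
    i = 0
    while len(stream) >= i + HEADER_LEN:
        if stream[i] != START1 or stream[i + 1] != START2:
            i += 1  # not a header: treat as debug output or noise
            continue
        length = int.from_bytes(stream[i + 2:i + 4], "big")
        if length > MAX_TO_FROM_RADIO_SIZE:
            i += 1  # implausible length: resynchronise
            continue
        end = i + HEADER_LEN + length
        if end > len(stream):
            break  # incomplete frame: wait for more data
        yield stream[i + HEADER_LEN:end]
        i = end


# Two frames preceded by some plain-text debug output.
data = b"boot" + frame(b"hi") + frame(b"abc")
print(list(unframe(data)))
```

<p>Bytes that do not belong to a frame are skipped here; the package's reader instead forwards them to <code>debugOut</code> when one is provided.</p>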
</section>
<section>
<h2 class="section-title" id="header-submodules">Sub-modules</h2>
<dl>
<dt><code class="name"><a title="meshtastic.ble" href="ble.html">meshtastic.ble</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.mesh_pb2" href="mesh_pb2.html">meshtastic.mesh_pb2</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.test" href="test.html">meshtastic.test</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="meshtastic.util" href="util.html">meshtastic.util</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
</dl>
</section>
<section>
<h2 class="section-title" id="header-variables">Global variables</h2>
<dl>
<dt id="meshtastic.MY_CONFIG_ID"><code class="name">var <span class="ident">MY_CONFIG_ID</span></code></dt>
<dd>
<div class="desc"><p>The ID sent as want_config_id when requesting the device configuration. A FromRadio message whose config_complete_id matches this value marks the end of the configuration download.</p></div>
</dd>
</dl>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="meshtastic.BLEInterface"><code class="flex name class">
<span>class <span class="ident">BLEInterface</span></span>
<span>(</span><span>address, debugOut=None)</span>
</code></dt>
<dd>
<div class="desc"><p>A not quite ready - FIXME - BLE interface to devices</p>
<p>Constructor</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class BLEInterface(MeshInterface):
    &#34;&#34;&#34;A not quite ready - FIXME - BLE interface to devices&#34;&#34;&#34;

    def __init__(self, address, debugOut=None):
        self.address = address
        self.adapter = pygatt.GATTToolBackend()  # BGAPIBackend()
        self.adapter.start()
        logging.debug(f&#34;Connecting to {self.address}&#34;)
        self.device = self.adapter.connect(address)
        logging.debug(&#34;Connected to device&#34;)
        # fromradio = self.device.char_read(FROMRADIO_UUID)
        MeshInterface.__init__(self, debugOut=debugOut)

        self._readFromRadio()  # read the initial responses

        def handle_data(handle, data):
            self._handleFromRadio(data)

        self.device.subscribe(FROMNUM_UUID, callback=handle_data)

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.debug(f&#34;Sending: {toRadio}&#34;)
        b = toRadio.SerializeToString()
        self.device.char_write(TORADIO_UUID, b)

    def close(self):
        self.adapter.stop()

    def _readFromRadio(self):
        wasEmpty = False
        while not wasEmpty:
            b = self.device.char_read(FROMRADIO_UUID)
            wasEmpty = len(b) == 0
            if not wasEmpty:
                self._handleFromRadio(b)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.BLEInterface.close"><code class="name flex">
<span>def <span class="ident">close</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def close(self):
    self.adapter.stop()</code></pre>
</details>
</dd>
</dl>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPacket" href="#meshtastic.MeshInterface.sendPacket">sendPacket</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
<dt id="meshtastic.MeshInterface"><code class="flex name class">
<span>class <span class="ident">MeshInterface</span></span>
<span>(</span><span>debugOut=None, noProto=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices</p>
<p>Properties:</p>
<p>isConnected
nodes
debugOut</p>
<p>Constructor</p></div>
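<p>The send methods return the packet with its <code>id</code> field populated. In the source below, the counter behind those IDs is seeded half the 32-bit range away from the device's own <code>current_packet_id</code> and then incremented with wrap-around. A minimal sketch of that arithmetic follows; <code>PacketIdCounter</code> is a hypothetical stand-alone class, not part of the package.</p>

```python
ID_SPACE = 2 ** 32  # packet IDs are 32-bit values


class PacketIdCounter:
    """Hand out packet IDs starting opposite the device's own counter."""

    def __init__(self, device_current_packet_id):
        # Offset by half the ID space so host and device IDs stay far apart.
        self.current = (device_current_packet_id + 0x80000000) % ID_SPACE

    def next_id(self):
        # Increment with wrap-around at 32 bits.
        self.current = (self.current + 1) % ID_SPACE
        return self.current


counter = PacketIdCounter(5)
print(hex(counter.next_id()))
```

<p>Starting from the opposite side of the range makes it unlikely that IDs chosen by the Python client collide with IDs the device assigns itself.</p>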
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class MeshInterface:
&#34;&#34;&#34;Interface class for meshtastic devices
Properties:
isConnected
nodes
debugOut
&#34;&#34;&#34;
def __init__(self, debugOut=None, noProto=False):
&#34;&#34;&#34;Constructor&#34;&#34;&#34;
self.debugOut = debugOut
self.nodes = None # FIXME
self.isConnected = False
if not noProto:
self._startConfig()
def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
&#34;&#34;&#34;Send a utf8 string to some other node, if the node has a display it will also be shown on the device.
Arguments:
text {string} -- The text to send
Keyword Arguments:
destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
dataType=mesh_pb2.Data.CLEAR_TEXT, wantAck=wantAck, wantResponse=wantResponse)
def sendData(self, byteData, destinationId=BROADCAST_ADDR, dataType=mesh_pb2.Data.OPAQUE, wantAck=False, wantResponse=False):
&#34;&#34;&#34;Send a data packet to some other node
Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
&#34;&#34;&#34;
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.data.payload = byteData
meshPacket.decoded.data.typ = dataType
meshPacket.decoded.want_response = wantResponse
return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)

    def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
        &#34;&#34;&#34;
        Send a position packet to some other node (normally a broadcast)

        Also, the device software will notice this packet and use it to automatically set its notion of
        the local position.

        If timeSec is not specified (recommended), we will use the local machine time.

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        meshPacket = mesh_pb2.MeshPacket()
        if latitude != 0.0:
            meshPacket.decoded.position.latitude_i = int(latitude / 1e-7)

        if longitude != 0.0:
            meshPacket.decoded.position.longitude_i = int(longitude / 1e-7)

        if altitude != 0:
            meshPacket.decoded.position.altitude = int(altitude)

        if timeSec == 0:
            timeSec = time.time()  # returns unix timestamp in seconds
        meshPacket.decoded.position.time = int(timeSec)

        meshPacket.decoded.want_response = wantResponse
        return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)

    def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
        &#34;&#34;&#34;Send a MeshPacket to the specified node (or if unspecified, broadcast).
        You probably don&#39;t want this - use sendData instead.

        Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
        &#34;&#34;&#34;
        toRadio = mesh_pb2.ToRadio()
        # FIXME add support for non broadcast addresses

        if isinstance(destinationId, int):
            nodeNum = destinationId
        elif destinationId == BROADCAST_ADDR:
            nodeNum = BROADCAST_NUM
        else:
            nodeNum = self.nodes[destinationId][&#39;num&#39;]

        meshPacket.to = nodeNum
        meshPacket.want_ack = wantAck

        # if the user hasn&#39;t set an ID for this packet (likely and recommended), we should pick a new unique ID
        # so the message can be tracked.
        if meshPacket.id == 0:
            meshPacket.id = self._generatePacketId()

        toRadio.packet.CopyFrom(meshPacket)
        self._sendToRadio(toRadio)
        return meshPacket

    def writeConfig(self):
        &#34;&#34;&#34;Write the current (edited) radioConfig to the device&#34;&#34;&#34;
        if self.radioConfig is None:
            raise Exception(&#34;No RadioConfig has been read&#34;)

        t = mesh_pb2.ToRadio()
        t.set_radio.CopyFrom(self.radioConfig)
        self._sendToRadio(t)

    def _generatePacketId(self):
        &#34;&#34;&#34;Get a new unique packet ID&#34;&#34;&#34;
        self.currentPacketId = (self.currentPacketId + 1) &amp; 0xffffffff
        return self.currentPacketId

    def _disconnected(self):
        &#34;&#34;&#34;Called by subclasses to tell clients this interface has disconnected&#34;&#34;&#34;
        self.isConnected = False
        pub.sendMessage(&#34;meshtastic.connection.lost&#34;, interface=self)

    def _connected(self):
        &#34;&#34;&#34;Called by this class to tell clients we are now fully connected to a node
        &#34;&#34;&#34;
        self.isConnected = True
        pub.sendMessage(&#34;meshtastic.connection.established&#34;, interface=self)

    def _startConfig(self):
        &#34;&#34;&#34;Start device packets flowing&#34;&#34;&#34;
        self.myInfo = None
        self.nodes = {}  # nodes keyed by ID
        self._nodesByNum = {}  # nodes keyed by nodenum
        self.radioConfig = None
        self.currentPacketId = None

        startConfig = mesh_pb2.ToRadio()
        startConfig.want_config_id = MY_CONFIG_ID  # we don&#39;t use this value
        self._sendToRadio(startConfig)

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.error(f&#34;Subclass must provide toradio: {toRadio}&#34;)

    def _handleFromRadio(self, fromRadioBytes):
        &#34;&#34;&#34;
        Handle a packet that arrived from the radio (update model and publish events)

        Called by subclasses.&#34;&#34;&#34;
        fromRadio = mesh_pb2.FromRadio()
        fromRadio.ParseFromString(fromRadioBytes)
        asDict = google.protobuf.json_format.MessageToDict(fromRadio)
        logging.debug(f&#34;Received: {asDict}&#34;)
        if fromRadio.HasField(&#34;my_info&#34;):
            self.myInfo = fromRadio.my_info
            if self.myInfo.min_app_version &gt; OUR_APP_VERSION:
                raise Exception(
                    &#34;This device needs a newer python client, please \&#34;pip install --upgrade meshtastic\&#34;&#34;)
            # start assigning our packet IDs from the opposite side of where our local device is assigning them
            self.currentPacketId = (
                self.myInfo.current_packet_id + 0x80000000) &amp; 0xffffffff
        elif fromRadio.HasField(&#34;radio&#34;):
            self.radioConfig = fromRadio.radio
        elif fromRadio.HasField(&#34;node_info&#34;):
            node = asDict[&#34;nodeInfo&#34;]
            try:
                self._fixupPosition(node[&#34;position&#34;])
            except KeyError:
                logging.debug(&#34;Node without position&#34;)
            self._nodesByNum[node[&#34;num&#34;]] = node
            if &#34;user&#34; in node:  # Some nodes might not have user/ids assigned yet
                self.nodes[node[&#34;user&#34;][&#34;id&#34;]] = node
        elif fromRadio.config_complete_id == MY_CONFIG_ID:
            # we ignore the config_complete_id, it is unneeded for our stream API fromRadio.config_complete_id
            self._connected()
        elif fromRadio.HasField(&#34;packet&#34;):
            self._handlePacketFromRadio(fromRadio.packet)
        elif fromRadio.rebooted:
            self._disconnected()
            self._startConfig()  # redownload the node db etc...
        else:
            logging.warning(&#34;Unexpected FromRadio payload&#34;)

    def _fixupPosition(self, position):
        &#34;&#34;&#34;Convert integer lat/lon into floats

        Arguments:
            position {Position dictionary} -- object to fix up
        &#34;&#34;&#34;
        if &#34;latitudeI&#34; in position:
            position[&#34;latitude&#34;] = position[&#34;latitudeI&#34;] * 1e-7
        if &#34;longitudeI&#34; in position:
            position[&#34;longitude&#34;] = position[&#34;longitudeI&#34;] * 1e-7

    def _nodeNumToId(self, num):
        &#34;&#34;&#34;Map a node number to a node ID

        Arguments:
            num {int} -- Node number

        Returns:
            string -- Node ID
        &#34;&#34;&#34;
        if num == BROADCAST_NUM:
            return BROADCAST_ADDR

        try:
            return self._nodesByNum[num][&#34;user&#34;][&#34;id&#34;]
        except KeyError:
            logging.error(&#34;Node not found for fromId&#34;)
            return None

    def _getOrCreateByNum(self, nodeNum):
        &#34;&#34;&#34;Given a nodenum find the NodeInfo in the DB (or create if necessary)&#34;&#34;&#34;
        if nodeNum == BROADCAST_NUM:
            raise Exception(&#34;Can not create/find nodenum by the broadcast num&#34;)

        if nodeNum in self._nodesByNum:
            return self._nodesByNum[nodeNum]
        else:
            n = {&#34;num&#34;: nodeNum}  # Create a minimal node db entry
            self._nodesByNum[nodeNum] = n
            return n

    def _handlePacketFromRadio(self, meshPacket):
        &#34;&#34;&#34;Handle a MeshPacket that just arrived from the radio

        Will publish one of the following events:
        - meshtastic.receive.position(packet = MeshPacket dictionary)
        - meshtastic.receive.user(packet = MeshPacket dictionary)
        - meshtastic.receive.data(packet = MeshPacket dictionary)
        &#34;&#34;&#34;
        asDict = google.protobuf.json_format.MessageToDict(meshPacket)
        # add fromId and toId fields based on the node ID
        asDict[&#34;fromId&#34;] = self._nodeNumToId(asDict[&#34;from&#34;])
        asDict[&#34;toId&#34;] = self._nodeNumToId(asDict[&#34;to&#34;])

        # We could provide our objects as DotMaps - which work with . notation or as dictionaries
        # asObj = DotMap(asDict)
        topic = &#34;meshtastic.receive&#34;  # Generic unknown packet type
        if meshPacket.decoded.HasField(&#34;position&#34;):
            topic = &#34;meshtastic.receive.position&#34;
            p = asDict[&#34;decoded&#34;][&#34;position&#34;]
            self._fixupPosition(p)
            # update node DB as needed
            self._getOrCreateByNum(asDict[&#34;from&#34;])[&#34;position&#34;] = p

        if meshPacket.decoded.HasField(&#34;user&#34;):
            topic = &#34;meshtastic.receive.user&#34;
            u = asDict[&#34;decoded&#34;][&#34;user&#34;]
            # update node DB as needed
            n = self._getOrCreateByNum(asDict[&#34;from&#34;])
            n[&#34;user&#34;] = u
            # We now have a node ID, make sure it is up to date in that table.
            # Store the node entry (not just the user) so lookups such as nodes[id][&#39;num&#39;] work.
            self.nodes[u[&#34;id&#34;]] = n

        if meshPacket.decoded.HasField(&#34;data&#34;):
            topic = &#34;meshtastic.receive.data&#34;
            # OPAQUE is the default protobuf typ value, and therefore if not set it will not be populated at all
            # to make API usage easier, set it to prevent confusion
            if &#34;typ&#34; not in asDict[&#34;decoded&#34;][&#34;data&#34;]:
                asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;typ&#34;] = &#34;OPAQUE&#34;

            # For text messages, we go ahead and decode the payload as UTF-8 text for our users
            if asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;typ&#34;] == &#34;CLEAR_TEXT&#34;:
                asDict[&#34;decoded&#34;][&#34;data&#34;][&#34;text&#34;] = meshPacket.decoded.data.payload.decode(
                    &#34;utf-8&#34;)

        pub.sendMessage(topic, packet=asDict, interface=self)</code></pre>
</details>
<h3>Subclasses</h3>
<ul class="hlist">
<li><a title="meshtastic.BLEInterface" href="#meshtastic.BLEInterface">BLEInterface</a></li>
<li><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.MeshInterface.sendData"><code class="name flex">
<span>def <span class="ident">sendData</span></span>(<span>self, byteData, destinationId='^all', dataType=0, wantAck=False, wantResponse=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a data packet to some other node</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendData(self, byteData, destinationId=BROADCAST_ADDR, dataType=mesh_pb2.Data.OPAQUE, wantAck=False, wantResponse=False):
    &#34;&#34;&#34;Send a data packet to some other node

    Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
    &#34;&#34;&#34;
    meshPacket = mesh_pb2.MeshPacket()
    meshPacket.decoded.data.payload = byteData
    meshPacket.decoded.data.typ = dataType
    meshPacket.decoded.want_response = wantResponse
    return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)</code></pre>
</details>
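<p>The <code>id</code> field mentioned above is drawn from a 32-bit counter. The following is a minimal sketch of that allocation, assuming the behaviour visible in the class source (the counter is seeded half the ID space away from the device's own <code>current_packet_id</code> and wraps at 32 bits). <code>PacketIdAllocator</code> and <code>next_id</code> are hypothetical names used for illustration and are not part of the meshtastic API.</p>

```python
MASK_32 = 0xFFFFFFFF  # packet IDs are kept within 32 bits


class PacketIdAllocator:
    """Hypothetical helper illustrating the packet ID arithmetic."""

    def __init__(self, device_current_packet_id):
        # Seed the counter on the "opposite side" of the 32-bit ID space
        # from where the device itself is currently assigning IDs.
        self.current = (device_current_packet_id + 0x80000000) & MASK_32

    def next_id(self):
        # Increment and wrap around at 32 bits.
        self.current = (self.current + 1) & MASK_32
        return self.current


allocator = PacketIdAllocator(device_current_packet_id=0x10)
first_id = allocator.next_id()
second_id = allocator.next_id()
print(hex(first_id), hex(second_id))
```

<p>Note that a wrapped counter can produce 0, which <code>sendPacket</code> treats as "no ID set"; the sketch keeps that behaviour rather than working around it.</p>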
</dd>
<dt id="meshtastic.MeshInterface.sendPacket"><code class="name flex">
<span>def <span class="ident">sendPacket</span></span>(<span>self, meshPacket, destinationId='^all', wantAck=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendPacket(self, meshPacket, destinationId=BROADCAST_ADDR, wantAck=False):
    &#34;&#34;&#34;Send a MeshPacket to the specified node (or if unspecified, broadcast).
    You probably don&#39;t want this - use sendData instead.

    Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
    &#34;&#34;&#34;
    toRadio = mesh_pb2.ToRadio()
    # FIXME add support for non broadcast addresses

    if isinstance(destinationId, int):
        nodeNum = destinationId
    elif destinationId == BROADCAST_ADDR:
        nodeNum = BROADCAST_NUM
    else:
        nodeNum = self.nodes[destinationId][&#39;num&#39;]

    meshPacket.to = nodeNum
    meshPacket.want_ack = wantAck

    # if the user hasn&#39;t set an ID for this packet (likely and recommended), we should pick a new unique ID
    # so the message can be tracked.
    if meshPacket.id == 0:
        meshPacket.id = self._generatePacketId()

    toRadio.packet.CopyFrom(meshPacket)
    self._sendToRadio(toRadio)
    return meshPacket</code></pre>
</details>
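<p>The way <code>destinationId</code> is resolved to a node number can be sketched on its own. This is an illustration only: <code>resolve_destination</code> is a hypothetical function, the <code>nodes</code> dictionary is made-up sample data, and the value used for <code>BROADCAST_NUM</code> is an assumption (only the <code>'^all'</code> default for <code>BROADCAST_ADDR</code> appears in the signatures on this page).</p>

```python
BROADCAST_ADDR = "^all"  # default destinationId shown in the signatures above
BROADCAST_NUM = 255      # assumed value, for illustration only


def resolve_destination(destination_id, nodes):
    """Map a node number, node ID string, or the broadcast address to a node number."""
    if isinstance(destination_id, int):
        # Already a node number: use it unchanged.
        return destination_id
    if destination_id == BROADCAST_ADDR:
        return BROADCAST_NUM
    # Otherwise treat it as a node ID and look it up in the node table.
    # An unknown ID raises KeyError, as the dictionary lookup in sendPacket would.
    return nodes[destination_id]["num"]


# Made-up node table keyed by node ID, in the shape the class keeps in self.nodes.
sample_nodes = {"!abcd1234": {"num": 42}}

print(resolve_destination(7, sample_nodes))
print(resolve_destination("^all", sample_nodes))
print(resolve_destination("!abcd1234", sample_nodes))
```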
</dd>
<dt id="meshtastic.MeshInterface.sendPosition"><code class="name flex">
<span>def <span class="ident">sendPosition</span></span>(<span>self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId='^all', wantAck=False, wantResponse=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a position packet to some other node (normally a broadcast)</p>
<p>Also, the device software will notice this packet and use it to automatically set its notion of
the local position.</p>
<p>If timeSec is not specified (recommended), we will use the local machine time.</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
    &#34;&#34;&#34;
    Send a position packet to some other node (normally a broadcast)

    Also, the device software will notice this packet and use it to automatically set its notion of
    the local position.

    If timeSec is not specified (recommended), we will use the local machine time.

    Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
    &#34;&#34;&#34;
    meshPacket = mesh_pb2.MeshPacket()
    if latitude != 0.0:
        meshPacket.decoded.position.latitude_i = int(latitude / 1e-7)

    if longitude != 0.0:
        meshPacket.decoded.position.longitude_i = int(longitude / 1e-7)

    if altitude != 0:
        meshPacket.decoded.position.altitude = int(altitude)

    if timeSec == 0:
        timeSec = time.time()  # returns unix timestamp in seconds
    meshPacket.decoded.position.time = int(timeSec)

    meshPacket.decoded.want_response = wantResponse
    return self.sendPacket(meshPacket, destinationId, wantAck=wantAck)</code></pre>
</details>
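<p>Latitude and longitude travel as integers in units of 1e-7 degrees (<code>latitude_i</code>, <code>longitude_i</code>) and are converted back to floats on receipt. A minimal sketch of that round trip follows. <code>to_fixed</code> and <code>from_fixed</code> are hypothetical helpers, and the sketch rounds to the nearest integer, whereas the source above truncates with <code>int(latitude / 1e-7)</code>; rounding is a substitution made here to avoid off-by-one results from floating-point error.</p>

```python
def to_fixed(degrees):
    """Convert decimal degrees to an integer count of 1e-7 degree units."""
    # round() rather than int() so values such as 37.7749 do not lose
    # one unit to floating-point representation error.
    return round(degrees * 1e7)


def from_fixed(value_i):
    """Convert an integer count of 1e-7 degree units back to decimal degrees."""
    return value_i * 1e-7


lat_i = to_fixed(37.7749)
lon_i = to_fixed(-122.4194)
print(lat_i, lon_i)
print(from_fixed(lat_i), from_fixed(lon_i))
```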
</dd>
<dt id="meshtastic.MeshInterface.sendText"><code class="name flex">
<span>def <span class="ident">sendText</span></span>(<span>self, text, destinationId='^all', wantAck=False, wantResponse=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Send a UTF-8 string to some other node. If the node has a display, the text is also shown on the device.</p>
<h2 id="arguments">Arguments</h2>
<p>text {string} &ndash; The text to send</p>
<p>Keyword Arguments:
destinationId {nodeId or nodeNum} &ndash; where to send this message (default: {BROADCAST_ADDR})</p>
<p>Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def sendText(self, text, destinationId=BROADCAST_ADDR, wantAck=False, wantResponse=False):
    &#34;&#34;&#34;Send a UTF-8 string to some other node. If the node has a display, the text is also shown on the device.

    Arguments:
        text {string} -- The text to send

    Keyword Arguments:
        destinationId {nodeId or nodeNum} -- where to send this message (default: {BROADCAST_ADDR})

    Returns the sent packet. The id field will be populated in this packet and can be used to track future message acks/naks.
    &#34;&#34;&#34;
    return self.sendData(text.encode(&#34;utf-8&#34;), destinationId,
                         dataType=mesh_pb2.Data.CLEAR_TEXT, wantAck=wantAck, wantResponse=wantResponse)</code></pre>
</details>
</dd>
<dt id="meshtastic.MeshInterface.writeConfig"><code class="name flex">
<span>def <span class="ident">writeConfig</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Write the current (edited) radioConfig to the device</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def writeConfig(self):
    &#34;&#34;&#34;Write the current (edited) radioConfig to the device&#34;&#34;&#34;
    if self.radioConfig is None:
        raise Exception(&#34;No RadioConfig has been read&#34;)

    t = mesh_pb2.ToRadio()
    t.set_radio.CopyFrom(self.radioConfig)
    self._sendToRadio(t)</code></pre>
</details>
</dd>
</dl>
</dd>
<dt id="meshtastic.StreamInterface"><code class="flex name class">
<span>class <span class="ident">StreamInterface</span></span>
<span>(</span><span>devPath=None, debugOut=None, noProto=False, connectNow=True)</span>
</code></dt>
<dd>
<div class="desc"><p>Interface class for meshtastic devices over a stream link (serial, TCP, etc)</p>
<p>Constructor. Opens a connection to the specified serial port or, if none is specified, probes for a single Meshtastic device.</p>
<p>Keyword Arguments:
devPath {string} &ndash; A filepath to a device, e.g. /dev/ttyUSB0 (default: {None})
debugOut {stream} &ndash; If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})</p>
<h2 id="raises">Raises</h2>
<dl>
<dt><code>Exception</code></dt>
<dd>If devPath is not given and no Meshtastic devices are detected</dd>
<dt><code>Exception</code></dt>
<dd>If devPath is not given and multiple ports are detected</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class StreamInterface(MeshInterface):
    &#34;&#34;&#34;Interface class for meshtastic devices over a stream link (serial, TCP, etc)&#34;&#34;&#34;

    def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
        &#34;&#34;&#34;Constructor. Opens a connection to the specified serial port or, if none is specified,
        probes for a single Meshtastic device.

        Keyword Arguments:
            devPath {string} -- A filepath to a device, e.g. /dev/ttyUSB0 (default: {None})
            debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})

        Raises:
            Exception: If devPath is not given and no Meshtastic devices are detected
            Exception: If devPath is not given and multiple ports are detected
        &#34;&#34;&#34;

        if devPath is None:
            ports = util.findPorts()
            if len(ports) == 0:
                raise Exception(&#34;No Meshtastic devices detected&#34;)
            elif len(ports) &gt; 1:
                raise Exception(
                    f&#34;Multiple ports detected, you must specify a device, such as {ports[0].device}&#34;)
            else:
                devPath = ports[0]

        logging.debug(f&#34;Connecting to {devPath}&#34;)
        self.devPath = devPath
        self._rxBuf = bytes()  # empty
        self._wantExit = False
        self.stream = serial.Serial(
            devPath, 921600, exclusive=True, timeout=0.5)
        self._rxThread = threading.Thread(target=self.__reader, args=())

        MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)

        # Start the reader thread after superclass constructor completes init
        if connectNow:
            self.connect()

    def connect(self):
        &#34;&#34;&#34;Connect to our radio

        Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
        start the reading thread later.
        &#34;&#34;&#34;

        # Send some bogus UART characters to force a sleeping device to wake
        self.stream.write(bytes([START1, START1, START1, START1]))
        self.stream.flush()
        time.sleep(0.1)  # wait 100ms to give device time to start running

        self._rxThread.start()

    def _sendToRadio(self, toRadio):
        &#34;&#34;&#34;Send a ToRadio protobuf to the device&#34;&#34;&#34;
        logging.debug(f&#34;Sending: {toRadio}&#34;)
        b = toRadio.SerializeToString()
        bufLen = len(b)
        header = bytes([START1, START2, (bufLen &gt;&gt; 8) &amp; 0xff, bufLen &amp; 0xff])
        self.stream.write(header)
        self.stream.write(b)
        self.stream.flush()

    def close(self):
        &#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
        logging.debug(&#34;Closing serial stream&#34;)
        # pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
        self._wantExit = True
        if self._rxThread != threading.current_thread():
            self._rxThread.join()  # wait for it to exit

    def __reader(self):
        &#34;&#34;&#34;The reader thread that reads bytes from our stream&#34;&#34;&#34;
        empty = bytes()

        try:
            while not self._wantExit:
                b = self.stream.read(1)
                if len(b) &gt; 0:
                    # logging.debug(f&#34;read returned {b}&#34;)
                    c = b[0]
                    ptr = len(self._rxBuf)

                    # Assume we want to append this byte, fixme use bytearray instead
                    self._rxBuf = self._rxBuf + b

                    if ptr == 0:  # looking for START1
                        if c != START1:
                            self._rxBuf = empty  # failed to find start
                            if self.debugOut is not None:
                                try:
                                    self.debugOut.write(b.decode(&#34;utf-8&#34;))
                                except UnicodeDecodeError:
                                    self.debugOut.write(&#39;?&#39;)

                    elif ptr == 1:  # looking for START2
                        if c != START2:
                            self._rxBuf = empty  # failed to find start2
                    elif ptr &gt;= HEADER_LEN:  # we&#39;ve at least got a header
                        # big endian length follows header
                        packetlen = (self._rxBuf[2] &lt;&lt; 8) + self._rxBuf[3]

                        if ptr == HEADER_LEN:  # we _just_ finished reading the header, validate length
                            if packetlen &gt; MAX_TO_FROM_RADIO_SIZE:
                                self._rxBuf = empty  # length was out of bounds, restart

                        if len(self._rxBuf) != 0 and ptr + 1 == packetlen + HEADER_LEN:
                            try:
                                self._handleFromRadio(self._rxBuf[HEADER_LEN:])
                            except Exception as ex:
                                logging.error(
                                    f&#34;Error while handling message from radio {ex}&#34;)
                                traceback.print_exc()
                            self._rxBuf = empty
                else:
                    # logging.debug(f&#34;timeout on {self.devPath}&#34;)
                    pass
        except serial.SerialException as ex:
            logging.warning(
                &#34;Meshtastic serial port disconnected, disconnecting...&#34;)
        finally:
            logging.debug(&#34;reader is exiting&#34;)
            self.stream.close()
            self._disconnected()</code></pre>
</details>
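<p>The stream framing used by <code>_sendToRadio</code> and the reader thread is a 4-byte header (two start bytes, then a big-endian 16-bit payload length) followed by the serialized protobuf. The sketch below illustrates that framing in isolation, without a serial port. The numeric values of <code>START1</code>, <code>START2</code>, <code>HEADER_LEN</code>, and <code>MAX_TO_FROM_RADIO_SIZE</code> are assumptions (they are not shown on this page), and <code>frame</code> and <code>deframe</code> are hypothetical helpers. Unlike the reader loop above, this sketch validates the length as soon as the fourth header byte arrives, so it also accepts empty payloads.</p>

```python
START1 = 0x94                 # assumed value of the first start byte
START2 = 0xC3                 # assumed value of the second start byte
HEADER_LEN = 4                # two start bytes plus a 16-bit length
MAX_TO_FROM_RADIO_SIZE = 512  # assumed upper bound on payload size


def frame(payload):
    """Prefix a payload with the 4-byte header (big-endian length)."""
    n = len(payload)
    if n > MAX_TO_FROM_RADIO_SIZE:
        raise ValueError("payload too large")
    return bytes([START1, START2, (n >> 8) & 0xFF, n & 0xFF]) + payload


def deframe(stream_bytes):
    """Scan a byte stream one byte at a time and return the payloads found."""
    packets = []
    buf = bytearray()
    for c in stream_bytes:
        ptr = len(buf)  # index the new byte will occupy
        buf.append(c)
        if ptr == 0:
            if c != START1:
                buf.clear()  # not a start byte: treat as debug output or noise
        elif ptr == 1:
            if c != START2:
                buf.clear()  # second start byte missing: resynchronize
        elif ptr >= HEADER_LEN - 1:
            packetlen = (buf[2] << 8) + buf[3]
            if ptr == HEADER_LEN - 1 and packetlen > MAX_TO_FROM_RADIO_SIZE:
                buf.clear()  # length out of bounds: resynchronize
            elif len(buf) == packetlen + HEADER_LEN:
                packets.append(bytes(buf[HEADER_LEN:]))
                buf.clear()
    return packets


wire = b"\x00" + frame(b"hi") + frame(b"")
print(deframe(wire))
```

<p>Parsing byte by byte, as the reader thread does, lets the receiver recover from noise or debug text interleaved with packets, at the cost of more work per byte than reading whole frames.</p>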
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="meshtastic.StreamInterface.close"><code class="name flex">
<span>def <span class="ident">close</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Close a connection to the device</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def close(self):
    &#34;&#34;&#34;Close a connection to the device&#34;&#34;&#34;
    logging.debug(&#34;Closing serial stream&#34;)
    # pyserial cancel_read doesn&#39;t seem to work, therefore we ask the reader thread to close things for us
    self._wantExit = True
    if self._rxThread != threading.current_thread():
        self._rxThread.join()  # wait for it to exit</code></pre>
</details>
</dd>
<dt id="meshtastic.StreamInterface.connect"><code class="name flex">
<span>def <span class="ident">connect</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Connect to our radio</p>
<p>Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
start the reading thread later.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def connect(self):
    &#34;&#34;&#34;Connect to our radio

    Normally this is called automatically by the constructor, but if you passed in connectNow=False you can manually
    start the reading thread later.
    &#34;&#34;&#34;

    # Send some bogus UART characters to force a sleeping device to wake
    self.stream.write(bytes([START1, START1, START1, START1]))
    self.stream.flush()
    time.sleep(0.1)  # wait 100ms to give device time to start running

    self._rxThread.start()</code></pre>
</details>
</dd>
</dl>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code><b><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></b></code>:
<ul class="hlist">
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPacket" href="#meshtastic.MeshInterface.sendPacket">sendPacket</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
</ul>
</dd>
</dl>
</section>
</article>
<nav id="sidebar">
<h1>Index</h1>
<div class="toc">
<ul>
<li><a href="#an-api-for-meshtastic-devices">an API for Meshtastic devices</a></li>
<li><a href="#published-pubsub-topics">Published PubSub topics</a></li>
<li><a href="#example-usage">Example Usage</a></li>
</ul>
</div>
<ul id="index">
<li><h3><a href="#header-submodules">Sub-modules</a></h3>
<ul>
<li><code><a title="meshtastic.ble" href="ble.html">meshtastic.ble</a></code></li>
<li><code><a title="meshtastic.mesh_pb2" href="mesh_pb2.html">meshtastic.mesh_pb2</a></code></li>
<li><code><a title="meshtastic.test" href="test.html">meshtastic.test</a></code></li>
<li><code><a title="meshtastic.util" href="util.html">meshtastic.util</a></code></li>
</ul>
</li>
<li><h3><a href="#header-variables">Global variables</a></h3>
<ul class="">
<li><code><a title="meshtastic.MY_CONFIG_ID" href="#meshtastic.MY_CONFIG_ID">MY_CONFIG_ID</a></code></li>
</ul>
</li>
<li><h3><a href="#header-classes">Classes</a></h3>
<ul>
<li>
<h4><code><a title="meshtastic.BLEInterface" href="#meshtastic.BLEInterface">BLEInterface</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.BLEInterface.close" href="#meshtastic.BLEInterface.close">close</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.MeshInterface" href="#meshtastic.MeshInterface">MeshInterface</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.MeshInterface.sendData" href="#meshtastic.MeshInterface.sendData">sendData</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPacket" href="#meshtastic.MeshInterface.sendPacket">sendPacket</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendPosition" href="#meshtastic.MeshInterface.sendPosition">sendPosition</a></code></li>
<li><code><a title="meshtastic.MeshInterface.sendText" href="#meshtastic.MeshInterface.sendText">sendText</a></code></li>
<li><code><a title="meshtastic.MeshInterface.writeConfig" href="#meshtastic.MeshInterface.writeConfig">writeConfig</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="meshtastic.StreamInterface" href="#meshtastic.StreamInterface">StreamInterface</a></code></h4>
<ul class="">
<li><code><a title="meshtastic.StreamInterface.close" href="#meshtastic.StreamInterface.close">close</a></code></li>
<li><code><a title="meshtastic.StreamInterface.connect" href="#meshtastic.StreamInterface.connect">connect</a></code></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</main>
<footer id="footer">
<p>Generated by <a href="https://pdoc3.github.io/pdoc"><cite>pdoc</cite> 0.8.1</a>.</p>
</footer>
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js"></script>
<script>hljs.initHighlightingOnLoad()</script>
</body>
</html>