Files
python/docs/meshtastic/__main__.html
2021-12-28 22:10:11 -08:00

1865 lines
82 KiB
HTML

<!doctype html>
<html lang="en">
<head>
<!-- Document metadata: character encoding first, then viewport, generator stamp, title and description. -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1">
<meta name="generator" content="pdoc 0.10.1.dev1+g4aa70de.d20211229">
<title>meshtastic.__main__ API documentation</title>
<meta name="description" content="Main Meshtastic">
<!-- Third-party stylesheets served from cdnjs; each link both preloads and applies its sheet. -->
<link
  rel="preload stylesheet"
  as="style"
  href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css"
  integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs="
  crossorigin>
<link
  rel="preload stylesheet"
  as="style"
  href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css"
  integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg="
  crossorigin>
<!-- NOTE(review): unlike the two sanitize.css links above, this highlight.js theme carries no integrity hash. -->
<link
  rel="preload stylesheet"
  as="style"
  href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css"
  crossorigin>
<!-- Base rules for every medium: typography and heading sizes, link colours, #content/#sidebar padding, code and pre backgrounds, the .name badges used for documented identifiers, collapsible .source listings, #index navigation lists and .admonition colour variants. -->
<style>:root{--highlight-color:#fe9}.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > 
span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<!-- Wide-screen layout (screens at least 700px wide): main becomes a reversed flex row so the sticky, scrollable 30%-wide #sidebar sits beside the 70%-wide #content. The media condition is stated twice, once in the attribute and once in the @media wrapper. -->
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<!-- Print rules: hide the collapsible .source listings, force black text on a transparent background, append each link's href after its text, and keep pre/blockquote/tr/img from splitting across pages. -->
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
<!-- Syntax highlighter, fetched from cdnjs with an integrity check; `defer` lets it run before DOMContentLoaded without blocking parsing. -->
<script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script>
<!-- Highlight the code listings once the DOM is ready. The window.hljs guard keeps the page free of an uncaught ReferenceError when the script above failed to load (offline viewing, blocked request, integrity mismatch); the listings then simply stay unhighlighted. -->
<script>window.addEventListener('DOMContentLoaded', () => { if (window.hljs) { hljs.initHighlighting(); } })</script>
</head>
<body>
<main>
<article id="content">
<!-- Page heading: names the module this page documents. -->
<header>
  <h1 class="title">Module <code>meshtastic.__main__</code></h1>
</header>
<section id="section-intro">
<p>Main Meshtastic</p>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">#!python3
&#34;&#34;&#34; Main Meshtastic
&#34;&#34;&#34;
import argparse
import platform
import logging
import sys
import time
import yaml
from pubsub import pub
import pyqrcode
import pkg_resources
import meshtastic.util
import meshtastic.test
from . import remote_hardware
from . import portnums_pb2, channel_pb2, radioconfig_pb2
from .globals import Globals
have_tunnel = platform.system() == &#39;Linux&#39;
&#34;&#34;&#34;We only import the tunnel code if we are on a platform that can run it. &#34;&#34;&#34;
def onReceive(packet, interface):
&#34;&#34;&#34;Callback invoked when a packet arrives&#34;&#34;&#34;
our_globals = Globals.getInstance()
args = our_globals.get_args()
try:
d = packet.get(&#39;decoded&#39;)
logging.debug(f&#39;in onReceive() d:{d}&#39;)
# Exit once we receive a reply
if args and args.sendtext and packet[&#34;to&#34;] == interface.myInfo.my_node_num and d[&#34;portnum&#34;] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
interface.close() # after running command then exit
# Reply to every received message with some stats
if args and args.reply:
msg = d.get(&#39;text&#39;)
if msg:
rxSnr = packet[&#39;rxSnr&#39;]
hopLimit = packet[&#39;hopLimit&#39;]
print(f&#34;message: {msg}&#34;)
reply = &#34;got msg \&#39;{}\&#39; with rxSnr: {} and hopLimit: {}&#34;.format(msg, rxSnr, hopLimit)
print(&#34;Sending reply: &#34;, reply)
interface.sendText(reply)
except Exception as ex:
print(ex)
def onConnection(interface, topic=pub.AUTO_TOPIC):
&#34;&#34;&#34;Callback invoked when we connect/disconnect from a radio&#34;&#34;&#34;
print(f&#34;Connection changed: {topic.getName()}&#34;)
def getPref(attributes, name):
&#34;&#34;&#34;Get a channel or preferences value&#34;&#34;&#34;
objDesc = attributes.DESCRIPTOR
field = objDesc.fields_by_name.get(name)
if not field:
print(f&#34;{attributes.__class__.__name__} does not have an attribute called {name}, so you can not get it.&#34;)
print(f&#34;Choices in sorted order are:&#34;)
names = []
for f in objDesc.fields:
names.append(f&#39;{f.name}&#39;)
for temp_name in sorted(names):
print(f&#34; {temp_name}&#34;)
return
# okay - try to read the value
try:
try:
val = getattr(attributes, name)
except TypeError:
# The getter didn&#39;t like our arg type guess try again as a string
val = getattr(attributes, name)
# succeeded!
print(f&#34;{name}: {str(val)}&#34;)
except Exception as ex:
print(f&#34;Can&#39;t get {name} due to {ex}&#34;)
def setPref(attributes, name, valStr):
&#34;&#34;&#34;Set a channel or preferences value&#34;&#34;&#34;
objDesc = attributes.DESCRIPTOR
field = objDesc.fields_by_name.get(name)
if not field:
print(f&#34;{attributes.__class__.__name__} does not have an attribute called {name}, so you can not set it.&#34;)
print(f&#34;Choices in sorted order are:&#34;)
names = []
for f in objDesc.fields:
names.append(f&#39;{f.name}&#39;)
for temp_name in sorted(names):
print(f&#34; {temp_name}&#34;)
return
val = meshtastic.util.fromStr(valStr)
enumType = field.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We&#39;ve failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
print(f&#34;{name} does not have an enum called {val}, so you can not set it.&#34;)
print(f&#34;Choices in sorted order are:&#34;)
names = []
for f in enumType.values:
names.append(f&#39;{f.name}&#39;)
for temp_name in sorted(names):
print(f&#34; {temp_name}&#34;)
return
# okay - try to read the value
try:
try:
setattr(attributes, name, val)
except TypeError:
# The setter didn&#39;t like our arg type guess try again as a string
setattr(attributes, name, valStr)
# succeeded!
print(f&#34;Set {name} to {valStr}&#34;)
except Exception as ex:
print(f&#34;Can&#39;t set {name} due to {ex}&#34;)
def onConnected(interface):
&#34;&#34;&#34;Callback invoked when we connect to a radio&#34;&#34;&#34;
closeNow = False # Should we drop the connection after we finish?
try:
our_globals = Globals.getInstance()
args = our_globals.get_args()
print(&#34;Connected to radio&#34;)
def getNode():
&#34;&#34;&#34;This operation could be expensive, so we try to cache the results&#34;&#34;&#34;
targetNode = our_globals.get_target_node()
if not targetNode:
targetNode = interface.getNode(args.destOrLocal)
our_globals.set_target_node(targetNode)
return targetNode
if args.setlat or args.setlon or args.setalt:
closeNow = True
alt = 0
lat = 0.0
lon = 0.0
prefs = interface.localNode.radioConfig.preferences
if args.setalt:
alt = int(args.setalt)
prefs.fixed_position = True
print(f&#34;Fixing altitude at {alt} meters&#34;)
if args.setlat:
lat = float(args.setlat)
prefs.fixed_position = True
print(f&#34;Fixing latitude at {lat} degrees&#34;)
if args.setlon:
lon = float(args.setlon)
prefs.fixed_position = True
print(f&#34;Fixing longitude at {lon} degrees&#34;)
print(&#34;Setting device position&#34;)
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig()
elif not args.no_time:
# We normally provide a current time to the mesh when we connect
interface.sendPosition()
if args.set_owner:
closeNow = True
print(f&#34;Setting device owner to {args.set_owner}&#34;)
getNode().setOwner(args.set_owner)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
prefs = getNode().radioConfig.preferences
allFields = 0
try:
for field in args.pos_fields:
v_field = radioconfig_pb2.PositionFlags.Value(field)
allFields |= v_field
except ValueError:
print(&#34;ERROR: supported position fields are:&#34;)
print(radioconfig_pb2.PositionFlags.keys())
print(&#34;If no fields are specified, will read and display current value.&#34;)
else:
print(f&#34;Setting position fields to {allFields}&#34;)
setPref(prefs, &#39;position_flags&#39;, (&#39;%d&#39; % allFields))
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value
closeNow = True
prefs = getNode().radioConfig.preferences
fieldNames = []
for bit in radioconfig_pb2.PositionFlags.values():
if prefs.position_flags &amp; bit:
fieldNames.append(radioconfig_pb2.PositionFlags.Name(bit))
print(&#39; &#39;.join(fieldNames))
if args.set_team:
closeNow = True
try:
v_team = meshtastic.mesh_pb2.Team.Value(args.set_team.upper())
except ValueError:
v_team = 0
print(f&#34;ERROR: Team \&#39;{args.set_team}\&#39; not found.&#34;)
print(&#34;Try a team name from the sorted list below, or use &#39;CLEAR&#39; for unaffiliated:&#34;)
print(sorted(meshtastic.mesh_pb2.Team.keys()))
else:
print(f&#34;Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}&#34;)
getNode().setOwner(team=v_team)
if args.set_ham:
closeNow = True
print(f&#34;Setting Ham ID to {args.set_ham} and turning off encryption&#34;)
getNode().setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel
getNode().turnOffEncryptionOnPrimaryChannel()
if args.reboot:
closeNow = True
getNode().reboot()
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
ch = getNode().getChannelByChannelIndex(channelIndex)
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
print(f&#34;Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}&#34;)
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
else:
meshtastic.util.our_exit(f&#34;Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.&#34;)
if args.sendping:
payload = str.encode(&#34;test string&#34;)
print(f&#34;Sending ping message to {args.destOrAll}&#34;)
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 &lt;&lt; int(wrpair[0])
bitval |= int(wrpair[1]) &lt;&lt; int(wrpair[0])
print(f&#34;Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}&#34;)
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f&#34;Reading GPIO mask 0x{bitmask:x} from {args.dest}&#34;)
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto:
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f&#39;end of gpio_rd&#39;)
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f&#34;Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit&#34;)
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings
if args.set:
closeNow = True
prefs = getNode().radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.set:
setPref(prefs, pref[0], pref[1])
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
if args.configure:
with open(args.configure[0], encoding=&#39;utf8&#39;) as file:
configuration = yaml.safe_load(file)
closeNow = True
if &#39;owner&#39; in configuration:
print(f&#34;Setting device owner to {configuration[&#39;owner&#39;]}&#34;)
getNode().setOwner(configuration[&#39;owner&#39;])
if &#39;channel_url&#39; in configuration:
print(&#34;Setting channel url to&#34;, configuration[&#39;channel_url&#39;])
getNode().setURL(configuration[&#39;channel_url&#39;])
if &#39;location&#39; in configuration:
alt = 0
lat = 0.0
lon = 0.0
prefs = interface.localNode.radioConfig.preferences
if &#39;alt&#39; in configuration[&#39;location&#39;]:
alt = int(configuration[&#39;location&#39;][&#39;alt&#39;])
prefs.fixed_position = True
print(f&#34;Fixing altitude at {alt} meters&#34;)
if &#39;lat&#39; in configuration[&#39;location&#39;]:
lat = float(configuration[&#39;location&#39;][&#39;lat&#39;])
prefs.fixed_position = True
print(f&#34;Fixing latitude at {lat} degrees&#34;)
if &#39;lon&#39; in configuration[&#39;location&#39;]:
lon = float(configuration[&#39;location&#39;][&#39;lon&#39;])
prefs.fixed_position = True
print(f&#34;Fixing longitude at {lon} degrees&#34;)
print(&#34;Setting device position&#34;)
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig()
if &#39;user_prefs&#39; in configuration:
prefs = getNode().radioConfig.preferences
for pref in configuration[&#39;user_prefs&#39;]:
setPref(prefs, pref, str(configuration[&#39;user_prefs&#39;][pref]))
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
if args.export_config:
# export the configuration (the opposite of &#39;--configure&#39;)
closeNow = True
export_config(interface)
if args.seturl:
closeNow = True
getNode().setURL(args.seturl)
# handle changing channels
if args.ch_add:
closeNow = True
if len(args.ch_add) &gt; 10:
meshtastic.util.our_exit(&#34;Warning: Channel name must be shorter. Channel not added.&#34;)
n = getNode()
ch = n.getChannelByName(args.ch_add)
if ch:
meshtastic.util.our_exit(f&#34;Warning: This node already has a &#39;{args.ch_add}&#39; channel. No changes were made.&#34;)
else:
# get the first channel that is disabled (i.e., available)
ch = n.getDisabledChannel()
if not ch:
meshtastic.util.our_exit(&#34;Warning: No free channels were found&#34;)
chs = channel_pb2.ChannelSettings()
chs.psk = meshtastic.util.genPSK256()
chs.name = args.ch_add
ch.settings.CopyFrom(chs)
ch.role = channel_pb2.Channel.Role.SECONDARY
print(f&#34;Writing modified channels to device&#34;)
n.writeChannel(ch.index)
if args.ch_del:
closeNow = True
channelIndex = our_globals.get_channel_index()
if channelIndex is None:
meshtastic.util.our_exit(&#34;Warning: Need to specify &#39;--ch-index&#39; for &#39;--ch-del&#39;.&#34;, 1)
else:
if channelIndex == 0:
meshtastic.util.our_exit(&#34;Warning: Cannot delete primary channel.&#34;, 1)
else:
print(f&#34;Deleting channel {channelIndex}&#34;)
ch = getNode().deleteChannel(channelIndex)
ch_changes = [args.ch_longslow, args.ch_longfast,
args.ch_mediumslow, args.ch_mediumfast,
args.ch_shortslow, args.ch_shortfast]
any_primary_channel_changes = any(x for x in ch_changes)
if args.ch_set or any_primary_channel_changes or args.ch_enable or args.ch_disable:
closeNow = True
channelIndex = our_globals.get_channel_index()
if channelIndex is None:
if any_primary_channel_changes:
# we assume that they want the primary channel if they&#39;re setting range values
channelIndex = 0
else:
meshtastic.util.our_exit(&#34;Warning: Need to specify &#39;--ch-index&#39;.&#34;, 1)
ch = getNode().channels[channelIndex]
if any_primary_channel_changes or args.ch_enable or args.ch_disable:
if channelIndex == 0 and not any_primary_channel_changes:
meshtastic.util.our_exit(&#34;Warning: Cannot enable/disable PRIMARY channel.&#34;)
if channelIndex != 0:
if any_primary_channel_changes:
meshtastic.util.our_exit(&#34;Warning: Standard channel settings can only be applied to the PRIMARY channel&#34;)
enable = True # default to enable
if args.ch_enable:
enable = True
if args.ch_disable:
enable = False
def setSimpleChannel(modem_config):
&#34;&#34;&#34;Set one of the simple modem_config only based channels&#34;&#34;&#34;
# Completely new channel settings
chs = channel_pb2.ChannelSettings()
chs.modem_config = modem_config
chs.psk = bytes([1]) # Use default channel psk 1
ch.settings.CopyFrom(chs)
# handle the simple channel set commands
if args.ch_longslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
if args.ch_longfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw31_25Cr48Sf512)
if args.ch_mediumslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr46Sf2048)
if args.ch_mediumfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr47Sf1024)
if args.ch_shortslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr45Sf128)
if args.ch_shortfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
# Handle the channel settings
for pref in (args.ch_set or []):
if pref[0] == &#34;psk&#34;:
ch.settings.psk = meshtastic.util.fromPSK(pref[1])
else:
setPref(ch.settings, pref[0], pref[1])
enable = True # If we set any pref, assume the user wants to enable the channel
if enable:
ch.role = channel_pb2.Channel.Role.PRIMARY if (
channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY
else:
ch.role = channel_pb2.Channel.Role.DISABLED
print(f&#34;Writing modified channels to device&#34;)
getNode().writeChannel(channelIndex)
if args.info:
print(&#34;&#34;)
if not args.dest: # If we aren&#39;t trying to talk to our local node, don&#39;t show it
interface.showInfo()
print(&#34;&#34;)
getNode().showInfo()
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
print(&#34;&#34;)
if args.get:
closeNow = True
prefs = getNode().radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.get:
getPref(prefs, pref[0])
print(&#34;Completed getting preferences&#34;)
if args.nodes:
closeNow = True
interface.showNodes()
if args.qr:
closeNow = True
url = interface.localNode.getURL(includeAll=False)
print(f&#34;Primary channel URL {url}&#34;)
qr = pyqrcode.create(url)
print(qr.terminal())
if have_tunnel and args.tunnel:
# pylint: disable=C0415
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
tunnel.Tunnel(interface, subnet=args.tunnel_net)
# if the user didn&#39;t ask for serial debugging output, we might want to exit after we&#39;ve done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
except Exception as ex:
print(f&#34;Aborting due to: {ex}&#34;)
interface.close() # close the connection now, so that our app exits
def onNode(node):
&#34;&#34;&#34;Callback invoked when the node DB changes&#34;&#34;&#34;
print(f&#34;Node changed: {node}&#34;)
def subscribe():
&#34;&#34;&#34;Subscribe to the topics the user probably wants to see, prints output to stdout&#34;&#34;&#34;
pub.subscribe(onReceive, &#34;meshtastic.receive&#34;)
# pub.subscribe(onConnection, &#34;meshtastic.connection&#34;)
# We now call onConnected from main
# pub.subscribe(onConnected, &#34;meshtastic.connection.established&#34;)
# pub.subscribe(onNode, &#34;meshtastic.node&#34;)
def export_config(interface):
&#34;&#34;&#34;used in--export-config&#34;&#34;&#34;
owner = interface.getLongName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get(&#39;position&#39;)
lat = None
lon = None
alt = None
if pos:
lat = pos.get(&#39;latitude&#39;)
lon = pos.get(&#39;longitude&#39;)
alt = pos.get(&#39;altitude&#39;)
config = &#34;# start of Meshtastic configure yaml\n&#34;
if owner:
config += f&#34;owner: {owner}\n\n&#34;
if channel_url:
config += f&#34;channel_url: {channel_url}\n\n&#34;
if lat or lon or alt:
config += &#34;location:\n&#34;
if lat:
config += f&#34; lat: {lat}\n&#34;
if lon:
config += f&#34; lon: {lon}\n&#34;
if alt:
config += f&#34; alt: {alt}\n&#34;
config += &#34;\n&#34;
preferences = f&#39;{interface.localNode.radioConfig.preferences}&#39;
prefs = preferences.splitlines()
if prefs:
config += &#34;user_prefs:\n&#34;
for pref in prefs:
config += f&#34; {meshtastic.util.quoteBooleans(pref)}\n&#34;
print(config)
return config
def common():
&#34;&#34;&#34;Shared code for all of our command line wrappers&#34;&#34;&#34;
our_globals = Globals.getInstance()
args = our_globals.get_args()
parser = our_globals.get_parser()
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO,
format=&#39;%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s&#39;)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
meshtastic.util.our_exit(&#34;&#34;, 1)
else:
if args.support:
meshtastic.util.support_info()
meshtastic.util.our_exit(&#34;&#34;, 0)
if args.ch_index is not None:
channelIndex = int(args.ch_index)
our_globals.set_channel_index(channelIndex)
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
if not args.dest:
args.destOrAll = &#34;^all&#34;
args.destOrLocal = &#34;^local&#34;
else:
args.destOrAll = args.dest
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
if not args.seriallog:
if args.noproto:
args.seriallog = &#34;stdout&#34;
else:
args.seriallog = &#34;none&#34; # assume no debug output in this case
if args.deprecated is not None:
logging.error(
&#39;This option has been deprecated, see help below for the correct replacement...&#39;)
parser.print_help(sys.stderr)
meshtastic.util.our_exit(&#39;&#39;, 1)
elif args.test:
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit(&#34;Warning: Test was not successful.&#34;)
else:
meshtastic.util.our_exit(&#34;Test was a success.&#34;, 0)
else:
if args.seriallog == &#34;stdout&#34;:
logfile = sys.stdout
elif args.seriallog == &#34;none&#34;:
args.seriallog = None
logging.debug(&#34;Not logging serial output&#34;)
logfile = None
else:
logging.info(f&#34;Logging serial output to {args.seriallog}&#34;)
# Note: using &#34;line buffering&#34;
# pylint: disable=R1732
logfile = open(args.seriallog, &#39;w+&#39;,
buffering=1, encoding=&#39;utf8&#39;)
subscribe()
if args.ble:
client = meshtastic.ble_interface.BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
elif args.host:
client = meshtastic.tcp_interface.TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)
else:
client = meshtastic.serial_interface.SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto)
# We assume client is fully connected now
onConnected(client)
if args.noproto or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
while True:
time.sleep(1000)
# don&#39;t call exit, background threads might be running still
# sys.exit(0)
def initParser():
&#34;&#34;&#34;Initialize the command line argument parsing.&#34;&#34;&#34;
our_globals = Globals.getInstance()
parser = our_globals.get_parser()
args = our_globals.get_args()
parser.add_argument(
&#34;--configure&#34;,
help=&#34;Specify a path to a yaml(.yml) file containing the desired settings for the connected device.&#34;,
action=&#39;append&#39;)
parser.add_argument(
&#34;--export-config&#34;,
help=&#34;Export the configuration in yaml(.yml) format.&#34;,
action=&#39;store_true&#39;)
parser.add_argument(
&#34;--port&#34;,
help=&#34;The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we&#39;ll try to find it.&#34;,
default=None)
parser.add_argument(
&#34;--host&#34;,
help=&#34;The hostname/ipaddr of the device to connect to (over TCP)&#34;,
default=None)
parser.add_argument(
&#34;--seriallog&#34;,
help=&#34;Log device serial output to either &#39;stdout&#39;, &#39;none&#39; or a filename to append to.&#34;)
parser.add_argument(&#34;--info&#34;, help=&#34;Read and display the radio config information&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--nodes&#34;, help=&#34;Print Node List in a pretty formatted table&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--qr&#34;, help=&#34;Display the QR code that corresponds to the current channel&#34;,
action=&#34;store_true&#34;)
parser.add_argument(
&#34;--get&#34;, help=&#34;Get a preferences field. Use an invalid field such as &#39;0&#39; to get a list of all fields.&#34;, nargs=1, action=&#39;append&#39;)
parser.add_argument(
&#34;--set&#34;, help=&#34;Set a preferences field&#34;, nargs=2, action=&#39;append&#39;)
parser.add_argument(
&#34;--seturl&#34;, help=&#34;Set a channel URL&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--ch-index&#34;, help=&#34;Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--ch-add&#34;, help=&#34;Add a secondary channel, you must specify a channel name&#34;, default=None)
parser.add_argument(
&#34;--ch-del&#34;, help=&#34;Delete the ch-index channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-enable&#34;, help=&#34;Enable the specified channel&#34;, action=&#34;store_true&#34;, dest=&#34;ch_enable&#34;, default=False)
# Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.)
parser.add_argument(
&#34;--ch-disable&#34;, help=&#34;Disable the specified channel&#34;, action=&#34;store_true&#34;, dest=&#34;ch_disable&#34;, default=False)
parser.add_argument(
&#34;--ch-set&#34;, help=&#34;Set a channel parameter&#34;, nargs=2, action=&#39;append&#39;)
parser.add_argument(
&#34;--ch-longslow&#34;, help=&#34;Change to the long-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-longfast&#34;, help=&#34;Change to the long-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-mediumslow&#34;, help=&#34;Change to the medium-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-mediumfast&#34;, help=&#34;Change to the medium-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-shortslow&#34;, help=&#34;Change to the short-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-shortfast&#34;, help=&#34;Change to the short-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--set-owner&#34;, help=&#34;Set device owner name&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--set-team&#34;, help=&#34;Set team affiliation (an invalid team will list valid values)&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--set-ham&#34;, help=&#34;Set licensed Ham ID and turn off encryption&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--dest&#34;, help=&#34;The destination node id for any sent commands, if not set &#39;^all&#39; or &#39;^local&#39; is assumed as appropriate&#34;, default=None)
parser.add_argument(
&#34;--sendtext&#34;, help=&#34;Send a text message. Can specify a destination &#39;--dest&#39; and/or channel index &#39;--ch-index&#39;.&#34;)
parser.add_argument(
&#34;--sendping&#34;, help=&#34;Send a ping message (which requests a reply)&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--reboot&#34;, help=&#34;Tell the destination node to reboot&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--reply&#34;, help=&#34;Reply to received messages&#34;,
action=&#34;store_true&#34;)
parser.add_argument(
&#34;--gpio-wrb&#34;, nargs=2, help=&#34;Set a particular GPIO # to 1 or 0&#34;, action=&#39;append&#39;)
parser.add_argument(
&#34;--gpio-rd&#34;, help=&#34;Read from a GPIO mask (ex: &#39;0x10&#39;)&#34;)
parser.add_argument(
&#34;--gpio-watch&#34;, help=&#34;Start watching a GPIO mask for changes (ex: &#39;0x10&#39;)&#34;)
parser.add_argument(
&#34;--no-time&#34;, help=&#34;Suppress sending the current time to the mesh&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--setalt&#34;, help=&#34;Set device altitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--setlat&#34;, help=&#34;Set device latitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--setlon&#34;, help=&#34;Set device longitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--pos-fields&#34;, help=&#34;Specify fields to send when sending a position. Use no argument for a list of valid values. &#34;\
&#34;Can pass multiple values as a space separated list like &#34;\
&#34;this: &#39;--pos-fields POS_ALTITUDE POS_ALT_MSL&#39;&#34;,
nargs=&#34;*&#34;, action=&#34;store&#34;)
parser.add_argument(&#34;--debug&#34;, help=&#34;Show API library debug log messages&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--test&#34;, help=&#34;Run stress test against all connected Meshtastic devices&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--ble&#34;, help=&#34;BLE mac address to connect to (BLE is not yet supported for this tool)&#34;,
default=None)
parser.add_argument(&#34;--noproto&#34;, help=&#34;Don&#39;t start the API, just function as a dumb serial terminal.&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#39;--setchan&#39;, dest=&#39;deprecated&#39;, nargs=2, action=&#39;append&#39;,
help=&#39;Deprecated, use &#34;--ch-set param value&#34; instead&#39;)
parser.add_argument(&#39;--set-router&#39;, dest=&#39;deprecated&#39;,
action=&#39;store_true&#39;, help=&#39;Deprecated, use &#34;--set is_router true&#34; instead&#39;)
parser.add_argument(&#39;--unset-router&#39;, dest=&#39;deprecated&#39;,
action=&#39;store_false&#39;, help=&#39;Deprecated, use &#34;--set is_router false&#34; instead&#39;)
if have_tunnel:
parser.add_argument(&#39;--tunnel&#39;,
action=&#39;store_true&#39;, help=&#34;Create a TUN tunnel device for forwarding IP packets over the mesh&#34;)
parser.add_argument(
&#34;--subnet&#34;, dest=&#39;tunnel_net&#39;, help=&#34;Sets the local-end subnet address for the TUN IP bridge&#34;, default=None)
parser.set_defaults(deprecated=None)
parser.add_argument(&#39;--version&#39;, action=&#39;version&#39;,
version=f&#34;{pkg_resources.require(&#39;meshtastic&#39;)[0].version}&#34;)
parser.add_argument(
&#34;--support&#34;, action=&#39;store_true&#39;, help=&#34;Show support info (useful when troubleshooting an issue)&#34;)
args = parser.parse_args()
our_globals.set_args(args)
our_globals.set_parser(parser)
def main():
&#34;&#34;&#34;Perform command line meshtastic operations&#34;&#34;&#34;
our_globals = Globals.getInstance()
parser = argparse.ArgumentParser()
our_globals.set_parser(parser)
initParser()
common()
def tunnelMain():
&#34;&#34;&#34;Run a meshtastic IP tunnel&#34;&#34;&#34;
our_globals = Globals.getInstance()
parser = argparse.ArgumentParser()
our_globals.set_parser(parser)
initParser()
args = our_globals.get_args()
args.tunnel = True
our_globals.set_args(args)
common()
if __name__ == &#34;__main__&#34;:
main()</code></pre>
</details>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-variables">Global variables</h2>
<dl>
<dt id="meshtastic.__main__.have_tunnel"><code class="name">var <span class="ident">have_tunnel</span></code></dt>
<dd>
<div class="desc"><p>We only import the tunnel code if we are on a platform that can run it.</p></div>
</dd>
</dl>
</section>
<section>
<h2 class="section-title" id="header-functions">Functions</h2>
<dl>
<dt id="meshtastic.__main__.common"><code class="name flex">
<span>def <span class="ident">common</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Shared code for all of our command line wrappers</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of common(), the code shared by the command line entry points.
In order, it: sets up logging (DEBUG level when args.debug is set, INFO otherwise); prints the
parser help and calls our_exit("", 1) when sys.argv holds no arguments; otherwise handles the
support option, records the channel index in Globals, derives args.destOrAll and
args.destOrLocal from args.dest, picks a default for args.seriallog, rejects deprecated options,
or runs meshtastic.test.testAll() for the test option. In the remaining case it selects the
serial log target (sys.stdout, None, or a file opened for writing), calls subscribe(), builds a
BLEInterface, TCPInterface or SerialInterface, passes it to onConnected(), and finally loops on
time.sleep(1000) when args.noproto is set or a tunnel was requested.
NOTE(review): the log file is opened with mode 'w+' (which truncates an existing file), while the
seriallog help text in initParser() says "a filename to append to". Reconcile the two in the
module source and regenerate this page rather than editing the listing by hand.
-->
<pre><code class="python">def common():
&#34;&#34;&#34;Shared code for all of our command line wrappers&#34;&#34;&#34;
our_globals = Globals.getInstance()
args = our_globals.get_args()
parser = our_globals.get_parser()
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO,
format=&#39;%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s&#39;)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
meshtastic.util.our_exit(&#34;&#34;, 1)
else:
if args.support:
meshtastic.util.support_info()
meshtastic.util.our_exit(&#34;&#34;, 0)
if args.ch_index is not None:
channelIndex = int(args.ch_index)
our_globals.set_channel_index(channelIndex)
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
if not args.dest:
args.destOrAll = &#34;^all&#34;
args.destOrLocal = &#34;^local&#34;
else:
args.destOrAll = args.dest
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
if not args.seriallog:
if args.noproto:
args.seriallog = &#34;stdout&#34;
else:
args.seriallog = &#34;none&#34; # assume no debug output in this case
if args.deprecated is not None:
logging.error(
&#39;This option has been deprecated, see help below for the correct replacement...&#39;)
parser.print_help(sys.stderr)
meshtastic.util.our_exit(&#39;&#39;, 1)
elif args.test:
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit(&#34;Warning: Test was not successful.&#34;)
else:
meshtastic.util.our_exit(&#34;Test was a success.&#34;, 0)
else:
if args.seriallog == &#34;stdout&#34;:
logfile = sys.stdout
elif args.seriallog == &#34;none&#34;:
args.seriallog = None
logging.debug(&#34;Not logging serial output&#34;)
logfile = None
else:
logging.info(f&#34;Logging serial output to {args.seriallog}&#34;)
# Note: using &#34;line buffering&#34;
# pylint: disable=R1732
logfile = open(args.seriallog, &#39;w+&#39;,
buffering=1, encoding=&#39;utf8&#39;)
subscribe()
if args.ble:
client = meshtastic.ble_interface.BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
elif args.host:
client = meshtastic.tcp_interface.TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)
else:
client = meshtastic.serial_interface.SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto)
# We assume client is fully connected now
onConnected(client)
if args.noproto or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
while True:
time.sleep(1000)
# don&#39;t call exit, background threads might be running still
# sys.exit(0)</code></pre>
</details>
</dd>
<dt id="meshtastic.__main__.export_config"><code class="name flex">
<span>def <span class="ident">export_config</span></span>(<span>interface)</span>
</code></dt>
<dd>
<div class="desc"><p>Used in <code>--export-config</code></p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of export_config(interface): builds a YAML-style text from interface.getLongName(), the
local node's channel URL, the 'position' entry of interface.getMyNodeInfo(), and the lines of the
local radio preferences (each passed through meshtastic.util.quoteBooleans); it prints the text
and returns it.
NOTE(review): the location values are tested for truthiness ("if lat:", "if lon:", "if alt:"), so
a latitude, longitude or altitude of exactly 0 is left out of the export. "is not None" checks
look like the intended test; confirm and fix in the module source, then regenerate this page.
-->
<pre><code class="python">def export_config(interface):
&#34;&#34;&#34;used in--export-config&#34;&#34;&#34;
owner = interface.getLongName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get(&#39;position&#39;)
lat = None
lon = None
alt = None
if pos:
lat = pos.get(&#39;latitude&#39;)
lon = pos.get(&#39;longitude&#39;)
alt = pos.get(&#39;altitude&#39;)
config = &#34;# start of Meshtastic configure yaml\n&#34;
if owner:
config += f&#34;owner: {owner}\n\n&#34;
if channel_url:
config += f&#34;channel_url: {channel_url}\n\n&#34;
if lat or lon or alt:
config += &#34;location:\n&#34;
if lat:
config += f&#34; lat: {lat}\n&#34;
if lon:
config += f&#34; lon: {lon}\n&#34;
if alt:
config += f&#34; alt: {alt}\n&#34;
config += &#34;\n&#34;
preferences = f&#39;{interface.localNode.radioConfig.preferences}&#39;
prefs = preferences.splitlines()
if prefs:
config += &#34;user_prefs:\n&#34;
for pref in prefs:
config += f&#34; {meshtastic.util.quoteBooleans(pref)}\n&#34;
print(config)
return config</code></pre>
</details>
</dd>
<dt id="meshtastic.__main__.getPref"><code class="name flex">
<span>def <span class="ident">getPref</span></span>(<span>attributes, name)</span>
</code></dt>
<dd>
<div class="desc"><p>Get a channel or preferences value</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of getPref(attributes, name): looks the name up in attributes.DESCRIPTOR.fields_by_name.
For an unknown name it prints the sorted list of the descriptor's field names and returns;
otherwise it reads the value with getattr and prints it as "name: value". Any exception raised
while reading is caught and reported as "Can't get ... due to ...".
NOTE(review): the "except TypeError" branch repeats the identical getattr(attributes, name) call,
so the retry "as a string" described by its comment cannot behave differently from the first
attempt. The f-string "Choices in sorted order are:" also has no placeholders. Both are source
issues: fix them in the module and regenerate this page.
-->
<pre><code class="python">def getPref(attributes, name):
&#34;&#34;&#34;Get a channel or preferences value&#34;&#34;&#34;
objDesc = attributes.DESCRIPTOR
field = objDesc.fields_by_name.get(name)
if not field:
print(f&#34;{attributes.__class__.__name__} does not have an attribute called {name}, so you can not get it.&#34;)
print(f&#34;Choices in sorted order are:&#34;)
names = []
for f in objDesc.fields:
names.append(f&#39;{f.name}&#39;)
for temp_name in sorted(names):
print(f&#34; {temp_name}&#34;)
return
# okay - try to read the value
try:
try:
val = getattr(attributes, name)
except TypeError:
# The getter didn&#39;t like our arg type guess try again as a string
val = getattr(attributes, name)
# succeeded!
print(f&#34;{name}: {str(val)}&#34;)
except Exception as ex:
print(f&#34;Can&#39;t get {name} due to {ex}&#34;)</code></pre>
</details>
</dd>
<dt id="meshtastic.__main__.initParser"><code class="name flex">
<span>def <span class="ident">initParser</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Initialize the command line argument parsing.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of initParser(): fetches the parser stored in Globals, registers the tool's command line
options on it, calls parser.parse_args(), and stores both the parsed args and the parser back
into Globals. The tunnel option is registered under an "if have_tunnel" test.
The setchan, set-router and unset-router options all write to dest 'deprecated', whose default is
forced to None with parser.set_defaults(); common() treats any non-None value as "a deprecated
option was used".
NOTE(review): "args = our_globals.get_args()" near the top is never read before args is
reassigned from parser.parse_args().
NOTE(review): the seriallog help text says "a filename to append to", but common() opens that file
with mode 'w+', which truncates it. Reconcile the two in the module source and regenerate.
-->
<pre><code class="python">def initParser():
&#34;&#34;&#34;Initialize the command line argument parsing.&#34;&#34;&#34;
our_globals = Globals.getInstance()
parser = our_globals.get_parser()
args = our_globals.get_args()
parser.add_argument(
&#34;--configure&#34;,
help=&#34;Specify a path to a yaml(.yml) file containing the desired settings for the connected device.&#34;,
action=&#39;append&#39;)
parser.add_argument(
&#34;--export-config&#34;,
help=&#34;Export the configuration in yaml(.yml) format.&#34;,
action=&#39;store_true&#39;)
parser.add_argument(
&#34;--port&#34;,
help=&#34;The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we&#39;ll try to find it.&#34;,
default=None)
parser.add_argument(
&#34;--host&#34;,
help=&#34;The hostname/ipaddr of the device to connect to (over TCP)&#34;,
default=None)
parser.add_argument(
&#34;--seriallog&#34;,
help=&#34;Log device serial output to either &#39;stdout&#39;, &#39;none&#39; or a filename to append to.&#34;)
parser.add_argument(&#34;--info&#34;, help=&#34;Read and display the radio config information&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--nodes&#34;, help=&#34;Print Node List in a pretty formatted table&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--qr&#34;, help=&#34;Display the QR code that corresponds to the current channel&#34;,
action=&#34;store_true&#34;)
parser.add_argument(
&#34;--get&#34;, help=&#34;Get a preferences field. Use an invalid field such as &#39;0&#39; to get a list of all fields.&#34;, nargs=1, action=&#39;append&#39;)
parser.add_argument(
&#34;--set&#34;, help=&#34;Set a preferences field&#34;, nargs=2, action=&#39;append&#39;)
parser.add_argument(
&#34;--seturl&#34;, help=&#34;Set a channel URL&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--ch-index&#34;, help=&#34;Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--ch-add&#34;, help=&#34;Add a secondary channel, you must specify a channel name&#34;, default=None)
parser.add_argument(
&#34;--ch-del&#34;, help=&#34;Delete the ch-index channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-enable&#34;, help=&#34;Enable the specified channel&#34;, action=&#34;store_true&#34;, dest=&#34;ch_enable&#34;, default=False)
# Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.)
parser.add_argument(
&#34;--ch-disable&#34;, help=&#34;Disable the specified channel&#34;, action=&#34;store_true&#34;, dest=&#34;ch_disable&#34;, default=False)
parser.add_argument(
&#34;--ch-set&#34;, help=&#34;Set a channel parameter&#34;, nargs=2, action=&#39;append&#39;)
parser.add_argument(
&#34;--ch-longslow&#34;, help=&#34;Change to the long-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-longfast&#34;, help=&#34;Change to the long-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-mediumslow&#34;, help=&#34;Change to the medium-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-mediumfast&#34;, help=&#34;Change to the medium-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-shortslow&#34;, help=&#34;Change to the short-range and slow channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--ch-shortfast&#34;, help=&#34;Change to the short-range and fast channel&#34;, action=&#39;store_true&#39;)
parser.add_argument(
&#34;--set-owner&#34;, help=&#34;Set device owner name&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--set-team&#34;, help=&#34;Set team affiliation (an invalid team will list valid values)&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--set-ham&#34;, help=&#34;Set licensed Ham ID and turn off encryption&#34;, action=&#34;store&#34;)
parser.add_argument(
&#34;--dest&#34;, help=&#34;The destination node id for any sent commands, if not set &#39;^all&#39; or &#39;^local&#39; is assumed as appropriate&#34;, default=None)
parser.add_argument(
&#34;--sendtext&#34;, help=&#34;Send a text message. Can specify a destination &#39;--dest&#39; and/or channel index &#39;--ch-index&#39;.&#34;)
parser.add_argument(
&#34;--sendping&#34;, help=&#34;Send a ping message (which requests a reply)&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--reboot&#34;, help=&#34;Tell the destination node to reboot&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--reply&#34;, help=&#34;Reply to received messages&#34;,
action=&#34;store_true&#34;)
parser.add_argument(
&#34;--gpio-wrb&#34;, nargs=2, help=&#34;Set a particular GPIO # to 1 or 0&#34;, action=&#39;append&#39;)
parser.add_argument(
&#34;--gpio-rd&#34;, help=&#34;Read from a GPIO mask (ex: &#39;0x10&#39;)&#34;)
parser.add_argument(
&#34;--gpio-watch&#34;, help=&#34;Start watching a GPIO mask for changes (ex: &#39;0x10&#39;)&#34;)
parser.add_argument(
&#34;--no-time&#34;, help=&#34;Suppress sending the current time to the mesh&#34;, action=&#34;store_true&#34;)
parser.add_argument(
&#34;--setalt&#34;, help=&#34;Set device altitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--setlat&#34;, help=&#34;Set device latitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--setlon&#34;, help=&#34;Set device longitude (allows use without GPS)&#34;)
parser.add_argument(
&#34;--pos-fields&#34;, help=&#34;Specify fields to send when sending a position. Use no argument for a list of valid values. &#34;\
&#34;Can pass multiple values as a space separated list like &#34;\
&#34;this: &#39;--pos-fields POS_ALTITUDE POS_ALT_MSL&#39;&#34;,
nargs=&#34;*&#34;, action=&#34;store&#34;)
parser.add_argument(&#34;--debug&#34;, help=&#34;Show API library debug log messages&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--test&#34;, help=&#34;Run stress test against all connected Meshtastic devices&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#34;--ble&#34;, help=&#34;BLE mac address to connect to (BLE is not yet supported for this tool)&#34;,
default=None)
parser.add_argument(&#34;--noproto&#34;, help=&#34;Don&#39;t start the API, just function as a dumb serial terminal.&#34;,
action=&#34;store_true&#34;)
parser.add_argument(&#39;--setchan&#39;, dest=&#39;deprecated&#39;, nargs=2, action=&#39;append&#39;,
help=&#39;Deprecated, use &#34;--ch-set param value&#34; instead&#39;)
parser.add_argument(&#39;--set-router&#39;, dest=&#39;deprecated&#39;,
action=&#39;store_true&#39;, help=&#39;Deprecated, use &#34;--set is_router true&#34; instead&#39;)
parser.add_argument(&#39;--unset-router&#39;, dest=&#39;deprecated&#39;,
action=&#39;store_false&#39;, help=&#39;Deprecated, use &#34;--set is_router false&#34; instead&#39;)
if have_tunnel:
parser.add_argument(&#39;--tunnel&#39;,
action=&#39;store_true&#39;, help=&#34;Create a TUN tunnel device for forwarding IP packets over the mesh&#34;)
parser.add_argument(
&#34;--subnet&#34;, dest=&#39;tunnel_net&#39;, help=&#34;Sets the local-end subnet address for the TUN IP bridge&#34;, default=None)
parser.set_defaults(deprecated=None)
parser.add_argument(&#39;--version&#39;, action=&#39;version&#39;,
version=f&#34;{pkg_resources.require(&#39;meshtastic&#39;)[0].version}&#34;)
parser.add_argument(
&#34;--support&#34;, action=&#39;store_true&#39;, help=&#34;Show support info (useful when troubleshooting an issue)&#34;)
args = parser.parse_args()
our_globals.set_args(args)
our_globals.set_parser(parser)</code></pre>
</details>
</dd>
<dt id="meshtastic.__main__.main"><code class="name flex">
<span>def <span class="ident">main</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Perform command line meshtastic operations</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of main(): creates a fresh argparse.ArgumentParser, stores it in the Globals singleton,
then calls initParser() to register and parse the options and common() to act on them.
-->
<pre><code class="python">def main():
&#34;&#34;&#34;Perform command line meshtastic operations&#34;&#34;&#34;
our_globals = Globals.getInstance()
parser = argparse.ArgumentParser()
our_globals.set_parser(parser)
initParser()
common()</code></pre>
</details>
</dd>
<dt id="meshtastic.__main__.onConnected"><code class="name flex">
<span>def <span class="ident">onConnected</span></span>(<span>interface)</span>
</code></dt>
<dd>
<div class="desc"><p>Callback invoked when we connect to a radio</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<!--
Listing of onConnected(interface): carries out the actions requested on the command line against
a connected radio. The work runs inside one try block; the except clause at the end prints
"Aborting due to: ..." and the listing finishes with interface.close().
The nested getNode() helper caches the target node in Globals, looking it up with
interface.getNode(args.destOrLocal) the first time it is needed.
Most actions set closeNow = True. At the end the interface is closed when closeNow is set and
args.seriallog is falsy; a requested tunnel resets closeNow to False first so the link stays open.
NOTE(review): only args.configure[0] is read, although initParser() declares the configure option
with action 'append'.
NOTE(review): the GPIO branches pass args.dest to the RemoteHardwareClient calls; initParser()
defaults dest to None, and unlike other branches these do not use destOrAll or destOrLocal.
NOTE(review): "any(x for x in ch_changes)" is equivalent to any(ch_changes), and "enable = True"
is assigned both as the default and again under "if args.ch_enable".
NOTE(review): the "while True" loop in the gpio_watch branch contains no break.
These are source issues: fix them in the module and regenerate this page.
-->
<pre><code class="python">def onConnected(interface):
&#34;&#34;&#34;Callback invoked when we connect to a radio&#34;&#34;&#34;
closeNow = False # Should we drop the connection after we finish?
try:
our_globals = Globals.getInstance()
args = our_globals.get_args()
print(&#34;Connected to radio&#34;)
def getNode():
&#34;&#34;&#34;This operation could be expensive, so we try to cache the results&#34;&#34;&#34;
targetNode = our_globals.get_target_node()
if not targetNode:
targetNode = interface.getNode(args.destOrLocal)
our_globals.set_target_node(targetNode)
return targetNode
if args.setlat or args.setlon or args.setalt:
closeNow = True
alt = 0
lat = 0.0
lon = 0.0
prefs = interface.localNode.radioConfig.preferences
if args.setalt:
alt = int(args.setalt)
prefs.fixed_position = True
print(f&#34;Fixing altitude at {alt} meters&#34;)
if args.setlat:
lat = float(args.setlat)
prefs.fixed_position = True
print(f&#34;Fixing latitude at {lat} degrees&#34;)
if args.setlon:
lon = float(args.setlon)
prefs.fixed_position = True
print(f&#34;Fixing longitude at {lon} degrees&#34;)
print(&#34;Setting device position&#34;)
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig()
elif not args.no_time:
# We normally provide a current time to the mesh when we connect
interface.sendPosition()
if args.set_owner:
closeNow = True
print(f&#34;Setting device owner to {args.set_owner}&#34;)
getNode().setOwner(args.set_owner)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
prefs = getNode().radioConfig.preferences
allFields = 0
try:
for field in args.pos_fields:
v_field = radioconfig_pb2.PositionFlags.Value(field)
allFields |= v_field
except ValueError:
print(&#34;ERROR: supported position fields are:&#34;)
print(radioconfig_pb2.PositionFlags.keys())
print(&#34;If no fields are specified, will read and display current value.&#34;)
else:
print(f&#34;Setting position fields to {allFields}&#34;)
setPref(prefs, &#39;position_flags&#39;, (&#39;%d&#39; % allFields))
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value
closeNow = True
prefs = getNode().radioConfig.preferences
fieldNames = []
for bit in radioconfig_pb2.PositionFlags.values():
if prefs.position_flags &amp; bit:
fieldNames.append(radioconfig_pb2.PositionFlags.Name(bit))
print(&#39; &#39;.join(fieldNames))
if args.set_team:
closeNow = True
try:
v_team = meshtastic.mesh_pb2.Team.Value(args.set_team.upper())
except ValueError:
v_team = 0
print(f&#34;ERROR: Team \&#39;{args.set_team}\&#39; not found.&#34;)
print(&#34;Try a team name from the sorted list below, or use &#39;CLEAR&#39; for unaffiliated:&#34;)
print(sorted(meshtastic.mesh_pb2.Team.keys()))
else:
print(f&#34;Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}&#34;)
getNode().setOwner(team=v_team)
if args.set_ham:
closeNow = True
print(f&#34;Setting Ham ID to {args.set_ham} and turning off encryption&#34;)
getNode().setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel
getNode().turnOffEncryptionOnPrimaryChannel()
if args.reboot:
closeNow = True
getNode().reboot()
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
ch = getNode().getChannelByChannelIndex(channelIndex)
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
print(f&#34;Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}&#34;)
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
else:
meshtastic.util.our_exit(f&#34;Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.&#34;)
if args.sendping:
payload = str.encode(&#34;test string&#34;)
print(f&#34;Sending ping message to {args.destOrAll}&#34;)
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 &lt;&lt; int(wrpair[0])
bitval |= int(wrpair[1]) &lt;&lt; int(wrpair[0])
print(f&#34;Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}&#34;)
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f&#34;Reading GPIO mask 0x{bitmask:x} from {args.dest}&#34;)
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto:
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f&#39;end of gpio_rd&#39;)
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f&#34;Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit&#34;)
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings
if args.set:
closeNow = True
prefs = getNode().radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.set:
setPref(prefs, pref[0], pref[1])
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
if args.configure:
with open(args.configure[0], encoding=&#39;utf8&#39;) as file:
configuration = yaml.safe_load(file)
closeNow = True
if &#39;owner&#39; in configuration:
print(f&#34;Setting device owner to {configuration[&#39;owner&#39;]}&#34;)
getNode().setOwner(configuration[&#39;owner&#39;])
if &#39;channel_url&#39; in configuration:
print(&#34;Setting channel url to&#34;, configuration[&#39;channel_url&#39;])
getNode().setURL(configuration[&#39;channel_url&#39;])
if &#39;location&#39; in configuration:
alt = 0
lat = 0.0
lon = 0.0
prefs = interface.localNode.radioConfig.preferences
if &#39;alt&#39; in configuration[&#39;location&#39;]:
alt = int(configuration[&#39;location&#39;][&#39;alt&#39;])
prefs.fixed_position = True
print(f&#34;Fixing altitude at {alt} meters&#34;)
if &#39;lat&#39; in configuration[&#39;location&#39;]:
lat = float(configuration[&#39;location&#39;][&#39;lat&#39;])
prefs.fixed_position = True
print(f&#34;Fixing latitude at {lat} degrees&#34;)
if &#39;lon&#39; in configuration[&#39;location&#39;]:
lon = float(configuration[&#39;location&#39;][&#39;lon&#39;])
prefs.fixed_position = True
print(f&#34;Fixing longitude at {lon} degrees&#34;)
print(&#34;Setting device position&#34;)
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig()
if &#39;user_prefs&#39; in configuration:
prefs = getNode().radioConfig.preferences
for pref in configuration[&#39;user_prefs&#39;]:
setPref(prefs, pref, str(configuration[&#39;user_prefs&#39;][pref]))
print(&#34;Writing modified preferences to device&#34;)
getNode().writeConfig()
if args.export_config:
# export the configuration (the opposite of &#39;--configure&#39;)
closeNow = True
export_config(interface)
if args.seturl:
closeNow = True
getNode().setURL(args.seturl)
# handle changing channels
if args.ch_add:
closeNow = True
if len(args.ch_add) &gt; 10:
meshtastic.util.our_exit(&#34;Warning: Channel name must be shorter. Channel not added.&#34;)
n = getNode()
ch = n.getChannelByName(args.ch_add)
if ch:
meshtastic.util.our_exit(f&#34;Warning: This node already has a &#39;{args.ch_add}&#39; channel. No changes were made.&#34;)
else:
# get the first channel that is disabled (i.e., available)
ch = n.getDisabledChannel()
if not ch:
meshtastic.util.our_exit(&#34;Warning: No free channels were found&#34;)
chs = channel_pb2.ChannelSettings()
chs.psk = meshtastic.util.genPSK256()
chs.name = args.ch_add
ch.settings.CopyFrom(chs)
ch.role = channel_pb2.Channel.Role.SECONDARY
print(f&#34;Writing modified channels to device&#34;)
n.writeChannel(ch.index)
if args.ch_del:
closeNow = True
channelIndex = our_globals.get_channel_index()
if channelIndex is None:
meshtastic.util.our_exit(&#34;Warning: Need to specify &#39;--ch-index&#39; for &#39;--ch-del&#39;.&#34;, 1)
else:
if channelIndex == 0:
meshtastic.util.our_exit(&#34;Warning: Cannot delete primary channel.&#34;, 1)
else:
print(f&#34;Deleting channel {channelIndex}&#34;)
ch = getNode().deleteChannel(channelIndex)
ch_changes = [args.ch_longslow, args.ch_longfast,
args.ch_mediumslow, args.ch_mediumfast,
args.ch_shortslow, args.ch_shortfast]
any_primary_channel_changes = any(x for x in ch_changes)
if args.ch_set or any_primary_channel_changes or args.ch_enable or args.ch_disable:
closeNow = True
channelIndex = our_globals.get_channel_index()
if channelIndex is None:
if any_primary_channel_changes:
# we assume that they want the primary channel if they&#39;re setting range values
channelIndex = 0
else:
meshtastic.util.our_exit(&#34;Warning: Need to specify &#39;--ch-index&#39;.&#34;, 1)
ch = getNode().channels[channelIndex]
if any_primary_channel_changes or args.ch_enable or args.ch_disable:
if channelIndex == 0 and not any_primary_channel_changes:
meshtastic.util.our_exit(&#34;Warning: Cannot enable/disable PRIMARY channel.&#34;)
if channelIndex != 0:
if any_primary_channel_changes:
meshtastic.util.our_exit(&#34;Warning: Standard channel settings can only be applied to the PRIMARY channel&#34;)
enable = True # default to enable
if args.ch_enable:
enable = True
if args.ch_disable:
enable = False
def setSimpleChannel(modem_config):
&#34;&#34;&#34;Set one of the simple modem_config only based channels&#34;&#34;&#34;
# Completely new channel settings
chs = channel_pb2.ChannelSettings()
chs.modem_config = modem_config
chs.psk = bytes([1]) # Use default channel psk 1
ch.settings.CopyFrom(chs)
# handle the simple channel set commands
if args.ch_longslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
if args.ch_longfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw31_25Cr48Sf512)
if args.ch_mediumslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr46Sf2048)
if args.ch_mediumfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr47Sf1024)
if args.ch_shortslow:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr45Sf128)
if args.ch_shortfast:
setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
# Handle the channel settings
for pref in (args.ch_set or []):
if pref[0] == &#34;psk&#34;:
ch.settings.psk = meshtastic.util.fromPSK(pref[1])
else:
setPref(ch.settings, pref[0], pref[1])
enable = True # If we set any pref, assume the user wants to enable the channel
if enable:
ch.role = channel_pb2.Channel.Role.PRIMARY if (
channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY
else:
ch.role = channel_pb2.Channel.Role.DISABLED
print(f&#34;Writing modified channels to device&#34;)
getNode().writeChannel(channelIndex)
if args.info:
print(&#34;&#34;)
if not args.dest: # If we aren&#39;t trying to talk to our local node, don&#39;t show it
interface.showInfo()
print(&#34;&#34;)
getNode().showInfo()
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
print(&#34;&#34;)
if args.get:
closeNow = True
prefs = getNode().radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.get:
getPref(prefs, pref[0])
print(&#34;Completed getting preferences&#34;)
if args.nodes:
closeNow = True
interface.showNodes()
if args.qr:
closeNow = True
url = interface.localNode.getURL(includeAll=False)
print(f&#34;Primary channel URL {url}&#34;)
qr = pyqrcode.create(url)
print(qr.terminal())
if have_tunnel and args.tunnel:
# pylint: disable=C0415
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
tunnel.Tunnel(interface, subnet=args.tunnel_net)
# if the user didn&#39;t ask for serial debugging output, we might want to exit after we&#39;ve done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
except Exception as ex:
print(f&#34;Aborting due to: {ex}&#34;)
interface.close() # close the connection now, so that our app exits</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.onConnection. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.onConnection"><code class="name flex">
<span>def <span class="ident">onConnection</span></span>(<span>interface, topic=pubsub.core.callables.AUTO_TOPIC)</span>
</code></dt>
<dd>
<div class="desc"><p>Callback invoked when we connect/disconnect from a radio</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def onConnection(interface, topic=pub.AUTO_TOPIC):
    """Callback invoked when we connect/disconnect from a radio"""
    print(f"Connection changed: {topic.getName()}")</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.onNode. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.onNode"><code class="name flex">
<span>def <span class="ident">onNode</span></span>(<span>node)</span>
</code></dt>
<dd>
<div class="desc"><p>Callback invoked when the node DB changes</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def onNode(node):
    """Callback invoked when the node DB changes"""
    print(f"Node changed: {node}")</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.onReceive. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.onReceive"><code class="name flex">
<span>def <span class="ident">onReceive</span></span>(<span>packet, interface)</span>
</code></dt>
<dd>
<div class="desc"><p>Callback invoked when a packet arrives</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def onReceive(packet, interface):
    """Callback invoked when a packet arrives"""
    our_globals = Globals.getInstance()
    args = our_globals.get_args()
    try:
        d = packet.get('decoded')
        logging.debug(f'in onReceive() d:{d}')
        # Exit once we receive a reply
        if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
            interface.close() # after running command then exit
        # Reply to every received message with some stats
        if args and args.reply:
            msg = d.get('text')
            if msg:
                rxSnr = packet['rxSnr']
                hopLimit = packet['hopLimit']
                print(f"message: {msg}")
                reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit)
                print("Sending reply: ", reply)
                interface.sendText(reply)
    except Exception as ex:
        print(ex)</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.setPref. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.setPref"><code class="name flex">
<span>def <span class="ident">setPref</span></span>(<span>attributes, name, valStr)</span>
</code></dt>
<dd>
<div class="desc"><p>Set a channel or preferences value</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def setPref(attributes, name, valStr):
    """Set a channel or preferences value"""
    objDesc = attributes.DESCRIPTOR
    field = objDesc.fields_by_name.get(name)
    if not field:
        print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not set it.")
        print(f"Choices in sorted order are:")
        names = []
        for f in objDesc.fields:
            names.append(f'{f.name}')
        for temp_name in sorted(names):
            print(f" {temp_name}")
        return
    val = meshtastic.util.fromStr(valStr)
    enumType = field.enum_type
    # pylint: disable=C0123
    if enumType and type(val) == str:
        # We've failed so far to convert this string into an enum, try to find it by reflection
        e = enumType.values_by_name.get(val)
        if e:
            val = e.number
        else:
            print(f"{name} does not have an enum called {val}, so you can not set it.")
            print(f"Choices in sorted order are:")
            names = []
            for f in enumType.values:
                names.append(f'{f.name}')
            for temp_name in sorted(names):
                print(f" {temp_name}")
            return
    # okay - try to read the value
    try:
        try:
            setattr(attributes, name, val)
        except TypeError:
            # The setter didn't like our arg type guess try again as a string
            setattr(attributes, name, valStr)
        # succeeded!
        print(f"Set {name} to {valStr}")
    except Exception as ex:
        print(f"Can't set {name} due to {ex}")</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.subscribe. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.subscribe"><code class="name flex">
<span>def <span class="ident">subscribe</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Subscribe to the topics the user probably wants to see, prints output to stdout</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def subscribe():
    """Subscribe to the topics the user probably wants to see, prints output to stdout"""
    pub.subscribe(onReceive, "meshtastic.receive")
    # pub.subscribe(onConnection, "meshtastic.connection")
    # We now call onConnected from main
    # pub.subscribe(onConnected, "meshtastic.connection.established")
    # pub.subscribe(onNode, "meshtastic.node")</code></pre>
</details>
</dd>
<!-- Entry for meshtastic.__main__.tunnelMain. Whitespace inside the pre element is rendered verbatim, so the Python indentation of the listing is significant. -->
<dt id="meshtastic.__main__.tunnelMain"><code class="name flex">
<span>def <span class="ident">tunnelMain</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Run a meshtastic IP tunnel</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def tunnelMain():
    """Run a meshtastic IP tunnel"""
    our_globals = Globals.getInstance()
    parser = argparse.ArgumentParser()
    our_globals.set_parser(parser)
    initParser()
    args = our_globals.get_args()
    args.tunnel = True
    our_globals.set_args(args)
    common()</code></pre>
</details>
</dd>
</dl>
</section>
<section>
</section>
</article>
<!-- Sidebar index: a link to the parent package page plus in-page anchors to this module's documented globals and functions. -->
<nav id="sidebar">
  <h1>Index</h1>
  <div class="toc">
    <ul></ul>
  </div>
  <ul id="index">
    <li><h3>Super-module</h3>
      <ul>
        <li><code><a title="meshtastic" href="index.html">meshtastic</a></code></li>
      </ul>
    </li>
    <li><h3><a href="#header-variables">Global variables</a></h3>
      <ul>
        <li><code><a title="meshtastic.__main__.have_tunnel" href="#meshtastic.__main__.have_tunnel">have_tunnel</a></code></li>
      </ul>
    </li>
    <li><h3><a href="#header-functions">Functions</a></h3>
      <ul class="two-column">
        <li><code><a title="meshtastic.__main__.common" href="#meshtastic.__main__.common">common</a></code></li>
        <li><code><a title="meshtastic.__main__.export_config" href="#meshtastic.__main__.export_config">export_config</a></code></li>
        <li><code><a title="meshtastic.__main__.getPref" href="#meshtastic.__main__.getPref">getPref</a></code></li>
        <li><code><a title="meshtastic.__main__.initParser" href="#meshtastic.__main__.initParser">initParser</a></code></li>
        <li><code><a title="meshtastic.__main__.main" href="#meshtastic.__main__.main">main</a></code></li>
        <li><code><a title="meshtastic.__main__.onConnected" href="#meshtastic.__main__.onConnected">onConnected</a></code></li>
        <li><code><a title="meshtastic.__main__.onConnection" href="#meshtastic.__main__.onConnection">onConnection</a></code></li>
        <li><code><a title="meshtastic.__main__.onNode" href="#meshtastic.__main__.onNode">onNode</a></code></li>
        <li><code><a title="meshtastic.__main__.onReceive" href="#meshtastic.__main__.onReceive">onReceive</a></code></li>
        <li><code><a title="meshtastic.__main__.setPref" href="#meshtastic.__main__.setPref">setPref</a></code></li>
        <li><code><a title="meshtastic.__main__.subscribe" href="#meshtastic.__main__.subscribe">subscribe</a></code></li>
        <li><code><a title="meshtastic.__main__.tunnelMain" href="#meshtastic.__main__.tunnelMain">tunnelMain</a></code></li>
      </ul>
    </li>
  </ul>
</nav>
</main>
<!-- Generator attribution; the version string is the same one carried by the generator meta tag in the document head. -->
<footer id="footer">
  <p>Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.1.dev1+g4aa70de.d20211229</a>.</p>
</footer>
</body>
</html>