Files
python/meshtastic/__main__.py
Steve Holden 067cddd354 Refactor to avoid the use of a special global object.
The global object formerly used is now replaced by direct use
of the namespace opf the globals module. This eliminates the
redundant getters and setters and simplifies the code for
future maintainers.

Note that the globals module name conflicts (harmlessly at
present) with a Python built-in function. A future commit
should rename it `config` to remove this clash and better
represent its intended purpose.
2024-04-10 10:03:12 +01:00

1460 lines
52 KiB
Python

#!python3
""" Main Meshtastic
"""
import argparse
import logging
import os
import platform
import sys
import time
import pyqrcode # type: ignore[import-untyped]
import yaml
from google.protobuf.json_format import MessageToDict
from pubsub import pub # type: ignore[import-untyped]
import meshtastic.test
import meshtastic.util
from meshtastic import globals
from meshtastic import channel_pb2, config_pb2, portnums_pb2, remote_hardware, BROADCAST_ADDR
from meshtastic.version import get_active_version
from meshtastic.ble_interface import BLEInterface
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
args = globals.args
try:
d = packet.get("decoded")
logging.debug(f"in onReceive() d:{d}")
# Exit once we receive a reply
if (
args
and args.sendtext
and packet["to"] == interface.myInfo.my_node_num
and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP
):
interface.close() # after running command then exit
# Reply to every received message with some stats
if d is not None and args and args.reply:
msg = d.get("text")
if msg:
rxSnr = packet["rxSnr"]
hopLimit = packet["hopLimit"]
print(f"message: {msg}")
reply = f"got msg '{msg}' with rxSnr: {rxSnr} and hopLimit: {hopLimit}"
print("Sending reply: ", reply)
interface.sendText(reply)
except Exception as ex:
print(f"Warning: There is no field {ex} in the packet.")
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def getPref(node, comp_name):
"""Get a channel or preferences value"""
name = splitCompoundName(comp_name)
wholeField = name[0] == name[1] # We want the whole field
camel_name = meshtastic.util.snake_to_camel(name[1])
# Note: protobufs has the keys in snake_case, so snake internally
snake_name = meshtastic.util.camel_to_snake(name[1])
logging.debug(f"snake_name:{snake_name} camel_name:{camel_name}")
logging.debug(f"use camel:{globals.camel_case}")
# First validate the input
localConfig = node.localConfig
moduleConfig = node.moduleConfig
found = False
for config in [localConfig, moduleConfig]:
objDesc = config.DESCRIPTOR
config_type = objDesc.fields_by_name.get(name[0])
pref = False
if config_type:
pref = config_type.message_type.fields_by_name.get(snake_name)
if pref or wholeField:
found = True
break
if not found:
if globals.camel_case:
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have an attribute {snake_name}."
)
else:
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have attribute {snake_name}."
)
print("Choices are...")
printConfig(localConfig)
printConfig(moduleConfig)
return False
# Check if we need to request the config
if len(config.ListFields()) != 0:
# read the value
config_values = getattr(config, config_type.name)
if not wholeField:
pref_value = getattr(config_values, pref.name)
if globals.camel_case:
print(f"{str(config_type.name)}.{camel_name}: {str(pref_value)}")
logging.debug(
f"{str(config_type.name)}.{camel_name}: {str(pref_value)}"
)
else:
print(f"{str(config_type.name)}.{snake_name}: {str(pref_value)}")
logging.debug(
f"{str(config_type.name)}.{snake_name}: {str(pref_value)}"
)
else:
print(f"{str(config_type.name)}:\n{str(config_values)}")
logging.debug(f"{str(config_type.name)}: {str(config_values)}")
else:
# Always show whole field for remote node
node.requestConfig(config_type)
return True
def splitCompoundName(comp_name):
"""Split compound (dot separated) preference name into parts"""
name = comp_name.split(".", 1)
if len(name) != 2:
name[0] = comp_name
name.append(comp_name)
return name
def setPref(config, comp_name, valStr) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = meshtastic.util.camel_to_snake(name[1])
camel_name = meshtastic.util.snake_to_camel(name[1])
logging.debug(f"snake_name:{snake_name}")
logging.debug(f"camel_name:{camel_name}")
objDesc = config.DESCRIPTOR
config_type = objDesc.fields_by_name.get(name[0])
pref = None
if config_type and config_type.message_type is not None:
pref = config_type.message_type.fields_by_name.get(snake_name)
# Others like ChannelSettings are standalone
elif config_type:
pref = config_type
if (not pref) or (not config_type):
return False
val = meshtastic.util.fromStr(valStr)
logging.debug(f"valStr:{valStr} val:{val}")
if snake_name == "wifi_psk" and len(valStr) < 8:
print(f"Warning: network.wifi_psk must be 8 or more characters.")
return False
enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
if globals.camel_case:
print(
f"{name[0]}.{camel_name} does not have an enum called {val}, so you can not set it."
)
else:
print(
f"{name[0]}.{snake_name} does not have an enum called {val}, so you can not set it."
)
print(f"Choices in sorted order are:")
names = []
for f in enumType.values:
# Note: We must use the value of the enum (regardless if camel or snake case)
names.append(f"{f.name}")
for temp_name in sorted(names):
print(f" {temp_name}")
return False
# note: 'ignore_incoming' is a repeating field
if snake_name != "ignore_incoming":
try:
if config_type.message_type is not None:
config_values = getattr(config, config_type.name)
setattr(config_values, pref.name, val)
else:
setattr(config, snake_name, val)
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config, config_type.name)
setattr(config_values, pref.name, valStr)
else:
if val == 0:
# clear values
print("Clearing ignore_incoming list")
del config_type.message_type.ignore_incoming[:]
else:
print(f"Adding '{val}' to the ignore_incoming list")
config_type.message_type.ignore_incoming.extend([val])
prefix = f"{name[0]}." if config_type.message_type is not None else ""
if globals.camel_case:
print(f"Set {prefix}{camel_name} to {valStr}")
else:
print(f"Set {prefix}{snake_name} to {valStr}")
return True
def onConnected(interface):
"""Callback invoked when we connect to a radio"""
closeNow = False # Should we drop the connection after we finish?
waitForAckNak = (
False # Should we wait for an acknowledgment if we send to a remote node?
)
try:
args = globals.args
# do not print this line if we are exporting the config
if not args.export_config:
print("Connected to radio")
if args.setlat or args.setlon or args.setalt:
closeNow = True
alt = 0
lat = 0.0
lon = 0.0
# TODO: use getNode(args.dest) to be able to set it for a remote node
localConfig = interface.localNode.localConfig
if args.setalt:
alt = int(args.setalt)
localConfig.position.fixed_position = True
print(f"Fixing altitude at {alt} meters")
if args.setlat:
lat = float(args.setlat)
localConfig.position.fixed_position = True
print(f"Fixing latitude at {lat} degrees")
if args.setlon:
lon = float(args.setlon)
localConfig.position.fixed_position = True
print(f"Fixing longitude at {lon} degrees")
print("Setting device position")
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig("position")
elif not args.no_time:
# We normally provide a current time to the mesh when we connect
interface.sendPosition()
if args.set_owner:
closeNow = True
waitForAckNak = True
print(f"Setting device owner to {args.set_owner}")
interface.getNode(args.dest, False).setOwner(args.set_owner)
if args.set_owner_short:
closeNow = True
waitForAckNak = True
print(f"Setting device owner short to {args.set_owner_short}")
interface.getNode(args.dest, False).setOwner(
long_name=None, short_name=args.set_owner_short
)
# TODO: add to export-config and configure
if args.set_canned_message:
closeNow = True
waitForAckNak = True
print(f"Setting canned plugin message to {args.set_canned_message}")
interface.getNode(args.dest, False).set_canned_message(
args.set_canned_message
)
# TODO: add to export-config and configure
if args.set_ringtone:
closeNow = True
waitForAckNak = True
print(f"Setting ringtone to {args.set_ringtone}")
interface.getNode(args.dest, False).set_ringtone(args.set_ringtone)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
positionConfig = interface.getNode(args.dest).localConfig.position
allFields = 0
try:
for field in args.pos_fields:
v_field = positionConfig.PositionFlags.Value(field)
allFields |= v_field
except ValueError:
print("ERROR: supported position fields are:")
print(positionConfig.PositionFlags.keys())
print(
"If no fields are specified, will read and display current value."
)
else:
print(f"Setting position fields to {allFields}")
setPref(positionConfig, "position_flags", f"{allFields:d}")
print("Writing modified preferences to device")
interface.getNode(args.dest).writeConfig("position")
elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value
closeNow = True
positionConfig = interface.getNode(args.dest).localConfig.position
fieldNames = []
for bit in positionConfig.PositionFlags.values():
if positionConfig.position_flags & bit:
fieldNames.append(positionConfig.PositionFlags.Name(bit))
print(" ".join(fieldNames))
if args.set_ham:
closeNow = True
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel
interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
if args.reboot:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).reboot()
if args.reboot_ota:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).rebootOTA()
if args.shutdown:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).shutdown()
if args.device_metadata:
closeNow = True
interface.getNode(args.dest, False).getMetadata()
if args.begin_edit:
closeNow = True
interface.getNode(args.dest, False).beginSettingsTransaction()
if args.commit_edit:
closeNow = True
interface.getNode(args.dest, False).commitSettingsTransaction()
if args.factory_reset:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).factoryReset()
if args.remove_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).removeNode(args.remove_node)
if args.reset_nodedb:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False).resetNodeDb()
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f"ch:{ch}")
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
print(
f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}"
)
interface.sendText(
args.sendtext,
args.dest,
wantAck=True,
channelIndex=channelIndex,
onResponse=interface.getNode(args.dest, False).onAckNak,
)
else:
meshtastic.util.our_exit(
f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED."
)
if args.traceroute:
loraConfig = getattr(interface.localNode.localConfig, "lora")
hopLimit = getattr(loraConfig, "hop_limit")
dest = str(args.traceroute)
print(f"Sending traceroute request to {dest} (this could take a while)")
interface.sendTraceRoute(dest, hopLimit)
if args.request_telemetry:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
print(f"Sending telemetry request to {args.dest} (this could take a while)")
interface.sendTelemetry(destinationId=args.dest, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in args.gpio_wrb or []:
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(
f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}"
)
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f"end of gpio_rd")
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(
f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit"
)
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings
if args.set:
closeNow = True
waitForAckNak = True
node = interface.getNode(args.dest, False)
# Handle the int/float/bool arguments
pref = None
for pref in args.set:
found = False
field = splitCompoundName(pref[0].lower())[0]
for config in [node.localConfig, node.moduleConfig]:
config_type = config.DESCRIPTOR.fields_by_name.get(field)
if config_type:
if len(config.ListFields()) == 0:
node.requestConfig(
config.DESCRIPTOR.fields_by_name.get(field)
)
found = setPref(config, pref[0], pref[1])
if found:
break
if found:
print("Writing modified preferences to device")
node.writeConfig(field)
else:
if globals.camel_case:
print(
f"{node.localConfig.__class__.__name__} and {node.moduleConfig.__class__.__name__} do not have an attribute {pref[0]}."
)
else:
print(
f"{node.localConfig.__class__.__name__} and {node.moduleConfig.__class__.__name__} do not have attribute {pref[0]}."
)
print("Choices are...")
printConfig(node.localConfig)
printConfig(node.moduleConfig)
if args.configure:
with open(args.configure[0], encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode(args.dest, False).beginSettingsTransaction()
if "owner" in configuration:
print(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode(args.dest, False).setOwner(configuration["owner"])
if "owner_short" in configuration:
print(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode(args.dest, False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
if "ownerShort" in configuration:
print(
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode(args.dest, False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
print("Setting channel url to", configuration["channel_url"])
interface.getNode(args.dest).setURL(configuration["channel_url"])
if "channelUrl" in configuration:
print("Setting channel url to", configuration["channelUrl"])
interface.getNode(args.dest).setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"])
localConfig.position.fixed_position = True
print(f"Fixing altitude at {alt} meters")
if "lat" in configuration["location"]:
lat = float(configuration["location"]["lat"])
localConfig.position.fixed_position = True
print(f"Fixing latitude at {lat} degrees")
if "lon" in configuration["location"]:
lon = float(configuration["location"]["lon"])
localConfig.position.fixed_position = True
print(f"Fixing longitude at {lon} degrees")
print("Setting device position")
interface.sendPosition(lat, lon, alt)
interface.localNode.writeConfig("position")
if "config" in configuration:
localConfig = interface.getNode(args.dest).localConfig
for section in configuration["config"]:
for pref in configuration["config"][section]:
setPref(
localConfig,
f"{meshtastic.util.camel_to_snake(section)}.{pref}",
str(configuration["config"][section][pref]),
)
interface.getNode(args.dest).writeConfig(
meshtastic.util.camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = interface.getNode(args.dest).moduleConfig
for section in configuration["module_config"]:
for pref in configuration["module_config"][section]:
setPref(
moduleConfig,
f"{meshtastic.util.camel_to_snake(section)}.{pref}",
str(configuration["module_config"][section][pref]),
)
interface.getNode(args.dest).writeConfig(
meshtastic.util.camel_to_snake(section)
)
interface.getNode(args.dest, False).commitSettingsTransaction()
print("Writing modified configuration to device")
if args.export_config:
# export the configuration (the opposite of '--configure')
closeNow = True
export_config(interface)
if args.seturl:
closeNow = True
interface.getNode(args.dest).setURL(args.seturl)
# handle changing channels
if args.ch_add:
channelIndex = globals.channel_index
if channelIndex is not None:
# Since we set the channel index after adding a channel, don't allow --ch-index
meshtastic.util.our_exit(
"Warning: '--ch-add' and '--ch-index' are incompatible. Channel not added."
)
closeNow = True
if len(args.ch_add) > 10:
meshtastic.util.our_exit(
"Warning: Channel name must be shorter. Channel not added."
)
n = interface.getNode(args.dest)
ch = n.getChannelByName(args.ch_add)
if ch:
meshtastic.util.our_exit(
f"Warning: This node already has a '{args.ch_add}' channel. No changes were made."
)
else:
# get the first channel that is disabled (i.e., available)
ch = n.getDisabledChannel()
if not ch:
meshtastic.util.our_exit("Warning: No free channels were found")
chs = channel_pb2.ChannelSettings()
chs.psk = meshtastic.util.genPSK256()
chs.name = args.ch_add
ch.settings.CopyFrom(chs)
ch.role = channel_pb2.Channel.Role.SECONDARY
print(f"Writing modified channels to device")
n.writeChannel(ch.index)
if channelIndex is None:
print(f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications")
globals.channel_index = ch.index
if args.ch_del:
closeNow = True
channelIndex = globals.channel_index
if channelIndex is None:
meshtastic.util.our_exit(
"Warning: Need to specify '--ch-index' for '--ch-del'.", 1
)
else:
if channelIndex == 0:
meshtastic.util.our_exit(
"Warning: Cannot delete primary channel.", 1
)
else:
print(f"Deleting channel {channelIndex}")
ch = interface.getNode(args.dest).deleteChannel(channelIndex)
def setSimpleConfig(modem_preset):
"""Set one of the simple modem_config"""
channelIndex = globals.channel_index
if channelIndex is not None and channelIndex > 0:
meshtastic.util.our_exit(
"Warning: Cannot set modem preset for non-primary channel", 1
)
# Overwrite modem_preset
prefs = interface.getNode(args.dest).localConfig
prefs.lora.modem_preset = modem_preset
interface.getNode(args.dest).writeConfig("lora")
# handle the simple radio set commands
if args.ch_vlongslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.VERY_LONG_SLOW)
if args.ch_longslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.LONG_SLOW)
if args.ch_longfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.LONG_FAST)
if args.ch_medslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.MEDIUM_SLOW)
if args.ch_medfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.MEDIUM_FAST)
if args.ch_shortslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.SHORT_SLOW)
if args.ch_shortfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.SHORT_FAST)
if args.ch_set or args.ch_enable or args.ch_disable:
closeNow = True
channelIndex = globals.channel_index
if channelIndex is None:
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
ch = interface.getNode(args.dest).channels[channelIndex]
if args.ch_enable or args.ch_disable:
if channelIndex == 0:
meshtastic.util.our_exit(
"Warning: Cannot enable/disable PRIMARY channel."
)
enable = True # default to enable
if args.ch_enable:
enable = True
if args.ch_disable:
enable = False
# Handle the channel settings
for pref in args.ch_set or []:
if pref[0] == "psk":
found = True
ch.settings.psk = meshtastic.util.fromPSK(pref[1])
else:
found = setPref(ch.settings, pref[0], pref[1])
if not found:
category_settings = ['module_settings']
print(
f"{ch.settings.__class__.__name__} does not have an attribute {pref[0]}."
)
print("Choices are...")
for field in ch.settings.DESCRIPTOR.fields:
if field.name not in category_settings:
print(f"{field.name}")
else:
print(f"{field.name}:")
config = ch.settings.DESCRIPTOR.fields_by_name.get(field.name)
names = []
for sub_field in config.message_type.fields:
tmp_name = f"{field.name}.{sub_field.name}"
names.append(tmp_name)
for temp_name in sorted(names):
print(f" {temp_name}")
enable = True # If we set any pref, assume the user wants to enable the channel
if enable:
ch.role = (
channel_pb2.Channel.Role.PRIMARY
if (channelIndex == 0)
else channel_pb2.Channel.Role.SECONDARY
)
else:
ch.role = channel_pb2.Channel.Role.DISABLED
print(f"Writing modified channels to device")
interface.getNode(args.dest).writeChannel(channelIndex)
if args.get_canned_message:
closeNow = True
print("")
interface.getNode(args.dest).get_canned_message()
if args.get_ringtone:
closeNow = True
print("")
interface.getNode(args.dest).get_ringtone()
if args.info:
print("")
# If we aren't trying to talk to our local node, don't show it
if args.dest == BROADCAST_ADDR:
interface.showInfo()
print("")
interface.getNode(args.dest).showInfo()
closeNow = True
print("")
pypi_version = meshtastic.util.check_if_newer_version()
if pypi_version:
print(
f"*** A newer version v{pypi_version} is available!"
' Consider running "pip install --upgrade meshtastic" ***\n'
)
else:
print("Showing info of remote node is not supported.")
print(
"Use the '--get' command for a specific configuration (e.g. 'lora') instead."
)
if args.get:
closeNow = True
node = interface.getNode(args.dest, False)
for pref in args.get:
found = getPref(node, pref[0])
if found:
print("Completed getting preferences")
if args.nodes:
closeNow = True
if args.dest != BROADCAST_ADDR:
print("Showing node list of a remote node is not supported.")
return
interface.showNodes()
if args.qr:
closeNow = True
url = interface.localNode.getURL(includeAll=False)
print(f"Primary channel URL {url}")
qr = pyqrcode.create(url)
print(qr.terminal())
if args.listen:
closeNow = False
have_tunnel = platform.system() == "Linux"
if have_tunnel and args.tunnel:
# pylint: disable=C0415
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
if interface.noProto:
logging.warning(f"Not starting Tunnel - disabled by noProto")
else:
if args.tunnel_net:
tunnel.Tunnel(interface, subnet=args.tunnel_net)
else:
tunnel.Tunnel(interface)
if args.ack or (args.dest != BROADCAST_ADDR and waitForAckNak):
print(
f"Waiting for an acknowledgment from remote node (this could take a while)"
)
interface.getNode(args.dest, False).iface.waitForAckNak()
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
except Exception as ex:
print(f"Aborting due to: {ex}")
interface.close() # close the connection now, so that our app exits
sys.exit(1)
def printConfig(config):
"""print configuration"""
objDesc = config.DESCRIPTOR
for config_section in objDesc.fields:
if config_section.name != "version":
config = objDesc.fields_by_name.get(config_section.name)
print(f"{config_section.name}:")
names = []
for field in config.message_type.fields:
tmp_name = f"{config_section.name}.{field.name}"
if globals.camel_case:
tmp_name = meshtastic.util.snake_to_camel(tmp_name)
names.append(tmp_name)
for temp_name in sorted(names):
print(f" {temp_name}")
def onNode(node):
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
# We now call onConnected from main
# pub.subscribe(onConnected, "meshtastic.connection.established")
# pub.subscribe(onNode, "meshtastic.node")
def export_config(interface):
"""used in --export-config"""
configObj = {}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
alt = None
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")
if owner:
configObj["owner"] = owner
if owner_short:
configObj["owner_short"] = owner_short
if channel_url:
if globals.camel_case:
configObj["channelUrl"] = channel_url
else:
configObj["channel_url"] = channel_url
if lat or lon or alt:
configObj["location"] = {"lat": lat, "lon": lon, "alt": alt}
config = MessageToDict(interface.localNode.localConfig)
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in config:
if globals.camel_case:
prefs[meshtastic.util.snake_to_camel(pref)] = config[pref]
else:
prefs[pref] = config[pref]
if globals.camel_case:
configObj["config"] = config
else:
configObj["config"] = config
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in module_config:
if len(module_config[pref]) > 0:
prefs[pref] = module_config[pref]
if globals.camel_case:
configObj["module_config"] = prefs
else:
configObj["module_config"] = prefs
config = "# start of Meshtastic configure yaml\n"
config += yaml.dump(configObj)
print(config)
return config
def common():
"""Shared code for all of our command line wrappers"""
logfile = None
args = globals.args
parser = globals.parser
logging.basicConfig(
level=logging.DEBUG if (args.debug or args.listen) else logging.INFO,
format="%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
else:
if args.support:
meshtastic.util.support_info()
meshtastic.util.our_exit("", 0)
if args.ch_index is not None:
channelIndex = int(args.ch_index)
globals.channel_index = channelIndex
if not args.dest:
args.dest = BROADCAST_ADDR
if not args.seriallog:
if args.noproto:
args.seriallog = "stdout"
else:
args.seriallog = "none" # assume no debug output in this case
if args.deprecated is not None:
logging.error(
"This option has been deprecated, see help below for the correct replacement..."
)
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
elif args.test:
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit("Warning: Test was not successful.")
else:
meshtastic.util.our_exit("Test was a success.", 0)
else:
if args.seriallog == "stdout":
logfile = sys.stdout
elif args.seriallog == "none":
args.seriallog = None
logging.debug("Not logging serial output")
logfile = None
else:
logging.info(f"Logging serial output to {args.seriallog}")
# Note: using "line buffering"
# pylint: disable=R1732
logfile = open(args.seriallog, "w+", buffering=1, encoding="utf8")
globals.logfile = logfile
subscribe()
if args.ble_scan:
logging.debug("BLE scan starting")
client = BLEInterface(None, debugOut=logfile, noProto=args.noproto)
try:
for x in client.scan():
print(f"Found: name='{x[1].local_name}' address='{x[0].address}'")
finally:
client.close()
meshtastic.util.our_exit("BLE scan finished", 0)
return
elif args.ble:
client = BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
elif args.host:
try:
client = meshtastic.tcp_interface.TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto
)
except Exception as ex:
meshtastic.util.our_exit(
f"Error connecting to {args.host}:{ex}", 1
)
else:
try:
client = meshtastic.serial_interface.SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto
)
except PermissionError as ex:
username = os.getlogin()
message = "Permission Error:\n"
message += (
" Need to add yourself to the 'dialout' group by running:\n"
)
message += f" sudo usermod -a -G dialout {username}\n"
message += " After running that command, log out and re-login for it to take effect.\n"
message += f"Error was:{ex}"
meshtastic.util.our_exit(message)
if client.devPath is None:
try:
client = meshtastic.tcp_interface.TCPInterface(
"localhost", debugOut=logfile, noProto=args.noproto
)
except Exception as ex:
meshtastic.util.our_exit(
f"Error connecting to localhost:{ex}", 1
)
# We assume client is fully connected now
onConnected(client)
have_tunnel = platform.system() == "Linux"
if (
args.noproto or args.reply or (have_tunnel and args.tunnel) or args.listen
): # loop until someone presses ctrlc
while True:
time.sleep(1000)
# don't call exit, background threads might be running still
# sys.exit(0)
def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add connection specifiation arguments"""
outer = parser.add_argument_group('Connection', 'Optional arguments that specify how to connect to a Meshtastic device.')
group = outer.add_mutually_exclusive_group()
group.add_argument(
"--port",
help="The port of the device to connect to using serial, e.g. /dev/ttyUSB0.",
default=None,
)
group.add_argument(
"--host",
help="The hostname or IP address of the device to connect to using TCP",
default=None,
)
group.add_argument(
"--ble",
help="The BLE device address or name to connect to",
default=None,
)
return parser
def initParser():
"""Initialize the command line argument parsing."""
parser = globals.parser
args = globals.args
# The "Help" group includes the help option and other informational stuff about the CLI itself
outerHelpGroup = parser.add_argument_group('Help')
helpGroup = outerHelpGroup.add_mutually_exclusive_group()
helpGroup.add_argument("-h", "--help", action="help", help="show this help message and exit")
the_version = get_active_version()
helpGroup.add_argument("--version", action="version", version=f"{the_version}")
helpGroup.add_argument(
"--support",
action="store_true",
help="Show support info (useful when troubleshooting an issue)",
)
# Connection arguments to indicate a device to connect to
parser = addConnectionArgs(parser)
# Arguments concerning viewing and setting configuration
# Arguments for sending or requesting things from the local device
# Arguments for sending or requesting things from the mesh
# All the rest of the arguments
group = parser.add_argument_group("optional arguments")
group.add_argument(
"--configure",
help="Specify a path to a yaml(.yml) file containing the desired settings for the connected device.",
action="append",
)
group.add_argument(
"--export-config",
help="Export the configuration in yaml(.yml) format.",
action="store_true",
)
group.add_argument(
"--seriallog",
help="Log device serial output to either 'stdout', 'none' or a filename to append to.",
)
group.add_argument(
"--info",
help="Read and display the radio config information",
action="store_true",
)
group.add_argument(
"--get-canned-message",
help="Show the canned message plugin message",
action="store_true",
)
group.add_argument(
"--get-ringtone", help="Show the stored ringtone", action="store_true"
)
group.add_argument(
"--nodes",
help="Print Node List in a pretty formatted table",
action="store_true",
)
group.add_argument(
"--qr",
help="Display the QR code that corresponds to the current channel",
action="store_true",
)
group.add_argument(
"--get",
help=(
"Get a preferences field. Use an invalid field such as '0' to get a list of all fields."
" Can use either snake_case or camelCase format. (ex: 'ls_secs' or 'lsSecs')"
),
nargs=1,
action="append",
)
group.add_argument(
"--set",
help="Set a preferences field. Can use either snake_case or camelCase format. (ex: 'ls_secs' or 'lsSecs')",
nargs=2,
action="append",
)
group.add_argument("--seturl", help="Set a channel URL", action="store")
group.add_argument(
"--ch-index",
help="Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).",
action="store",
)
group.add_argument(
"--ch-add",
help="Add a secondary channel, you must specify a channel name",
default=None,
)
group.add_argument(
"--ch-del", help="Delete the ch-index channel", action="store_true"
)
group.add_argument(
"--ch-enable",
help="Enable the specified channel",
action="store_true",
dest="ch_enable",
default=False,
)
# Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.)
group.add_argument(
"--ch-disable",
help="Disable the specified channel",
action="store_true",
dest="ch_disable",
default=False,
)
group.add_argument(
"--ch-set",
help=(
"Set a channel parameter. To see channel settings available:'--ch-set all all --ch-index 0'. "
"Can set the 'psk' using this command. To disable encryption on primary channel:'--ch-set psk none --ch-index 0'. "
"To set encryption with a new random key on second channel:'--ch-set psk random --ch-index 1'. "
"To set encryption back to the default:'--ch-set psk default --ch-index 0'. To set encryption with your "
"own key: '--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --ch-index 0'."
),
nargs=2,
action="append",
)
group.add_argument(
"--ch-vlongslow",
help="Change to the very long-range and slow channel",
action="store_true",
)
group.add_argument(
"--ch-longslow",
help="Change to the long-range and slow channel",
action="store_true",
)
group.add_argument(
"--ch-longfast",
help="Change to the long-range and fast channel",
action="store_true",
)
group.add_argument(
"--ch-medslow",
help="Change to the med-range and slow channel",
action="store_true",
)
group.add_argument(
"--ch-medfast",
help="Change to the med-range and fast channel",
action="store_true",
)
group.add_argument(
"--ch-shortslow",
help="Change to the short-range and slow channel",
action="store_true",
)
group.add_argument(
"--ch-shortfast",
help="Change to the short-range and fast channel",
action="store_true",
)
group.add_argument("--set-owner", help="Set device owner name", action="store")
group.add_argument(
"--set-canned-message",
help="Set the canned messages plugin message (up to 200 characters).",
action="store",
)
group.add_argument(
"--set-ringtone",
help="Set the Notification Ringtone (up to 230 characters).",
action="store",
)
group.add_argument(
"--set-owner-short", help="Set device owner short name", action="store"
)
group.add_argument(
"--set-ham", help="Set licensed Ham ID and turn off encryption", action="store"
)
group.add_argument(
"--dest",
help="The destination node id for any sent commands, if not set '^all' or '^local' is assumed as appropriate",
default=None,
)
group.add_argument(
"--sendtext",
help="Send a text message. Can specify a destination '--dest' and/or channel index '--ch-index'.",
)
group.add_argument(
"--traceroute",
help="Traceroute from connected node to a destination. "
"You need pass the destination ID as argument, like "
"this: '--traceroute !ba4bf9d0' "
"Only nodes that have the encryption key can be traced.",
)
group.add_argument(
"--request-telemetry",
help="Request telemetry from a node. "
"You need pass the destination ID as argument with '--dest'. "
"For repeaters, the nodeNum is required.",
action="store_true",
)
group.add_argument(
"--ack",
help="Use in combination with --sendtext to wait for an acknowledgment.",
action="store_true",
)
group.add_argument(
"--reboot", help="Tell the destination node to reboot", action="store_true"
)
group.add_argument(
"--reboot-ota",
help="Tell the destination node to reboot into factory firmware",
action="store_true",
)
group.add_argument(
"--shutdown", help="Tell the destination node to shutdown", action="store_true"
)
group.add_argument(
"--device-metadata",
help="Get the device metadata from the node",
action="store_true",
)
group.add_argument(
"--begin-edit",
help="Tell the node to open a transaction to edit settings",
action="store_true",
)
group.add_argument(
"--commit-edit",
help="Tell the node to commit open settings transaction",
action="store_true",
)
group.add_argument(
"--factory-reset",
help="Tell the destination node to install the default config",
action="store_true",
)
group.add_argument(
"--remove-node",
help="Tell the destination node to remove a specific node from its DB, by node number or ID"
)
group.add_argument(
"--reset-nodedb",
help="Tell the destination node to clear its list of nodes",
action="store_true",
)
group.add_argument(
"--reply", help="Reply to received messages", action="store_true"
)
group.add_argument(
"--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action="append"
)
group.add_argument("--gpio-rd", help="Read from a GPIO mask (ex: '0x10')")
group.add_argument(
"--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')"
)
group.add_argument(
"--no-time",
help="Suppress sending the current time to the mesh",
action="store_true",
)
group.add_argument("--setalt", help="Set device altitude in meters (allows use without GPS)")
group.add_argument("--setlat", help="Set device latitude (allows use without GPS)")
group.add_argument(
"--setlon", help="Set device longitude (allows use without GPS)"
)
group.add_argument(
"--pos-fields",
help="Specify fields to send when sending a position. Use no argument for a list of valid values. "
"Can pass multiple values as a space separated list like "
"this: '--pos-fields POS_ALTITUDE POS_ALT_MSL'",
nargs="*",
action="store",
)
group.add_argument(
"--debug", help="Show API library debug log messages", action="store_true"
)
group.add_argument(
"--test",
help="Run stress test against all connected Meshtastic devices",
action="store_true",
)
group.add_argument(
"--ble-scan",
help="Scan for Meshtastic BLE devices",
action="store_true",
)
group.add_argument(
"--noproto",
help="Don't start the API, just function as a dumb serial terminal.",
action="store_true",
)
group.add_argument(
"--listen",
help="Just stay open and listen to the protobuf stream. Enables debug logging.",
action="store_true",
)
have_tunnel = platform.system() == "Linux"
if have_tunnel:
tunnelArgs = parser.add_argument_group('Tunnel', 'Arguments related to establishing a tunnel device over the mesh.')
tunnelArgs.add_argument(
"--tunnel",
action="store_true",
help="Create a TUN tunnel device for forwarding IP packets over the mesh",
)
tunnelArgs.add_argument(
"--subnet",
dest="tunnel_net",
help="Sets the local-end subnet address for the TUN IP bridge. (ex: 10.115' which is the default)",
default=None,
)
parser.set_defaults(deprecated=None)
args = parser.parse_args()
globals.args = args
globals.parser = parser
def main():
"""Perform command line meshtastic operations"""
parser = argparse.ArgumentParser(
add_help=False,
epilog="If no connection arguments are specified, we search for a compatible serial device, "
"and if none is found, then attempt a TCP connection to localhost.")
globals.parser = parser
initParser()
common()
logfile = globals.logfile
if logfile:
logfile.close()
def tunnelMain():
"""Run a meshtastic IP tunnel"""
parser = argparse.ArgumentParser(add_help=False)
globals.parser = parser
initParser()
args = globals.args
args.tunnel = True
globals.args = args
common()
if __name__ == "__main__":
main()