Files
python/meshtastic/__main__.py

2227 lines
83 KiB
Python

""" Main Meshtastic
"""
# We just hit the 1600 line limit for main.py, but I currently have a huge set of powermon/structured logging changes
# later we can have a separate changelist to refactor main.py into smaller files
# pylint: disable=R0917,C0302
from typing import List, Optional, Union
from types import ModuleType
import argparse
argcomplete: Union[None, ModuleType] = None
try:
import argcomplete # type: ignore
except ImportError as e:
pass # already set to None by default above
import logging
import os
import platform
import sys
import time
try:
import pyqrcode # type: ignore[import-untyped]
except ImportError as e:
pyqrcode = None
import yaml
from google.protobuf.json_format import MessageToDict
from pubsub import pub # type: ignore[import-untyped]
try:
import meshtastic.test
have_test = True
except ImportError as e:
have_test = False
import meshtastic.util
import meshtastic.serial_interface
import meshtastic.tcp_interface
from meshtastic import BROADCAST_ADDR, mt_config, remote_hardware
from meshtastic.ble_interface import BLEInterface
from meshtastic.mesh_interface import MeshInterface
try:
from meshtastic.powermon import (
PowerMeter,
PowerStress,
PPK2PowerSupply,
RidenPowerSupply,
SimPowerSupply,
)
from meshtastic.slog import LogSet
have_powermon = True
powermon_exception = None
meter: Optional[PowerMeter] = None
except ImportError as e:
have_powermon = False
powermon_exception = e
meter = None
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2, mesh_pb2
from meshtastic.version import get_active_version
logger = logging.getLogger(__name__)
def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
args = mt_config.args
try:
d = packet.get("decoded")
logger.debug(f"in onReceive() d:{d}")
# Exit once we receive a reply
if (
args
and args.sendtext
and packet["to"] == interface.myInfo.my_node_num
and d.get("portnum", portnums_pb2.PortNum.UNKNOWN_APP) == portnums_pb2.PortNum.TEXT_MESSAGE_APP
):
interface.close() # after running command then exit
# Reply to every received message with some stats
if d is not None and args and args.reply:
msg = d.get("text")
if msg:
rxSnr = packet["rxSnr"]
hopLimit = packet["hopLimit"]
print(f"message: {msg}")
reply = f"got msg '{msg}' with rxSnr: {rxSnr} and hopLimit: {hopLimit}"
print("Sending reply: ", reply)
interface.sendText(reply)
except Exception as ex:
print(f"Warning: Error processing received packet: {ex}.")
def onConnection(interface, topic=pub.AUTO_TOPIC) -> None: # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def checkChannel(interface: MeshInterface, channelIndex: int) -> bool:
"""Given an interface and channel index, return True if that channel is non-disabled on the local node"""
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logger.debug(f"ch:{ch}")
return ch and ch.role != channel_pb2.Channel.Role.DISABLED
def getPref(node, comp_name) -> bool:
"""Get a channel or preferences value"""
def _printSetting(config_type, uni_name, pref_value, repeated):
"""Pretty print the setting"""
if repeated:
pref_value = [meshtastic.util.toStr(v) for v in pref_value]
else:
pref_value = meshtastic.util.toStr(pref_value)
print(f"{str(config_type.name)}.{uni_name}: {str(pref_value)}")
logger.debug(f"{str(config_type.name)}.{uni_name}: {str(pref_value)}")
name = splitCompoundName(comp_name)
wholeField = name[0] == name[1] # We want the whole field
camel_name = meshtastic.util.snake_to_camel(name[1])
# Note: protobufs has the keys in snake_case, so snake internally
snake_name = meshtastic.util.camel_to_snake(name[1])
uni_name = camel_name if mt_config.camel_case else snake_name
logger.debug(f"snake_name:{snake_name} camel_name:{camel_name}")
logger.debug(f"use camel:{mt_config.camel_case}")
# First validate the input
localConfig = node.localConfig
moduleConfig = node.moduleConfig
found: bool = False
for config in [localConfig, moduleConfig]:
objDesc = config.DESCRIPTOR
config_type = objDesc.fields_by_name.get(name[0])
pref = "" #FIXME - is this correct to leave as an empty string if not found?
if config_type:
pref = config_type.message_type.fields_by_name.get(snake_name)
if pref or wholeField:
found = True
break
if not found:
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have attribute {uni_name}."
)
print("Choices are...")
printConfig(localConfig)
printConfig(moduleConfig)
return False
# Check if we need to request the config
if len(config.ListFields()) != 0 and not isinstance(pref, str): # if str, it's still the empty string, I think
# read the value
config_values = getattr(config, config_type.name)
if not wholeField:
pref_value = getattr(config_values, pref.name)
repeated = pref.label == pref.LABEL_REPEATED
_printSetting(config_type, uni_name, pref_value, repeated)
else:
for field in config_values.ListFields():
repeated = field[0].label == field[0].LABEL_REPEATED
_printSetting(config_type, field[0].name, field[1], repeated)
else:
# Always show whole field for remote node
node.requestConfig(config_type)
return True
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = meshtastic.util.camel_to_snake(config_root)
for pref in config:
pref_name = f"{snake_name}.{pref}"
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(interface_config, pref_name, config[pref])
return True
def setPref(config, comp_name, raw_val) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = meshtastic.util.camel_to_snake(name[-1])
camel_name = meshtastic.util.snake_to_camel(name[-1])
uni_name = camel_name if mt_config.camel_case else snake_name
logger.debug(f"snake_name:{snake_name}")
logger.debug(f"camel_name:{camel_name}")
objDesc = config.DESCRIPTOR
config_part = config
config_type = objDesc.fields_by_name.get(name[0])
if config_type and config_type.message_type is not None:
for name_part in name[1:-1]:
part_snake_name = meshtastic.util.camel_to_snake((name_part))
config_part = getattr(config, config_type.name)
config_type = config_type.message_type.fields_by_name.get(part_snake_name)
pref = None
if config_type and config_type.message_type is not None:
pref = config_type.message_type.fields_by_name.get(snake_name)
# Others like ChannelSettings are standalone
elif config_type:
pref = config_type
if (not pref) or (not config_type):
return False
if isinstance(raw_val, str):
val = meshtastic.util.fromStr(raw_val)
else:
val = raw_val
logger.debug(f"valStr:{raw_val} val:{val}")
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
print("Warning: network.wifi_psk must be 8 or more characters.")
return False
enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
else:
print(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
)
print(f"Choices in sorted order are:")
names = []
for f in enumType.values:
# Note: We must use the value of the enum (regardless if camel or snake case)
names.append(f"{f.name}")
for temp_name in sorted(names):
print(f" {temp_name}")
return False
# repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, val)
else:
setattr(config_part, snake_name, val)
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, str(val))
elif type(val) == list:
new_vals = [meshtastic.util.fromStr(x) for x in val]
config_values = getattr(config, config_type.name)
getattr(config_values, pref.name)[:] = new_vals
else:
config_values = getattr(config, config_type.name)
if val == 0:
# clear values
print(f"Clearing {pref.name} list")
del getattr(config_values, pref.name)[:]
else:
print(f"Adding '{raw_val}' to the {pref.name} list")
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
if val not in cur_vals:
cur_vals.append(val)
getattr(config_values, pref.name)[:] = cur_vals
return True
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
print(f"Set {prefix}{uni_name} to {raw_val}")
return True
def onConnected(interface):
"""Callback invoked when we connect to a radio"""
closeNow = False # Should we drop the connection after we finish?
waitForAckNak = (
False # Should we wait for an acknowledgment if we send to a remote node?
)
try:
args = mt_config.args
# convenient place to store any keyword args we pass to getNode
getNode_kwargs = {
"requestChannelAttempts": args.channel_fetch_attempts,
"timeout": args.timeout
}
# do not print this line if we are exporting the config
if not args.export_config:
print("Connected to radio")
if args.set_time is not None:
interface.getNode(args.dest, False, **getNode_kwargs).setTime(args.set_time)
if args.remove_position:
closeNow = True
waitForAckNak = True
print("Removing fixed position and disabling fixed position setting")
interface.getNode(args.dest, False, **getNode_kwargs).removeFixedPosition()
elif args.setlat or args.setlon or args.setalt:
closeNow = True
waitForAckNak = True
alt = 0
lat = 0
lon = 0
if args.setalt:
alt = int(args.setalt)
print(f"Fixing altitude at {alt} meters")
if args.setlat:
try:
lat = int(args.setlat)
except ValueError:
lat = float(args.setlat)
print(f"Fixing latitude at {lat} degrees")
if args.setlon:
try:
lon = int(args.setlon)
except ValueError:
lon = float(args.setlon)
print(f"Fixing longitude at {lon} degrees")
print("Setting device position and enabling fixed position setting")
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.getNode(args.dest, False, **getNode_kwargs).setFixedPosition(lat, lon, alt)
if args.set_owner or args.set_owner_short or args.set_is_unmessageable:
closeNow = True
waitForAckNak = True
long_name = args.set_owner.strip() if args.set_owner else None
short_name = args.set_owner_short.strip() if args.set_owner_short else None
if long_name is not None and not long_name:
meshtastic.util.our_exit("ERROR: Long Name cannot be empty or contain only whitespace characters")
if short_name is not None and not short_name:
meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters")
if long_name and short_name:
print(f"Setting device owner to {long_name} and short name to {short_name}")
elif long_name:
print(f"Setting device owner to {long_name}")
elif short_name:
print(f"Setting device owner short to {short_name}")
unmessagable = None
if args.set_is_unmessageable is not None:
unmessagable = (
meshtastic.util.fromStr(args.set_is_unmessageable)
if isinstance(args.set_is_unmessageable, str)
else args.set_is_unmessageable
)
print(f"Setting device owner is_unmessageable to {unmessagable}")
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
long_name=long_name,
short_name=short_name,
is_unmessagable=unmessagable
)
if args.set_canned_message:
closeNow = True
waitForAckNak = True
node = interface.getNode(args.dest, False, **getNode_kwargs)
if node.module_available(mesh_pb2.CANNEDMSG_CONFIG):
print(f"Setting canned plugin message to {args.set_canned_message}")
node.set_canned_message(args.set_canned_message)
else:
print("Canned Message module is excluded by firmware; skipping set.")
if args.set_ringtone:
closeNow = True
waitForAckNak = True
node = interface.getNode(args.dest, False, **getNode_kwargs)
if node.module_available(mesh_pb2.EXTNOTIF_CONFIG):
print(f"Setting ringtone to {args.set_ringtone}")
node.set_ringtone(args.set_ringtone)
else:
print("External Notification is excluded by firmware; skipping ringtone set.")
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
positionConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig.position
allFields = 0
try:
for field in args.pos_fields:
v_field = positionConfig.PositionFlags.Value(field)
allFields |= v_field
except ValueError:
print("ERROR: supported position fields are:")
print(positionConfig.PositionFlags.keys())
print(
"If no fields are specified, will read and display current value."
)
else:
print(f"Setting position fields to {allFields}")
setPref(positionConfig, "position_flags", f"{allFields:d}")
print("Writing modified preferences to device")
interface.getNode(args.dest, **getNode_kwargs).writeConfig("position")
elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value
closeNow = True
positionConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig.position
fieldNames = []
for bit in positionConfig.PositionFlags.values():
if positionConfig.position_flags & bit:
fieldNames.append(positionConfig.PositionFlags.Name(bit))
print(" ".join(fieldNames))
if args.set_ham:
if not args.set_ham.strip():
meshtastic.util.our_exit("ERROR: Ham radio callsign cannot be empty or contain only whitespace characters")
closeNow = True
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
interface.getNode(args.dest, **getNode_kwargs).setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel
interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel()
if args.reboot:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).reboot()
if args.reboot_ota:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).rebootOTA()
if args.enter_dfu:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).enterDFUMode()
if args.shutdown:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).shutdown()
if args.device_metadata:
closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).getMetadata()
if args.begin_edit:
closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()
if args.commit_edit:
closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction()
if args.factory_reset or args.factory_reset_device:
closeNow = True
waitForAckNak = True
full = bool(args.factory_reset_device)
interface.getNode(args.dest, False, **getNode_kwargs).factoryReset(full=full)
if args.remove_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).removeNode(args.remove_node)
if args.set_favorite_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setFavorite(args.set_favorite_node)
if args.remove_favorite_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).removeFavorite(args.remove_favorite_node)
if args.set_ignored_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setIgnored(args.set_ignored_node)
if args.remove_ignored_node:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).removeIgnored(args.remove_ignored_node)
if args.reset_nodedb:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb()
if args.sendtext:
closeNow = True
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(
f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}"
f" {'using PRIVATE_APP port' if args.private else ''}"
)
interface.sendText(
args.sendtext,
args.dest,
wantAck=True,
channelIndex=channelIndex,
onResponse=interface.getNode(args.dest, False, **getNode_kwargs).onAckNak,
portNum=portnums_pb2.PortNum.PRIVATE_APP if args.private else portnums_pb2.PortNum.TEXT_MESSAGE_APP
)
else:
meshtastic.util.our_exit(
f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED."
)
if args.traceroute:
loraConfig = getattr(interface.localNode.localConfig, "lora")
hopLimit = getattr(loraConfig, "hop_limit")
dest = str(args.traceroute)
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(
f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendTraceRoute(dest, hopLimit, channelIndex=channelIndex)
if args.request_telemetry:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
telemMap = {
"device": "device_metrics",
"environment": "environment_metrics",
"air_quality": "air_quality_metrics",
"airquality": "air_quality_metrics",
"power": "power_metrics",
"localstats": "local_stats",
"local_stats": "local_stats",
}
telemType = telemMap.get(args.request_telemetry, "device_metrics")
print(
f"Sending {telemType} telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendTelemetry(
destinationId=args.dest,
wantResponse=True,
channelIndex=channelIndex,
telemetryType=telemType,
)
if args.request_position:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(
f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendPosition(
destinationId=args.dest,
wantResponse=True,
channelIndex=channelIndex,
)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in args.gpio_wrb or []:
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(
f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}"
)
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logger.debug(f"end of gpio_rd")
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(
f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit"
)
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings
if args.set:
closeNow = True
waitForAckNak = True
node = interface.getNode(args.dest, False, **getNode_kwargs)
# Handle the int/float/bool arguments
pref = None
fields = set()
for pref in args.set:
found = False
field = splitCompoundName(pref[0].lower())[0]
for config in [node.localConfig, node.moduleConfig]:
config_type = config.DESCRIPTOR.fields_by_name.get(field)
if config_type:
if len(config.ListFields()) == 0:
node.requestConfig(
config.DESCRIPTOR.fields_by_name.get(field)
)
found = setPref(config, pref[0], pref[1])
if found:
fields.add(field)
break
if found:
print("Writing modified preferences to device")
if len(fields) > 1:
print("Using a configuration transaction")
node.beginSettingsTransaction()
for field in fields:
print(f"Writing {field} configuration to device")
node.writeConfig(field)
if len(fields) > 1:
node.commitSettingsTransaction()
else:
if mt_config.camel_case:
print(
f"{node.localConfig.__class__.__name__} and {node.moduleConfig.__class__.__name__} do not have an attribute {pref[0]}."
)
else:
print(
f"{node.localConfig.__class__.__name__} and {node.moduleConfig.__class__.__name__} do not have attribute {pref[0]}."
)
print("Choices are...")
printConfig(node.localConfig)
printConfig(node.moduleConfig)
if args.configure:
with open(args.configure[0], encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()
if "owner" in configuration:
# Validate owner name before setting
owner_name = str(configuration["owner"]).strip()
if not owner_name:
meshtastic.util.our_exit("ERROR: Long Name cannot be empty or contain only whitespace characters")
print(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
# Validate owner short name before setting
owner_short_name = str(configuration["owner_short"]).strip()
if not owner_short_name:
meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters")
print(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
time.sleep(0.5)
if "ownerShort" in configuration:
# Validate owner short name before setting
owner_short_name = str(configuration["ownerShort"]).strip()
if not owner_short_name:
meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters")
print(
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
time.sleep(0.5)
if "channel_url" in configuration:
print("Setting channel url to", configuration["channel_url"])
interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
print("Setting channel url to", configuration["channelUrl"])
interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channelUrl"])
time.sleep(0.5)
if "canned_messages" in configuration:
print("Setting canned message messages to", configuration["canned_messages"])
interface.getNode(args.dest, **getNode_kwargs).set_canned_message(configuration["canned_messages"])
time.sleep(0.5)
if "ringtone" in configuration:
print("Setting ringtone to", configuration["ringtone"])
interface.getNode(args.dest, **getNode_kwargs).set_ringtone(configuration["ringtone"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
print(f"Fixing altitude at {alt} meters")
if "lat" in configuration["location"]:
lat = float(configuration["location"]["lat"] or 0)
print(f"Fixing latitude at {lat} degrees")
if "lon" in configuration["location"]:
lon = float(configuration["location"]["lon"] or 0)
print(f"Fixing longitude at {lon} degrees")
print("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode(args.dest, **getNode_kwargs).writeConfig(
meshtastic.util.camel_to_snake(section)
)
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode(args.dest, **getNode_kwargs).moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode(args.dest, **getNode_kwargs).writeConfig(
meshtastic.util.camel_to_snake(section)
)
time.sleep(0.5)
interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction()
print("Writing modified configuration to device")
if args.export_config:
if args.dest != BROADCAST_ADDR:
print("Exporting configuration of remote nodes is not supported.")
return
closeNow = True
config_txt = export_config(interface)
if args.export_config == "-":
# Output to stdout (preserves legacy use of `> file.yaml`)
print(config_txt)
else:
try:
with open(args.export_config, "w", encoding="utf-8") as f:
f.write(config_txt)
print(f"Exported configuration to {args.export_config}")
except Exception as e:
meshtastic.util.our_exit(f"ERROR: Failed to write config file: {e}")
if args.ch_set_url:
closeNow = True
interface.getNode(args.dest, **getNode_kwargs).setURL(args.ch_set_url, addOnly=False)
# handle changing channels
if args.ch_add_url:
closeNow = True
interface.getNode(args.dest, **getNode_kwargs).setURL(args.ch_add_url, addOnly=True)
if args.ch_add:
channelIndex = mt_config.channel_index
if channelIndex is not None:
# Since we set the channel index after adding a channel, don't allow --ch-index
meshtastic.util.our_exit(
"Warning: '--ch-add' and '--ch-index' are incompatible. Channel not added."
)
closeNow = True
if len(args.ch_add) > 10:
meshtastic.util.our_exit(
"Warning: Channel name must be shorter. Channel not added."
)
n = interface.getNode(args.dest, **getNode_kwargs)
ch = n.getChannelByName(args.ch_add)
if ch:
meshtastic.util.our_exit(
f"Warning: This node already has a '{args.ch_add}' channel. No changes were made."
)
else:
# get the first channel that is disabled (i.e., available)
ch = n.getDisabledChannel()
if not ch:
meshtastic.util.our_exit("Warning: No free channels were found")
chs = channel_pb2.ChannelSettings()
chs.psk = meshtastic.util.genPSK256()
chs.name = args.ch_add
ch.settings.CopyFrom(chs)
ch.role = channel_pb2.Channel.Role.SECONDARY
print(f"Writing modified channels to device")
n.writeChannel(ch.index)
if channelIndex is None:
print(
f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications"
)
mt_config.channel_index = ch.index
if args.ch_del:
closeNow = True
channelIndex = mt_config.channel_index
if channelIndex is None:
meshtastic.util.our_exit(
"Warning: Need to specify '--ch-index' for '--ch-del'.", 1
)
else:
if channelIndex == 0:
meshtastic.util.our_exit(
"Warning: Cannot delete primary channel.", 1
)
else:
print(f"Deleting channel {channelIndex}")
ch = interface.getNode(args.dest, **getNode_kwargs).deleteChannel(channelIndex)
def setSimpleConfig(modem_preset):
"""Set one of the simple modem_config"""
channelIndex = mt_config.channel_index
if channelIndex is not None and channelIndex > 0:
meshtastic.util.our_exit(
"Warning: Cannot set modem preset for non-primary channel", 1
)
# Overwrite modem_preset
node = interface.getNode(args.dest, False, **getNode_kwargs)
if len(node.localConfig.ListFields()) == 0:
node.requestConfig(node.localConfig.DESCRIPTOR.fields_by_name.get("lora"))
node.localConfig.lora.modem_preset = modem_preset
node.writeConfig("lora")
# handle the simple radio set commands
if args.ch_vlongslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.VERY_LONG_SLOW)
if args.ch_longslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.LONG_SLOW)
if args.ch_longfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.LONG_FAST)
if args.ch_medslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.MEDIUM_SLOW)
if args.ch_medfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.MEDIUM_FAST)
if args.ch_shortslow:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.SHORT_SLOW)
if args.ch_shortfast:
setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.SHORT_FAST)
if args.ch_set or args.ch_enable or args.ch_disable:
closeNow = True
channelIndex = mt_config.channel_index
if channelIndex is None:
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
node = interface.getNode(args.dest, **getNode_kwargs)
ch = node.channels[channelIndex]
if args.ch_enable or args.ch_disable:
print(
"Warning: --ch-enable and --ch-disable can produce noncontiguous channels, "
"which can cause errors in some clients. Whenever possible, use --ch-add and --ch-del instead."
)
if channelIndex == 0:
meshtastic.util.our_exit(
"Warning: Cannot enable/disable PRIMARY channel."
)
enable = True # default to enable
if args.ch_enable:
enable = True
if args.ch_disable:
enable = False
# Handle the channel settings
for pref in args.ch_set or []:
if pref[0] == "psk":
found = True
ch.settings.psk = meshtastic.util.fromPSK(pref[1])
else:
found = setPref(ch.settings, pref[0], pref[1])
if not found:
category_settings = ["module_settings"]
print(
f"{ch.settings.__class__.__name__} does not have an attribute {pref[0]}."
)
print("Choices are...")
for field in ch.settings.DESCRIPTOR.fields:
if field.name not in category_settings:
print(f"{field.name}")
else:
print(f"{field.name}:")
config = ch.settings.DESCRIPTOR.fields_by_name.get(
field.name
)
names = []
for sub_field in config.message_type.fields:
tmp_name = f"{field.name}.{sub_field.name}"
names.append(tmp_name)
for temp_name in sorted(names):
print(f" {temp_name}")
enable = True # If we set any pref, assume the user wants to enable the channel
if enable:
ch.role = (
channel_pb2.Channel.Role.PRIMARY
if (channelIndex == 0)
else channel_pb2.Channel.Role.SECONDARY
)
else:
ch.role = channel_pb2.Channel.Role.DISABLED
print(f"Writing modified channels to device")
node.writeChannel(channelIndex)
if args.get_canned_message:
closeNow = True
print("")
messages = interface.getNode(args.dest, **getNode_kwargs).get_canned_message()
print(f"canned_plugin_message:{messages}")
if args.get_ringtone:
closeNow = True
print("")
ringtone = interface.getNode(args.dest, **getNode_kwargs).get_ringtone()
print(f"ringtone:{ringtone}")
if args.info:
print("")
# If we aren't trying to talk to our local node, don't show it
if args.dest == BROADCAST_ADDR:
interface.showInfo()
print("")
interface.getNode(args.dest, **getNode_kwargs).showInfo()
closeNow = True
print("")
pypi_version = meshtastic.util.check_if_newer_version()
if pypi_version:
print(
f"*** A newer version v{pypi_version} is available!"
' Consider running "pip install --upgrade meshtastic" ***\n'
)
else:
print("Showing info of remote node is not supported.")
print(
"Use the '--get' command for a specific configuration (e.g. 'lora') instead."
)
if args.get:
closeNow = True
node = interface.getNode(args.dest, False, **getNode_kwargs)
for pref in args.get:
found = getPref(node, pref[0])
if found:
print("Completed getting preferences")
if args.nodes:
closeNow = True
if args.dest != BROADCAST_ADDR:
print("Showing node list of a remote node is not supported.")
return
interface.showNodes(True, args.show_fields)
if args.show_fields and not args.nodes:
print("--show-fields can only be used with --nodes")
return
if args.qr or args.qr_all:
closeNow = True
url = interface.getNode(args.dest, True, **getNode_kwargs).getURL(includeAll=args.qr_all)
if args.qr_all:
urldesc = "Complete URL (includes all channels)"
else:
urldesc = "Primary channel URL"
print(f"{urldesc}: {url}")
if pyqrcode is not None:
qr = pyqrcode.create(url)
print(qr.terminal())
else:
print("Install pyqrcode to view a QR code printed to terminal.")
log_set: Optional = None # type: ignore[annotation-unchecked]
# we need to keep a reference to the logset so it doesn't get GCed early
if args.slog or args.power_stress:
if have_powermon:
# Setup loggers
global meter # pylint: disable=global-variable-not-assigned
log_set = LogSet(
interface, args.slog if args.slog != "default" else None, meter
)
if args.power_stress:
stress = PowerStress(interface)
stress.run()
closeNow = True # exit immediately after stress test
else:
meshtastic.util.our_exit("The powermon module could not be loaded. "
"You may need to run `poetry install --with powermon`. "
"Import Error was: " + powermon_exception)
if args.listen:
closeNow = False
have_tunnel = platform.system() == "Linux"
if have_tunnel and args.tunnel:
if args.dest != BROADCAST_ADDR:
print("A tunnel can only be created using the local node.")
return
# pylint: disable=C0415
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
if interface.noProto:
logger.warning(f"Not starting Tunnel - disabled by noProto")
else:
if args.tunnel_net:
tunnel.Tunnel(interface, subnet=args.tunnel_net)
else:
tunnel.Tunnel(interface)
if args.ack or (args.dest != BROADCAST_ADDR and waitForAckNak):
print(
f"Waiting for an acknowledgment from remote node (this could take a while)"
)
interface.getNode(args.dest, False, **getNode_kwargs).iface.waitForAckNak()
if args.wait_to_disconnect:
print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting")
time.sleep(int(args.wait_to_disconnect))
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
# Close any structured logs after we've done all of our API operations
if log_set:
log_set.close()
except Exception as ex:
print(f"Aborting due to: {ex}")
interface.close() # close the connection now, so that our app exits
sys.exit(1)
def printConfig(config) -> None:
"""print configuration"""
objDesc = config.DESCRIPTOR
for config_section in objDesc.fields:
if config_section.name != "version":
config = objDesc.fields_by_name.get(config_section.name)
print(f"{config_section.name}:")
names = []
for field in config.message_type.fields:
tmp_name = f"{config_section.name}.{field.name}"
if mt_config.camel_case:
tmp_name = meshtastic.util.snake_to_camel(tmp_name)
names.append(tmp_name)
for temp_name in sorted(names):
print(f" {temp_name}")
def onNode(node) -> None:
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe() -> None:
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
# We now call onConnected from main
# pub.subscribe(onConnected, "meshtastic.connection.established")
# pub.subscribe(onNode, "meshtastic.node")
def set_missing_flags_false(config_dict: dict, true_defaults: set[tuple[str, str]]) -> None:
"""Ensure that missing default=True keys are present in the config_dict and set to False."""
for path in true_defaults:
d = config_dict
for key in path[:-1]:
if key not in d or not isinstance(d[key], dict):
d[key] = {}
d = d[key]
if path[-1] not in d:
d[path[-1]] = False
def export_config(interface) -> str:
"""used in --export-config"""
configObj = {}
# A list of configuration keys that should be set to False if they are missing
config_true_defaults = {
("bluetooth", "enabled"),
("lora", "sx126xRxBoostedGain"),
("lora", "txEnabled"),
("lora", "usePreset"),
("position", "positionBroadcastSmartEnabled"),
("security", "serialEnabled"),
}
module_true_defaults = {
("mqtt", "encryptionEnabled"),
}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
canned_messages = interface.getCannedMessage()
ringtone = interface.getRingtone()
pos = myinfo.get("position")
lat = None
lon = None
alt = None
if pos:
lat = pos.get("latitude")
lon = pos.get("longitude")
alt = pos.get("altitude")
if owner:
configObj["owner"] = owner
if owner_short:
configObj["owner_short"] = owner_short
if channel_url:
if mt_config.camel_case:
configObj["channelUrl"] = channel_url
else:
configObj["channel_url"] = channel_url
if canned_messages:
configObj["canned_messages"] = canned_messages
if ringtone:
configObj["ringtone"] = ringtone
# lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both
if lat or lon:
configObj["location"] = {"lat": lat or float(0), "lon": lon or float(0)}
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
#was used as a string here and a Dictionary above
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in config:
if mt_config.camel_case:
prefs[meshtastic.util.snake_to_camel(pref)] = config[pref]
else:
prefs[pref] = config[pref]
# mark base64 encoded fields as such
if pref == "security":
if 'privateKey' in prefs[pref]:
prefs[pref]['privateKey'] = 'base64:' + prefs[pref]['privateKey']
if 'publicKey' in prefs[pref]:
prefs[pref]['publicKey'] = 'base64:' + prefs[pref]['publicKey']
if 'adminKey' in prefs[pref]:
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
set_missing_flags_false(configObj["config"], config_true_defaults)
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}
for pref in module_config:
if len(module_config[pref]) > 0:
prefs[pref] = module_config[pref]
if mt_config.camel_case:
configObj["module_config"] = prefs
else:
configObj["module_config"] = prefs
set_missing_flags_false(configObj["module_config"], module_true_defaults)
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
return config_txt
def create_power_meter():
"""Setup the power meter."""
global meter # pylint: disable=global-statement
args = mt_config.args
# If the user specified a voltage, make sure it is valid
v = 0.0
if args.power_voltage:
v = float(args.power_voltage)
if v < 0.8 or v > 5.0:
meshtastic.util.our_exit("Voltage must be between 0.8 and 5.0")
if args.power_riden:
meter = RidenPowerSupply(args.power_riden)
elif args.power_ppk2_supply or args.power_ppk2_meter:
meter = PPK2PowerSupply()
assert v > 0, "Voltage must be specified for PPK2"
meter.v = v # PPK2 requires setting voltage before selecting supply mode
meter.setIsSupply(args.power_ppk2_supply)
elif args.power_sim:
meter = SimPowerSupply()
if meter and v:
logger.info(f"Setting power supply to {v} volts")
meter.v = v
meter.powerOn()
if args.power_wait:
input("Powered on, press enter to continue...")
else:
logger.info("Powered-on, waiting for device to boot")
time.sleep(5)
def common():
"""Shared code for all of our command line wrappers."""
logfile = None
args = mt_config.args
parser = mt_config.parser
logging.basicConfig(
level=logging.DEBUG if (args.debug or args.listen) else logging.INFO,
format="%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s",
)
# set all meshtastic loggers to DEBUG
if not (args.debug or args.listen) and args.debuglib:
logging.getLogger('meshtastic').setLevel(logging.DEBUG)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
else:
if args.support:
meshtastic.util.support_info()
meshtastic.util.our_exit("", 0)
# Early validation for owner names before attempting device connection
if hasattr(args, 'set_owner') and args.set_owner is not None:
stripped_long_name = args.set_owner.strip()
if not stripped_long_name:
meshtastic.util.our_exit("ERROR: Long Name cannot be empty or contain only whitespace characters")
if hasattr(args, 'set_owner_short') and args.set_owner_short is not None:
stripped_short_name = args.set_owner_short.strip()
if not stripped_short_name:
meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters")
if hasattr(args, 'set_ham') and args.set_ham is not None:
stripped_ham_name = args.set_ham.strip()
if not stripped_ham_name:
meshtastic.util.our_exit("ERROR: Ham radio callsign cannot be empty or contain only whitespace characters")
if have_powermon:
create_power_meter()
if args.ch_index is not None:
channelIndex = int(args.ch_index)
mt_config.channel_index = channelIndex
if not args.dest:
args.dest = BROADCAST_ADDR
if not args.seriallog:
if args.noproto:
args.seriallog = "stdout"
else:
args.seriallog = "none" # assume no debug output in this case
if args.deprecated is not None:
logger.error(
"This option has been deprecated, see help below for the correct replacement..."
)
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
elif args.test:
if not have_test:
meshtastic.util.our_exit("Test module could not be important. Ensure you have the 'dotmap' module installed.")
else:
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit("Warning: Test was not successful.")
else:
meshtastic.util.our_exit("Test was a success.", 0)
else:
if args.seriallog == "stdout":
logfile = sys.stdout
elif args.seriallog == "none":
args.seriallog = None
logger.debug("Not logging serial output")
logfile = None
else:
logger.info(f"Logging serial output to {args.seriallog}")
# Note: using "line buffering"
# pylint: disable=R1732
logfile = open(args.seriallog, "w+", buffering=1, encoding="utf8")
mt_config.logfile = logfile
subscribe()
if args.ble_scan:
logger.debug("BLE scan starting")
for x in BLEInterface.scan():
print(f"Found: name='{x.name}' address='{x.address}'")
meshtastic.util.our_exit("BLE scan finished", 0)
elif args.ble:
client = BLEInterface(
args.ble if args.ble != "any" else None,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
timeout=args.timeout,
)
elif args.host:
try:
if ":" in args.host:
tcp_hostname, tcp_port = args.host.split(':')
else:
tcp_hostname = args.host
tcp_port = meshtastic.tcp_interface.DEFAULT_TCP_PORT
client = meshtastic.tcp_interface.TCPInterface(
tcp_hostname,
portNumber=tcp_port,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
timeout=args.timeout,
)
except Exception as ex:
meshtastic.util.our_exit(f"Error connecting to {args.host}:{ex}", 1)
else:
try:
client = meshtastic.serial_interface.SerialInterface(
args.port,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
timeout=args.timeout,
)
except FileNotFoundError:
# Handle the case where the serial device is not found
message = (
f"File Not Found Error:\n"
)
message += f" The serial device at '{args.port}' was not found.\n"
message += " Please check the following:\n"
message += " 1. Is the device connected properly?\n"
message += " 2. Is the correct serial port specified?\n"
message += " 3. Are the necessary drivers installed?\n"
message += " 4. Are you using a **power-only USB cable**? A power-only cable cannot transmit data.\n"
message += " Ensure you are using a **data-capable USB cable**.\n"
meshtastic.util.our_exit(message, 1)
except PermissionError as ex:
username = os.getlogin()
message = "Permission Error:\n"
message += (
" Need to add yourself to the 'dialout' group by running:\n"
)
message += f" sudo usermod -a -G dialout {username}\n"
message += " After running that command, log out and re-login for it to take effect.\n"
message += f"Error was:{ex}"
meshtastic.util.our_exit(message)
except OSError as ex:
message = f"OS Error:\n"
message += " The serial device couldn't be opened, it might be in use by another process.\n"
message += " Please close any applications or webpages that may be using the device and try again.\n"
message += f"\nOriginal error: {ex}"
meshtastic.util.our_exit(message)
if client.devPath is None:
try:
client = meshtastic.tcp_interface.TCPInterface(
"localhost",
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
timeout=args.timeout,
)
except Exception as ex:
meshtastic.util.our_exit(
f"Error connecting to localhost:{ex}", 1
)
# We assume client is fully connected now
onConnected(client)
have_tunnel = platform.system() == "Linux"
if (
args.noproto
or args.reply
or (have_tunnel and args.tunnel)
or args.listen
): # loop until someone presses ctrlc
try:
while True:
time.sleep(1000)
except KeyboardInterrupt:
logger.info("Exiting due to keyboard interrupt")
# don't call exit, background threads might be running still
# sys.exit(0)
def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add connection specification arguments"""
outer = parser.add_argument_group(
"Connection",
"Optional arguments that specify how to connect to a Meshtastic device.",
)
group = outer.add_mutually_exclusive_group()
group.add_argument(
"--port",
"--serial",
"-s",
help="The port of the device to connect to using serial, e.g. /dev/ttyUSB0. (defaults to trying to detect a port)",
nargs="?",
const=None,
default=None,
)
group.add_argument(
"--host",
"--tcp",
"-t",
help="Connect to a device using TCP, optionally passing hostname or IP address to use. (defaults to '%(const)s')",
nargs="?",
default=None,
const="localhost",
)
group.add_argument(
"--ble",
"-b",
help="Connect to a BLE device, optionally specifying a device name (defaults to '%(const)s')",
nargs="?",
default=None,
const="any",
)
outer.add_argument(
"--ble-scan",
help="Scan for Meshtastic BLE devices that may be available to connect to",
action="store_true",
)
return parser
def addSelectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add node/channel specification arguments"""
group = parser.add_argument_group(
"Selection",
"Arguments that select channels to use, destination nodes, etc."
)
group.add_argument(
"--dest",
help="The destination node id for any sent commands. If not set '^all' or '^local' is assumed."
"Use the node ID with a '!' or '0x' prefix or the node number.",
default=None,
metavar="!xxxxxxxx",
)
group.add_argument(
"--ch-index",
help="Set the specified channel index for channel-specific commands. Channels start at 0 (0 is the PRIMARY channel).",
action="store",
metavar="INDEX",
)
return parser
def addImportExportArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add import/export config arguments"""
group = parser.add_argument_group(
"Import/Export",
"Arguments that concern importing and exporting configuration of Meshtastic devices",
)
group.add_argument(
"--configure",
help="Specify a path to a yaml(.yml) file containing the desired settings for the connected device.",
action="append",
)
group.add_argument(
"--export-config",
nargs="?",
const="-", # default to "-" if no value provided
metavar="FILE",
help="Export device config as YAML (to stdout if no file given)"
)
return parser
def addConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments to do with configuring a device"""
group = parser.add_argument_group(
"Configuration",
"Arguments that concern general configuration of Meshtastic devices",
)
group.add_argument(
"--get",
help=(
"Get a preferences field. Use an invalid field such as '0' to get a list of all fields."
" Can use either snake_case or camelCase format. (ex: 'power.ls_secs' or 'power.lsSecs')"
),
nargs=1,
action="append",
metavar="FIELD"
)
group.add_argument(
"--set",
help=(
"Set a preferences field. Can use either snake_case or camelCase format."
" (ex: 'power.ls_secs' or 'power.lsSecs'). May be less reliable when"
" setting properties from more than one configuration section."
),
nargs=2,
action="append",
metavar=("FIELD", "VALUE"),
)
group.add_argument(
"--begin-edit",
help="Tell the node to open a transaction to edit settings",
action="store_true",
)
group.add_argument(
"--commit-edit",
help="Tell the node to commit open settings transaction",
action="store_true",
)
group.add_argument(
"--get-canned-message",
help="Show the canned message plugin message",
action="store_true",
)
group.add_argument(
"--set-canned-message",
help="Set the canned messages plugin message (up to 200 characters).",
action="store",
)
group.add_argument(
"--get-ringtone", help="Show the stored ringtone", action="store_true"
)
group.add_argument(
"--set-ringtone",
help="Set the Notification Ringtone (up to 230 characters).",
action="store",
metavar="RINGTONE",
)
group.add_argument(
"--ch-vlongslow",
help="Change to the very long-range and slow modem preset",
action="store_true",
)
group.add_argument(
"--ch-longslow",
help="Change to the long-range and slow modem preset",
action="store_true",
)
group.add_argument(
"--ch-longfast",
help="Change to the long-range and fast modem preset",
action="store_true",
)
group.add_argument(
"--ch-medslow",
help="Change to the med-range and slow modem preset",
action="store_true",
)
group.add_argument(
"--ch-medfast",
help="Change to the med-range and fast modem preset",
action="store_true",
)
group.add_argument(
"--ch-shortslow",
help="Change to the short-range and slow modem preset",
action="store_true",
)
group.add_argument(
"--ch-shortfast",
help="Change to the short-range and fast modem preset",
action="store_true",
)
group.add_argument("--set-owner", help="Set device owner name", action="store")
group.add_argument(
"--set-owner-short", help="Set device owner short name", action="store"
)
group.add_argument(
"--set-ham", help="Set licensed Ham ID and turn off encryption", action="store"
)
group.add_argument(
"--set-is-unmessageable", "--set-is-unmessagable",
help="Set if a node is messageable or not", action="store"
)
group.add_argument(
"--ch-set-url", "--seturl",
help="Set all channels and set LoRa config from a supplied URL",
metavar="URL",
action="store"
)
group.add_argument(
"--ch-add-url",
help="Add secondary channels and set LoRa config from a supplied URL",
metavar="URL",
default=None,
)
return parser
def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments to do with configuring channels"""
group = parser.add_argument_group(
"Channel Configuration",
"Arguments that concern configuration of channels",
)
group.add_argument(
"--ch-add",
help="Add a secondary channel, you must specify a channel name",
default=None,
)
group.add_argument(
"--ch-del", help="Delete the ch-index channel", action="store_true"
)
group.add_argument(
"--ch-set",
help=(
"Set a channel parameter. To see channel settings available:'--ch-set all all --ch-index 0'. "
"Can set the 'psk' using this command. To disable encryption on primary channel:'--ch-set psk none --ch-index 0'. "
"To set encryption with a new random key on second channel:'--ch-set psk random --ch-index 1'. "
"To set encryption back to the default:'--ch-set psk default --ch-index 0'. To set encryption with your "
"own key: '--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --ch-index 0'."
),
nargs=2,
action="append",
metavar=("FIELD", "VALUE"),
)
group.add_argument(
"--channel-fetch-attempts",
help=("Attempt to retrieve channel settings for --ch-set this many times before giving up. Default %(default)s."),
default=3,
type=int,
metavar="ATTEMPTS",
)
group.add_argument(
"--qr",
help=(
"Display a QR code for the node's primary channel (or all channels with --qr-all). "
"Also shows the shareable channel URL."
),
action="store_true",
)
group.add_argument(
"--qr-all",
help="Display a QR code and URL for all of the node's channels.",
action="store_true",
)
group.add_argument(
"--ch-enable",
help="Enable the specified channel. Use --ch-add instead whenever possible.",
action="store_true",
dest="ch_enable",
default=False,
)
# Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.)
group.add_argument(
"--ch-disable",
help="Disable the specified channel Use --ch-del instead whenever possible.",
action="store_true",
dest="ch_disable",
default=False,
)
return parser
def addPositionConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments to do with fixed positions and position config"""
group = parser.add_argument_group(
"Position Configuration",
"Arguments that modify fixed position and other position-related configuration.",
)
group.add_argument(
"--setalt",
help="Set device altitude in meters (allows use without GPS), and enable fixed position. "
"When providing positions with `--setlat`, `--setlon`, and `--setalt`, missing values will be set to 0.",
)
group.add_argument(
"--setlat",
help="Set device latitude (allows use without GPS), and enable fixed position. Accepts a decimal value or an integer premultiplied by 1e7. "
"When providing positions with `--setlat`, `--setlon`, and `--setalt`, missing values will be set to 0.",
)
group.add_argument(
"--setlon",
help="Set device longitude (allows use without GPS), and enable fixed position. Accepts a decimal value or an integer premultiplied by 1e7. "
"When providing positions with `--setlat`, `--setlon`, and `--setalt`, missing values will be set to 0.",
)
group.add_argument(
"--remove-position",
help="Clear any existing fixed position and disable fixed position.",
action="store_true",
)
group.add_argument(
"--pos-fields",
help="Specify fields to send when sending a position. Use no argument for a list of valid values. "
"Can pass multiple values as a space separated list like "
"this: '--pos-fields ALTITUDE HEADING SPEED'",
nargs="*",
action="store",
)
return parser
def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments concerning local-only information & actions"""
group = parser.add_argument_group(
"Local Actions",
"Arguments that take actions or request information from the local node only.",
)
group.add_argument(
"--info",
help="Read and display the radio config information",
action="store_true",
)
group.add_argument(
"--nodes",
help="Print Node List in a pretty formatted table",
action="store_true",
)
group.add_argument(
"--show-fields",
help="Specify fields to show (comma-separated) when using --nodes",
type=lambda s: s.split(','),
default=None
)
return parser
def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments concerning information & actions that may interact with the mesh"""
group = parser.add_argument_group(
"Remote Actions",
"Arguments that take actions or request information from either the local node or remote nodes via the mesh.",
)
group.add_argument(
"--sendtext",
help="Send a text message. Can specify a destination '--dest', use of PRIVATE_APP port '--private', and/or channel index '--ch-index'.",
metavar="TEXT",
)
group.add_argument(
"--private",
help="Optional argument for sending text messages to the PRIVATE_APP port. Use in combination with --sendtext.",
action="store_true"
)
group.add_argument(
"--traceroute",
help="Traceroute from connected node to a destination. "
"You need pass the destination ID as argument, like "
"this: '--traceroute !ba4bf9d0' | '--traceroute 0xba4bf9d0'"
"Only nodes with a shared channel can be traced.",
metavar="!xxxxxxxx",
)
group.add_argument(
"--request-telemetry",
help="Request telemetry from a node. With an argument, requests that specific type of telemetry. "
"You need to pass the destination ID as argument with '--dest'. "
"For repeaters, the nodeNum is required.",
action="store",
nargs="?",
default=None,
const="device",
metavar="TYPE",
)
group.add_argument(
"--request-position",
help="Request the position from a node. "
"You need to pass the destination ID as an argument with '--dest'. "
"For repeaters, the nodeNum is required.",
action="store_true",
)
group.add_argument(
"--reply", help="Reply to received messages", action="store_true"
)
return parser
def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add arguments concerning admin actions that may interact with the mesh"""
outer = parser.add_argument_group(
"Remote Admin Actions",
"Arguments that interact with local node or remote nodes via the mesh, requiring admin access.",
)
group = outer.add_mutually_exclusive_group()
group.add_argument(
"--reboot", help="Tell the destination node to reboot", action="store_true"
)
group.add_argument(
"--reboot-ota",
help="Tell the destination node to reboot into factory firmware (ESP32)",
action="store_true",
)
group.add_argument(
"--enter-dfu",
help="Tell the destination node to enter DFU mode (NRF52)",
action="store_true",
)
group.add_argument(
"--shutdown", help="Tell the destination node to shutdown", action="store_true"
)
group.add_argument(
"--device-metadata",
help="Get the device metadata from the node",
action="store_true",
)
group.add_argument(
"--factory-reset", "--factory-reset-config",
help="Tell the destination node to install the default config, preserving BLE bonds & PKI keys",
action="store_true",
)
group.add_argument(
"--factory-reset-device",
help="Tell the destination node to install the default config and clear BLE bonds & PKI keys",
action="store_true",
)
group.add_argument(
"--remove-node",
help="Tell the destination node to remove a specific node from its NodeDB. "
"Use the node ID with a '!' or '0x' prefix or the node number.",
metavar="!xxxxxxxx"
)
group.add_argument(
"--set-favorite-node",
help="Tell the destination node to set the specified node to be favorited on the NodeDB. "
"Use the node ID with a '!' or '0x' prefix or the node number.",
metavar="!xxxxxxxx"
)
group.add_argument(
"--remove-favorite-node",
help="Tell the destination node to set the specified node to be un-favorited on the NodeDB. "
"Use the node ID with a '!' or '0x' prefix or the node number.",
metavar="!xxxxxxxx"
)
group.add_argument(
"--set-ignored-node",
help="Tell the destination node to set the specified node to be ignored on the NodeDB. "
"Use the node ID with a '!' or '0x' prefix or the node number.",
metavar="!xxxxxxxx"
)
group.add_argument(
"--remove-ignored-node",
help="Tell the destination node to set the specified node to be un-ignored on the NodeDB. "
"Use the node ID with a '!' or '0x' prefix or the node number.",
metavar="!xxxxxxxx"
)
group.add_argument(
"--reset-nodedb",
help="Tell the destination node to clear its list of nodes",
action="store_true",
)
group.add_argument(
"--set-time",
help="Set the time to the provided unix epoch timestamp, or the system's current time if omitted or 0.",
action="store",
type=int,
nargs="?",
default=None,
const=0,
metavar="TIMESTAMP",
)
return parser
def initParser():
"""Initialize the command line argument parsing."""
parser = mt_config.parser
args = mt_config.args
# The "Help" group includes the help option and other informational stuff about the CLI itself
outerHelpGroup = parser.add_argument_group("Help")
helpGroup = outerHelpGroup.add_mutually_exclusive_group()
helpGroup.add_argument(
"-h", "--help", action="help", help="show this help message and exit"
)
the_version = get_active_version()
helpGroup.add_argument("--version", action="version", version=f"{the_version}")
helpGroup.add_argument(
"--support",
action="store_true",
help="Show support info (useful when troubleshooting an issue)",
)
# Connection arguments to indicate a device to connect to
parser = addConnectionArgs(parser)
# Selection arguments to denote nodes and channels to use
parser = addSelectionArgs(parser)
# Arguments concerning viewing and setting configuration
parser = addImportExportArgs(parser)
parser = addConfigArgs(parser)
parser = addPositionConfigArgs(parser)
parser = addChannelConfigArgs(parser)
# Arguments for sending or requesting things from the local device
parser = addLocalActionArgs(parser)
# Arguments for sending or requesting things from the mesh
parser = addRemoteActionArgs(parser)
parser = addRemoteAdminArgs(parser)
# All the rest of the arguments
group = parser.add_argument_group("Miscellaneous arguments")
group.add_argument(
"--seriallog",
help="Log device serial output to either 'none' or a filename to append to. Defaults to '%(const)s' if no filename specified.",
nargs="?",
const="stdout",
default=None,
metavar="LOG_DESTINATION",
)
group.add_argument(
"--ack",
help="Use in combination with compatible actions (e.g. --sendtext) to wait for an acknowledgment.",
action="store_true",
)
group.add_argument(
"--timeout",
help="How long to wait for replies. Default %(default)ss.",
default=300,
type=int,
metavar="SECONDS",
)
group.add_argument(
"--no-nodes",
help="Request that the node not send node info to the client. "
"Will break things that depend on the nodedb, but will speed up startup. Requires 2.3.11+ firmware.",
action="store_true",
)
group.add_argument(
"--debug", help="Show API library debug log messages", action="store_true"
)
group.add_argument(
"--debuglib", help="Show only API library debug log messages", action="store_true"
)
group.add_argument(
"--test",
help="Run stress test against all connected Meshtastic devices",
action="store_true",
)
group.add_argument(
"--wait-to-disconnect",
help="How many seconds to wait before disconnecting from the device.",
const="5",
nargs="?",
action="store",
metavar="SECONDS",
)
group.add_argument(
"--noproto",
help="Don't start the API, just function as a dumb serial terminal.",
action="store_true",
)
group.add_argument(
"--listen",
help="Just stay open and listen to the protobuf stream. Enables debug logging.",
action="store_true",
)
group.add_argument(
"--no-time",
help="Deprecated. Retained for backwards compatibility in scripts, but is a no-op.",
action="store_true",
)
power_group = parser.add_argument_group(
"Power Testing", "Options for power testing/logging."
)
power_supply_group = power_group.add_mutually_exclusive_group()
power_supply_group.add_argument(
"--power-riden",
help="Talk to a Riden power-supply. You must specify the device path, i.e. /dev/ttyUSBxxx",
)
power_supply_group.add_argument(
"--power-ppk2-meter",
help="Talk to a Nordic Power Profiler Kit 2 (in meter mode)",
action="store_true",
)
power_supply_group.add_argument(
"--power-ppk2-supply",
help="Talk to a Nordic Power Profiler Kit 2 (in supply mode)",
action="store_true",
)
power_supply_group.add_argument(
"--power-sim",
help="Use a simulated power meter (for development)",
action="store_true",
)
power_group.add_argument(
"--power-voltage",
help="Set the specified voltage on the power-supply. Be VERY careful, you can burn things up.",
)
power_group.add_argument(
"--power-stress",
help="Perform power monitor stress testing, to capture a power consumption profile for the device (also requires --power-mon)",
action="store_true",
)
power_group.add_argument(
"--power-wait",
help="Prompt the user to wait for device reset before looking for device serial ports (some boards kill power to USB serial port)",
action="store_true",
)
power_group.add_argument(
"--slog",
help="Store structured-logs (slogs) for this run, optionally you can specify a destination directory",
nargs="?",
default=None,
const="default",
)
remoteHardwareArgs = parser.add_argument_group(
"Remote Hardware", "Arguments related to the Remote Hardware module"
)
remoteHardwareArgs.add_argument(
"--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action="append"
)
remoteHardwareArgs.add_argument(
"--gpio-rd", help="Read from a GPIO mask (ex: '0x10')"
)
remoteHardwareArgs.add_argument(
"--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')"
)
have_tunnel = platform.system() == "Linux"
if have_tunnel:
tunnelArgs = parser.add_argument_group(
"Tunnel", "Arguments related to establishing a tunnel device over the mesh."
)
tunnelArgs.add_argument(
"--tunnel",
action="store_true",
help="Create a TUN tunnel device for forwarding IP packets over the mesh",
)
tunnelArgs.add_argument(
"--subnet",
dest="tunnel_net",
help="Sets the local-end subnet address for the TUN IP bridge. (ex: 10.115' which is the default)",
default=None,
)
parser.set_defaults(deprecated=None)
if argcomplete is not None:
argcomplete.autocomplete(parser)
args = parser.parse_args()
mt_config.args = args
mt_config.parser = parser
def main():
"""Perform command line meshtastic operations"""
parser = argparse.ArgumentParser(
add_help=False,
epilog="If no connection arguments are specified, we search for a compatible serial device, "
"and if none is found, then attempt a TCP connection to localhost.",
)
mt_config.parser = parser
initParser()
common()
logfile = mt_config.logfile
if logfile:
logfile.close()
def tunnelMain():
"""Run a meshtastic IP tunnel"""
parser = argparse.ArgumentParser(add_help=False)
mt_config.parser = parser
initParser()
args = mt_config.args
args.tunnel = True
mt_config.args = args
common()
if __name__ == "__main__":
main()