Files
python/meshtastic/__main__.py

266 lines
9.2 KiB
Python

#!python3
import argparse
from . import SerialInterface, TCPInterface, BLEInterface, test
import logging
import sys
from pubsub import pub
import google.protobuf.json_format
import pyqrcode
import traceback
import codecs
import base64
"""The command line arguments"""
args = None
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
print(f"Received: {packet}")
try:
# Exit once we receive a reply
if args.sendtext and packet["to"] == interface.myInfo.my_node_num:
interface.close() # after running command then exit
# Reply to every received message with some stats
if args.reply:
if packet['decoded']['data'] is not None:
msg = packet['decoded']['data']['text']
#shortName = packet['decoded']['data']['shortName']
rxSnr = packet['rxSnr']
hopLimit = packet['hopLimit']
print(f"message: {msg}")
reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(
msg, rxSnr, hopLimit)
print("Sending reply: ", reply)
interface.sendText(reply)
except Exception as ex:
print(ex)
def onConnection(interface, topic=pub.AUTO_TOPIC):
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def fromStr(valstr):
"""try to parse as int, float or bool (and fallback to a string as last resort)
Returns: an int, bool, float or str
Args:
valstr (string): A user provided string
"""
try:
val = int(valstr)
except ValueError:
try:
val = float(valstr)
except ValueError:
trueTerms = {"t", "true", "yes"}
falseTerms = {"f", "false", "no"}
if valstr in trueTerms:
val = True
elif valstr in falseTerms:
val = False
else:
val = valstr # Try to treat the parameter as a string
return val
def onConnected(interface):
"""Callback invoked when we connect to a radio"""
global args
print("Connected to radio")
closeNow = False # Should we drop the connection after we finish?
try:
if args.settime:
print("Setting device RTC time")
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.sendPosition()
if args.sendtext:
print(f"Sending text message {args.sendtext} to {args.dest}")
interface.sendText(args.sendtext, args.dest,
wantAck=True, wantResponse=True)
if args.set or args.setstr or args.setchan or args.seturl:
closeNow = True
def setPref(attributes, name, val):
"""Set a preferences value"""
print(f"Setting {name} to {val}")
try:
try:
setattr(attributes, name, val)
except TypeError as ex:
# The setter didn't like our arg type - try again as a byte array (so we can convert strings to bytearray)
if isinstance(val, str):
setattr(attributes, name,
codecs.decode(val, "hex"))
else:
print(f"Incorrect type for {name} {ex}")
except Exception as ex:
print(f"Can't set {name} due to {ex}")
# Handle the int/float/bool arguments
for pref in (args.set or []):
setPref(
interface.radioConfig.preferences, pref[0], fromStr(pref[1]))
# Handle the string arguments
for pref in (args.setstr or []):
setPref(interface.radioConfig.preferences, pref[0], pref[1])
# Handle the channel settings
for pref in (args.setchan or []):
setPref(interface.radioConfig.channel_settings,
pref[0], fromStr(pref[1]))
# Handle set URL
if args.seturl:
# URLs are of the form https://www.meshtastic.org/c/#{base64_channel_settings}
# Split on '/#' to find the base64 encoded channel settings
splitURL = args.seturl.split("/#")
bytes = base64.urlsafe_b64decode(splitURL[-1])
interface.radioConfig.channel_settings.ParseFromString(bytes)
print("Writing modified preferences to device")
interface.writeConfig()
if args.info:
closeNow = True
print(interface.myInfo)
print(interface.radioConfig)
print(f"Channel URL {interface.channelURL}")
print("Nodes in mesh:")
for n in interface.nodes.values():
print(n)
if args.qr:
closeNow = True
print(f"Channel URL {interface.channelURL}")
url = pyqrcode.create(interface.channelURL)
print(url.terminal())
except Exception as ex:
print(ex)
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
def onNode(node):
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
pub.subscribe(onConnected, "meshtastic.connection.established")
pub.subscribe(onNode, "meshtastic.node")
def main():
"""Perform command line meshtastic operations"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--port",
help="The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we'll try to find it.",
default=None)
parser.add_argument(
"--host",
help="The hostname/ipaddr of the device to connect to (over TCP)",
default=None)
parser.add_argument(
"--seriallog",
help="Log device serial output to either 'stdout', 'none' or a filename to append to. Defaults to stdout.")
parser.add_argument("--info", help="Read and display the radio config information",
action="store_true")
parser.add_argument("--qr", help="Display the QR code that corresponds to the current channel",
action="store_true")
parser.add_argument(
"--set", help="Set a numeric preferences field", nargs=2, action='append')
parser.add_argument(
"--setstr", help="Set a string preferences field", nargs=2, action='append')
parser.add_argument(
"--setchan", help="Set a channel parameter", nargs=2, action='append')
parser.add_argument(
"--seturl", help="Set a channel URL", action="store")
parser.add_argument(
"--dest", help="The destination node id for the --send commands, if not set '^all' is assumed", default="^all")
parser.add_argument(
"--sendtext", help="Send a text message")
parser.add_argument(
"--reply", help="Reply to received messages",
action="store_true")
parser.add_argument(
"--settime", help="Set the real time clock on the device", action="store_true")
parser.add_argument("--debug", help="Show API library debug log messages",
action="store_true")
parser.add_argument("--test", help="Run stress test against all connected Meshtastic devices",
action="store_true")
parser.add_argument("--ble", help="BLE mac address to connect to (BLE is not yet supported for this tool)",
default=None)
parser.add_argument("--noproto", help="Don't start the API, just function as a dumb serial terminal.",
action="store_true")
global args
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
if not args.seriallog:
if args.info or args.set or args.setstr or args.setchan or args.sendtext or args.qr:
args.seriallog = "none" # assume no debug output in this case
else:
args.seriallog = "stdout" # default to stdout
if args.test:
test.testAll()
else:
if args.seriallog == "stdout":
logfile = sys.stdout
elif args.seriallog == "none":
args.seriallog = None
logging.debug("Not logging serial output")
logfile = None
else:
logging.info(f"Logging serial output to {args.seriallog}")
logfile = open(args.seriallog, 'w+', buffering=1) # line buffering
subscribe()
if args.ble:
client = BLEInterface(args.ble, debugOut=logfile)
elif args.host:
client = TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)
else:
client = SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto)
if __name__ == "__main__":
main()