Files
python/meshtastic/__main__.py
2021-03-14 10:49:10 +08:00

611 lines
21 KiB
Python

#!python3
import argparse
import platform
import logging, sys, codecs, base64, os
from . import SerialInterface, TCPInterface, BLEInterface, test, remote_hardware
from pubsub import pub
from . import mesh_pb2, portnums_pb2, channel_pb2
from .util import stripnl
import google.protobuf.json_format
import pyqrcode
import traceback
import pkg_resources
from datetime import datetime
import timeago
from easy_table import EasyTable
"""We only import the tunnel code if we are on a platform that can run it"""
have_tunnel = platform.system() == 'Linux'
"""The command line arguments"""
args = None
"""The parser for arguments"""
parser = argparse.ArgumentParser()
channelIndex = 0
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
try:
d = packet.get('decoded')
# Exit once we receive a reply
if args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
interface.close() # after running command then exit
# Reply to every received message with some stats
if args.reply:
msg = d.get('text')
if msg:
#shortName = packet['decoded']['data']['shortName']
rxSnr = packet['rxSnr']
hopLimit = packet['hopLimit']
print(f"message: {msg}")
reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(
msg, rxSnr, hopLimit)
print("Sending reply: ", reply)
interface.sendText(reply)
except Exception as ex:
print(ex)
def onConnection(interface, topic=pub.AUTO_TOPIC):
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
trueTerms = {"t", "true", "yes"}
falseTerms = {"f", "false", "no"}
def genPSKS256():
return os.urandom(32)
def fromPSK(valstr):
"""A special version of fromStr that assumes the user is trying to set a PSK.
In that case we also allow "none", "default" or "random" (to have python generate one), or simpleN
"""
if valstr == "random":
return genPSK256()
elif valstr == "none":
return bytes([0]) # Use the 'no encryption' PSK
elif valstr == "default":
return bytes([1]) # Use default channel psk
elif valstr.startswith("simple"):
return bytes([int(valstr[6:])]) # Use one of the single byte encodings
else:
return fromStr(valstr)
def fromStr(valstr):
"""try to parse as int, float or bool (and fallback to a string as last resort)
Returns: an int, bool, float, str or byte array (for strings of hex digits)
Args:
valstr (string): A user provided string
"""
if(len(valstr) == 0): # Treat an emptystring as an empty bytes
val = bytes()
elif(valstr.startswith('0x')):
# if needed convert to string with asBytes.decode('utf-8')
val = bytes.fromhex(valstr[2:])
elif valstr in trueTerms:
val = True
elif valstr in falseTerms:
val = False
else:
try:
val = int(valstr)
except ValueError:
try:
val = float(valstr)
except ValueError:
val = valstr # Not a float or an int, assume string
return val
never = 0xffffffff
oneday = 24 * 60 * 60
# Returns formatted value
def formatFloat(value, formatStr="{:.2f}", unit="", default="N/A"):
return formatStr.format(value)+unit if value else default
# Returns Last Heard Time in human readable format
def getLH(ts, default="N/A"):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else default
# Returns time ago for the last heard
def getTimeAgo(ts, default="N/A"):
return timeago.format(datetime.fromtimestamp(ts), datetime.now()) if ts else default
# Print Nodes
def printNodes(nodes, myId):
# Create the table and define the structure
table = EasyTable("Nodes")
table.setCorners("/", "\\", "\\", "/")
table.setOuterStructure("|", "-")
table.setInnerStructure("|", "-", "+")
tableData = []
for node in nodes:
if node['user']['id'] == myId:
continue
# aux var to get not defined keys
lat = formatFloat(node['position'].get("latitude"), "{:.4f}", "°")
lon = formatFloat(node['position'].get("longitude"), "{:.4f}", "°")
alt = formatFloat(node['position'].get("altitude"), "{:.0f}", " m")
batt = formatFloat(node['position'].get("batteryLevel"), "{:.2f}", "%")
snr = formatFloat(node.get("snr"), "{:.2f}", " dB")
LH = getLH(node['position'].get("time"))
timeAgo = getTimeAgo(node['position'].get("time"))
tableData.append({"N": 0, "User": node['user']['longName'],
"AKA": node['user']['shortName'], "ID": node['user']['id'],
"Position": lat+", "+lon+", "+alt,
"Battery": batt, "SNR": snr,
"LastHeard": LH, "Since": timeAgo})
Rows = sorted(tableData, key=lambda k: k['LastHeard'], reverse=True)
RowsOk = sorted(Rows, key=lambda k: k['LastHeard'].startswith("N/A"))
for i in range(len(RowsOk)):
RowsOk[i]['N'] = i+1
table.setData(RowsOk)
table.displayTable()
def setPref(attributes, name, valStr):
"""Set a channel or preferences value"""
val = fromStr(valStr)
try:
try:
setattr(attributes, name, val)
except TypeError as ex:
# The setter didn't like our arg type guess try again as a string
setattr(attributes, name, valStr)
# succeeded!
print(f"Set {name} to {valStr}")
except Exception as ex:
print(f"Can't set {name} due to {ex}")
targetNode = None
def onConnected(interface):
"""Callback invoked when we connect to a radio"""
closeNow = False # Should we drop the connection after we finish?
try:
global args
print("Connected to radio")
def getNode():
"""This operation could be expensive, so we try to cache the results"""
global targetNode
if not targetNode:
targetNode = interface.getNode(args.destOrLocal)
return targetNode
if args.settime or args.setlat or args.setlon or args.setalt:
closeNow = True
alt = 0
lat = 0.0
lon = 0.0
time = 0
prefs = interface.localNode.radioConfig.preferences
if args.settime:
time = int(args.settime)
if args.setalt:
alt = int(args.setalt)
prefs.fixed_position = True
print(f"Fixing altitude at {alt} meters")
if args.setlat:
lat = float(args.setlat)
prefs.fixed_position = True
print(f"Fixing latitude at {lat} degrees")
if args.setlon:
lon = float(args.setlon)
prefs.fixed_position = True
print(f"Fixing longitude at {lon} degrees")
print("Setting device time/position")
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
interface.sendPosition(lat, lon, alt, time)
interface.localNode.writeConfig()
if args.set_owner:
closeNow = True
print(f"Setting device owner to {args.set_owner}")
getNode().setOwner(args.set_owner)
if args.set_ham:
closeNow = True
print(f"Setting HAM ID to {args.set_ham} and turning off encryption")
getNode().setOwner(args.set_ham)
ch = getNode().channels[0] # Must turn off crypt on primary channel
ch.settings.psk = fromPSK("none")
print(f"Writing modified channels to device")
getNode().writeChannel(0)
if args.sendtext:
closeNow = True
print(f"Sending text message {args.sendtext} to {args.destOrAll}")
interface.sendText(args.sendtext, args.destOrAll,
wantAck=True)
if args.sendping:
print(f"Sending ping message {args.sendtext} to {args.destOrAll}")
payload = str.encode("test string")
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(
f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
rhc.writeGPIOs(args.dest, bitmask, bitval)
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
rhc.readGPIOs(args.dest, bitmask)
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}")
rhc.watchGPIOs(args.dest, bitmask)
# handle settings
if args.set:
closeNow = True
prefs = getNode().radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.set:
setPref(
prefs, pref[0], pref[1])
print("Writing modified preferences to device")
getNode().writeConfig()
if args.seturl:
closeNow = True
getNode().setURL(args.seturl)
# handle changing channels
if args.ch_add:
closeNow = True
n = getNode()
ch = n.getChannelByName(args.ch_add)
if ch:
logging.error(f"This node already has a '{args.ch_add}' channel - no changes.")
else:
ch = n.getDisabledChannel()
if not ch:
raise Exception("No free channels were found")
chs = channel_pb2.ChannelSettings()
chs.psk = genPSKS256()
chs.name = args.ch_add
ch.settings.CopyFrom(chs)
ch.role = channel_pb2.Channel.Role.SECONDARY
print(f"Writing modified channels to device")
n.writeChannel(ch.index)
if args.setchan or args.setch_longslow or args.setch_shortfast:
closeNow = True
ch = getNode().channels[channelIndex]
enable = args.ch_enable # should we enable this channel?
if args.setch_longslow or args.setch_shortfast:
if channelIndex != 0:
raise Exception("standard channel settings can only be applied to the PRIMARY channel")
enable = True # force enable
def setSimpleChannel(modem_config):
"""Set one of the simple modem_config only based channels"""
# Completely new channel settings
chs = channel_pb2.ChannelSettings()
chs.modem_config = modem_config
chs.psk = bytes([1]) # Use default channel psk 1
ch.settings.CopyFrom(chs)
# handle the simple channel set commands
if args.setch_longslow:
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
if args.setch_shortfast:
setSimpleChannel(
channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
# Handle the channel settings
for pref in (args.setchan or []):
if pref[0] == "psk":
ch.settings.psk =fromPSK(pref[1])
else:
setPref(ch.settings, pref[0], pref[1])
enable = True # If we set any pref, assume the user wants to enable the channel
if enable:
ch.role = channel_pb2.Channel.Role.PRIMARY if (channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY
else:
ch.role = channel_pb2.Channel.Role.DISABLED
print(f"Writing modified channels to device")
getNode().writeChannel(channelIndex)
if args.info:
if not args.dest: # If we aren't trying to talk to our local node, don't show it
interface.showInfo()
getNode().showInfo()
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
if args.nodes:
closeNow = True
printNodes(interface.nodes.values(),
interface.getMyNodeInfo()['user']['id'])
if args.qr:
closeNow = True
print(f"Channel URL {interface.channelURL}")
url = pyqrcode.create(interface.channelURL)
print(url.terminal())
if have_tunnel and args.tunnel:
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
tunnel.Tunnel(interface, subnet=args.tunnel_net)
except Exception as ex:
print(f"Exception while handling connection: {ex}")
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
if (not args.seriallog) and closeNow:
interface.close() # after running command then exit
def onNode(node):
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
pub.subscribe(onConnected, "meshtastic.connection.established")
# pub.subscribe(onNode, "meshtastic.node")
def common():
"""Shared code for all of our command line wrappers"""
global args
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
else:
if args.ch_index is not None:
global channelIndex
channelIndex = int(args.ch_index)
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
if not args.dest:
args.destOrAll = "^all"
args.destOrLocal = "^local"
else:
args.destOrAll = args.dest
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
if not args.seriallog:
if args.info or args.nodes or args.set or args.seturl or args.set_owner or args.setlat or args.setlon or \
args.settime or \
args.setch_longslow or args.setch_shortfast or args.setchan or args.sendtext or \
args.qr or args.ch_add or args.set_ham:
args.seriallog = "none" # assume no debug output in this case
else:
args.seriallog = "stdout" # default to stdout
if args.router != None:
logging.error(
'--set-router has been deprecated. Use "--set is_router true" or "--set is_router false" instead')
elif args.test:
test.testAll()
else:
if args.seriallog == "stdout":
logfile = sys.stdout
elif args.seriallog == "none":
args.seriallog = None
logging.debug("Not logging serial output")
logfile = None
else:
logging.info(f"Logging serial output to {args.seriallog}")
logfile = open(args.seriallog, 'w+',
buffering=1) # line buffering
subscribe()
if args.ble:
client = BLEInterface(args.ble, debugOut=logfile)
elif args.host:
client = TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)
else:
client = SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto)
# don't call exit, background threads might be running still
# sys.exit(0)
def initParser():
global parser, args
parser.add_argument(
"--port",
help="The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we'll try to find it.",
default=None)
parser.add_argument(
"--host",
help="The hostname/ipaddr of the device to connect to (over TCP)",
default=None)
parser.add_argument(
"--seriallog",
help="Log device serial output to either 'stdout', 'none' or a filename to append to.")
parser.add_argument("--info", help="Read and display the radio config information",
action="store_true")
parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
action="store_true")
parser.add_argument("--qr", help="Display the QR code that corresponds to the current channel",
action="store_true")
parser.add_argument(
"--set", help="Set a preferences field", nargs=2, action='append')
parser.add_argument(
"--setchan", help="Set a channel parameter", nargs=2, action='append')
parser.add_argument(
"--setch-longslow", help="Change to the standard long-range (but slow) channel", action='store_true')
parser.add_argument(
"--setch-shortfast", help="Change to the standard fast (but short range) channel", action='store_true')
parser.add_argument(
"--seturl", help="Set a channel URL", action="store")
parser.add_argument(
"--ch-index", help="Set the specified channel index", action="store")
parser.add_argument(
"--ch-add", help="Add a secondary channel, you must specify a channel name", default=None)
parser.add_argument(
"--ch-enable", help="Enable the specified channel", action="store_true", dest="ch_enable")
parser.add_argument(
"--ch-disable", help="Disable the specified channel", action="store_false", dest="ch_enable")
parser.add_argument(
"--set-owner", help="Set device owner name", action="store")
parser.add_argument(
"--set-ham", help="Set licensed HAM ID and turn off encryption", action="store")
parser.add_argument(
"--dest", help="The destination node id for any sent commands, if not set '^all' or '^local' is assumed as appropriate", default=None)
parser.add_argument(
"--sendtext", help="Send a text message")
parser.add_argument(
"--sendping", help="Send a ping message (which requests a reply)", action="store_true")
# parser.add_argument(
# "--repeat", help="Normally the send commands send only one message, use this option to request repeated sends")
parser.add_argument(
"--reply", help="Reply to received messages",
action="store_true")
parser.add_argument(
"--gpio-wrb", nargs=2, help="Set a particlar GPIO # to 1 or 0", action='append')
parser.add_argument(
"--gpio-rd", help="Read from a GPIO mask")
parser.add_argument(
"--gpio-watch", help="Start watching a GPIO mask for changes")
parser.add_argument(
"--settime", help="Set the real time clock on the device", action="store_true")
parser.add_argument(
"--setalt", help="Set device altitude (allows use without GPS)")
parser.add_argument(
"--setlat", help="Set device latitude (allows use without GPS)")
parser.add_argument(
"--setlon", help="Set device longitude (allows use without GPS)")
parser.add_argument("--debug", help="Show API library debug log messages",
action="store_true")
parser.add_argument("--test", help="Run stress test against all connected Meshtastic devices",
action="store_true")
parser.add_argument("--ble", help="BLE mac address to connect to (BLE is not yet supported for this tool)",
default=None)
parser.add_argument("--noproto", help="Don't start the API, just function as a dumb serial terminal.",
action="store_true")
parser.add_argument('--set-router', dest='router',
action='store_true', help="Deprecated, use --set router true instead")
parser.add_argument('--unset-router', dest='router',
action='store_false', help="Deprecated, use --set router false instead")
if have_tunnel:
parser.add_argument('--tunnel',
action='store_true', help="Create a TUN tunnel device for forwarding IP packets over the mesh")
parser.add_argument(
"--subnet", dest='tunnel_net', help="Read from a GPIO mask", default=None)
parser.set_defaults(router=None)
parser.add_argument('--version', action='version',
version=f"{pkg_resources.require('meshtastic')[0].version}")
args = parser.parse_args()
def main():
"""Perform command line meshtastic operations"""
initParser()
common()
def tunnelMain():
"""Run a meshtastic IP tunnel"""
global args
initParser()
args.tunnel = True
common()
if __name__ == "__main__":
main()