Module meshtastic.__main__
-Main Meshtastic
--Expand source code -
-#!python3
-""" Main Meshtastic
-"""
-
-import argparse
-import platform
-import logging
-import sys
-import time
-import yaml
-from pubsub import pub
-import pyqrcode
-import pkg_resources
-import meshtastic.util
-import meshtastic.test
-from . import remote_hardware
-from . import portnums_pb2, channel_pb2, radioconfig_pb2
-from .globals import Globals
-
-
-have_tunnel = platform.system() == 'Linux'
-"""We only import the tunnel code if we are on a platform that can run it. """
-
-def onReceive(packet, interface):
- """Callback invoked when a packet arrives"""
- our_globals = Globals.getInstance()
- args = our_globals.get_args()
- try:
- d = packet.get('decoded')
- logging.debug(f'in onReceive() d:{d}')
-
- # Exit once we receive a reply
- if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
- interface.close() # after running command then exit
-
- # Reply to every received message with some stats
- if args and args.reply:
- msg = d.get('text')
- if msg:
- rxSnr = packet['rxSnr']
- hopLimit = packet['hopLimit']
- print(f"message: {msg}")
- reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit)
- print("Sending reply: ", reply)
- interface.sendText(reply)
-
- except Exception as ex:
- print(ex)
-
-
-def onConnection(interface, topic=pub.AUTO_TOPIC):
- """Callback invoked when we connect/disconnect from a radio"""
- print(f"Connection changed: {topic.getName()}")
-
-
-def getPref(attributes, name):
- """Get a channel or preferences value"""
-
- objDesc = attributes.DESCRIPTOR
- field = objDesc.fields_by_name.get(name)
- if not field:
- print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not get it.")
- print(f"Choices in sorted order are:")
- names = []
- for f in objDesc.fields:
- names.append(f'{f.name}')
- for temp_name in sorted(names):
- print(f" {temp_name}")
- return
-
- # okay - try to read the value
- try:
- try:
- val = getattr(attributes, name)
- except TypeError:
- # The getter didn't like our arg type guess try again as a string
- val = getattr(attributes, name)
-
- # succeeded!
- print(f"{name}: {str(val)}")
- except Exception as ex:
- print(f"Can't get {name} due to {ex}")
-
-
-def setPref(attributes, name, valStr):
- """Set a channel or preferences value"""
-
- objDesc = attributes.DESCRIPTOR
- field = objDesc.fields_by_name.get(name)
- if not field:
- print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not set it.")
- print(f"Choices in sorted order are:")
- names = []
- for f in objDesc.fields:
- names.append(f'{f.name}')
- for temp_name in sorted(names):
- print(f" {temp_name}")
- return
-
- val = meshtastic.util.fromStr(valStr)
-
- enumType = field.enum_type
- # pylint: disable=C0123
- if enumType and type(val) == str:
- # We've failed so far to convert this string into an enum, try to find it by reflection
- e = enumType.values_by_name.get(val)
- if e:
- val = e.number
- else:
- print(f"{name} does not have an enum called {val}, so you can not set it.")
- print(f"Choices in sorted order are:")
- names = []
- for f in enumType.values:
- names.append(f'{f.name}')
- for temp_name in sorted(names):
- print(f" {temp_name}")
- return
-
- # okay - try to read the value
- try:
- try:
- setattr(attributes, name, val)
- except TypeError:
- # The setter didn't like our arg type guess try again as a string
- setattr(attributes, name, valStr)
-
- # succeeded!
- print(f"Set {name} to {valStr}")
- except Exception as ex:
- print(f"Can't set {name} due to {ex}")
-
-
-def onConnected(interface):
- """Callback invoked when we connect to a radio"""
- closeNow = False # Should we drop the connection after we finish?
- try:
- our_globals = Globals.getInstance()
- args = our_globals.get_args()
-
- print("Connected to radio")
-
- def getNode():
- """This operation could be expensive, so we try to cache the results"""
- targetNode = our_globals.get_target_node()
- if not targetNode:
- targetNode = interface.getNode(args.destOrLocal)
- our_globals.set_target_node(targetNode)
- return targetNode
-
- if args.setlat or args.setlon or args.setalt:
- closeNow = True
-
- alt = 0
- lat = 0.0
- lon = 0.0
- prefs = interface.localNode.radioConfig.preferences
- if args.setalt:
- alt = int(args.setalt)
- prefs.fixed_position = True
- print(f"Fixing altitude at {alt} meters")
- if args.setlat:
- lat = float(args.setlat)
- prefs.fixed_position = True
- print(f"Fixing latitude at {lat} degrees")
- if args.setlon:
- lon = float(args.setlon)
- prefs.fixed_position = True
- print(f"Fixing longitude at {lon} degrees")
-
- print("Setting device position")
- # can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
- interface.sendPosition(lat, lon, alt)
- interface.localNode.writeConfig()
- elif not args.no_time:
- # We normally provide a current time to the mesh when we connect
- interface.sendPosition()
-
- if args.set_owner:
- closeNow = True
- print(f"Setting device owner to {args.set_owner}")
- getNode().setOwner(args.set_owner)
-
- if args.pos_fields:
- # If --pos-fields invoked with args, set position fields
- closeNow = True
- prefs = getNode().radioConfig.preferences
- allFields = 0
-
- try:
- for field in args.pos_fields:
- v_field = radioconfig_pb2.PositionFlags.Value(field)
- allFields |= v_field
-
- except ValueError:
- print("ERROR: supported position fields are:")
- print(radioconfig_pb2.PositionFlags.keys())
- print("If no fields are specified, will read and display current value.")
-
- else:
- print(f"Setting position fields to {allFields}")
- setPref(prefs, 'position_flags', ('%d' % allFields))
- print("Writing modified preferences to device")
- getNode().writeConfig()
-
- elif args.pos_fields is not None:
- # If --pos-fields invoked without args, read and display current value
- closeNow = True
- prefs = getNode().radioConfig.preferences
-
- fieldNames = []
- for bit in radioconfig_pb2.PositionFlags.values():
- if prefs.position_flags & bit:
- fieldNames.append(radioconfig_pb2.PositionFlags.Name(bit))
- print(' '.join(fieldNames))
-
- if args.set_team:
- closeNow = True
- try:
- v_team = meshtastic.mesh_pb2.Team.Value(args.set_team.upper())
- except ValueError:
- v_team = 0
- print(f"ERROR: Team \'{args.set_team}\' not found.")
- print("Try a team name from the sorted list below, or use 'CLEAR' for unaffiliated:")
- print(sorted(meshtastic.mesh_pb2.Team.keys()))
- else:
- print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}")
- getNode().setOwner(team=v_team)
-
- if args.set_ham:
- closeNow = True
- print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
- getNode().setOwner(args.set_ham, is_licensed=True)
- # Must turn off encryption on primary channel
- getNode().turnOffEncryptionOnPrimaryChannel()
-
- if args.reboot:
- closeNow = True
- getNode().reboot()
-
- if args.sendtext:
- closeNow = True
- channelIndex = 0
- if args.ch_index is not None:
- channelIndex = int(args.ch_index)
- ch = getNode().getChannelByChannelIndex(channelIndex)
- if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
- print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}")
- interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
- else:
- meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.")
-
- if args.sendping:
- payload = str.encode("test string")
- print(f"Sending ping message to {args.destOrAll}")
- interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
- wantAck=True, wantResponse=True)
-
- if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
- rhc = remote_hardware.RemoteHardwareClient(interface)
-
- if args.gpio_wrb:
- bitmask = 0
- bitval = 0
- for wrpair in (args.gpio_wrb or []):
- bitmask |= 1 << int(wrpair[0])
- bitval |= int(wrpair[1]) << int(wrpair[0])
- print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
- rhc.writeGPIOs(args.dest, bitmask, bitval)
- closeNow = True
-
- if args.gpio_rd:
- bitmask = int(args.gpio_rd, 16)
- print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
- interface.mask = bitmask
- rhc.readGPIOs(args.dest, bitmask, None)
- if not interface.noProto:
- # wait up to X seconds for a response
- for _ in range(10):
- time.sleep(1)
- if interface.gotResponse:
- break
- logging.debug(f'end of gpio_rd')
-
- if args.gpio_watch:
- bitmask = int(args.gpio_watch, 16)
- print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
- while True:
- rhc.watchGPIOs(args.dest, bitmask)
- time.sleep(1)
-
- # handle settings
- if args.set:
- closeNow = True
- prefs = getNode().radioConfig.preferences
-
- # Handle the int/float/bool arguments
- for pref in args.set:
- setPref(prefs, pref[0], pref[1])
-
- print("Writing modified preferences to device")
- getNode().writeConfig()
-
- if args.configure:
- with open(args.configure[0], encoding='utf8') as file:
- configuration = yaml.safe_load(file)
- closeNow = True
-
- if 'owner' in configuration:
- print(f"Setting device owner to {configuration['owner']}")
- getNode().setOwner(configuration['owner'])
-
- if 'channel_url' in configuration:
- print("Setting channel url to", configuration['channel_url'])
- getNode().setURL(configuration['channel_url'])
-
- if 'location' in configuration:
- alt = 0
- lat = 0.0
- lon = 0.0
- prefs = interface.localNode.radioConfig.preferences
-
- if 'alt' in configuration['location']:
- alt = int(configuration['location']['alt'])
- prefs.fixed_position = True
- print(f"Fixing altitude at {alt} meters")
- if 'lat' in configuration['location']:
- lat = float(configuration['location']['lat'])
- prefs.fixed_position = True
- print(f"Fixing latitude at {lat} degrees")
- if 'lon' in configuration['location']:
- lon = float(configuration['location']['lon'])
- prefs.fixed_position = True
- print(f"Fixing longitude at {lon} degrees")
- print("Setting device position")
- interface.sendPosition(lat, lon, alt)
- interface.localNode.writeConfig()
-
- if 'user_prefs' in configuration:
- prefs = getNode().radioConfig.preferences
- for pref in configuration['user_prefs']:
- setPref(prefs, pref, str(configuration['user_prefs'][pref]))
- print("Writing modified preferences to device")
- getNode().writeConfig()
-
- if args.export_config:
- # export the configuration (the opposite of '--configure')
- closeNow = True
- export_config(interface)
-
- if args.seturl:
- closeNow = True
- getNode().setURL(args.seturl)
-
- # handle changing channels
-
- if args.ch_add:
- closeNow = True
- if len(args.ch_add) > 10:
- meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.")
- n = getNode()
- ch = n.getChannelByName(args.ch_add)
- if ch:
- meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.")
- else:
- # get the first channel that is disabled (i.e., available)
- ch = n.getDisabledChannel()
- if not ch:
- meshtastic.util.our_exit("Warning: No free channels were found")
- chs = channel_pb2.ChannelSettings()
- chs.psk = meshtastic.util.genPSK256()
- chs.name = args.ch_add
- ch.settings.CopyFrom(chs)
- ch.role = channel_pb2.Channel.Role.SECONDARY
- print(f"Writing modified channels to device")
- n.writeChannel(ch.index)
-
- if args.ch_del:
- closeNow = True
-
- channelIndex = our_globals.get_channel_index()
- if channelIndex is None:
- meshtastic.util.our_exit("Warning: Need to specify '--ch-index' for '--ch-del'.", 1)
- else:
- if channelIndex == 0:
- meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1)
- else:
- print(f"Deleting channel {channelIndex}")
- ch = getNode().deleteChannel(channelIndex)
-
- ch_changes = [args.ch_longslow, args.ch_longfast,
- args.ch_mediumslow, args.ch_mediumfast,
- args.ch_shortslow, args.ch_shortfast]
- any_primary_channel_changes = any(x for x in ch_changes)
- if args.ch_set or any_primary_channel_changes or args.ch_enable or args.ch_disable:
- closeNow = True
-
- channelIndex = our_globals.get_channel_index()
- if channelIndex is None:
- if any_primary_channel_changes:
- # we assume that they want the primary channel if they're setting range values
- channelIndex = 0
- else:
- meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
- ch = getNode().channels[channelIndex]
-
- if any_primary_channel_changes or args.ch_enable or args.ch_disable:
-
- if channelIndex == 0 and not any_primary_channel_changes:
- meshtastic.util.our_exit("Warning: Cannot enable/disable PRIMARY channel.")
-
- if channelIndex != 0:
- if any_primary_channel_changes:
- meshtastic.util.our_exit("Warning: Standard channel settings can only be applied to the PRIMARY channel")
-
- enable = True # default to enable
- if args.ch_enable:
- enable = True
- if args.ch_disable:
- enable = False
-
- def setSimpleChannel(modem_config):
- """Set one of the simple modem_config only based channels"""
-
- # Completely new channel settings
- chs = channel_pb2.ChannelSettings()
- chs.modem_config = modem_config
- chs.psk = bytes([1]) # Use default channel psk 1
-
- ch.settings.CopyFrom(chs)
-
- # handle the simple channel set commands
- if args.ch_longslow:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
-
- if args.ch_longfast:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw31_25Cr48Sf512)
-
- if args.ch_mediumslow:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr46Sf2048)
-
- if args.ch_mediumfast:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr47Sf1024)
-
- if args.ch_shortslow:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr45Sf128)
-
- if args.ch_shortfast:
- setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
-
- # Handle the channel settings
- for pref in (args.ch_set or []):
- if pref[0] == "psk":
- ch.settings.psk = meshtastic.util.fromPSK(pref[1])
- else:
- setPref(ch.settings, pref[0], pref[1])
- enable = True # If we set any pref, assume the user wants to enable the channel
-
- if enable:
- ch.role = channel_pb2.Channel.Role.PRIMARY if (
- channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY
- else:
- ch.role = channel_pb2.Channel.Role.DISABLED
-
- print(f"Writing modified channels to device")
- getNode().writeChannel(channelIndex)
-
- if args.info:
- print("")
- if not args.dest: # If we aren't trying to talk to our local node, don't show it
- interface.showInfo()
-
- print("")
- getNode().showInfo()
- closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
- print("")
-
- if args.get:
- closeNow = True
- prefs = getNode().radioConfig.preferences
-
- # Handle the int/float/bool arguments
- for pref in args.get:
- getPref(prefs, pref[0])
-
- print("Completed getting preferences")
-
- if args.nodes:
- closeNow = True
- interface.showNodes()
-
- if args.qr:
- closeNow = True
- url = interface.localNode.getURL(includeAll=False)
- print(f"Primary channel URL {url}")
- qr = pyqrcode.create(url)
- print(qr.terminal())
-
- if have_tunnel and args.tunnel:
- # pylint: disable=C0415
- from . import tunnel
- # Even if others said we could close, stay open if the user asked for a tunnel
- closeNow = False
- tunnel.Tunnel(interface, subnet=args.tunnel_net)
-
- # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
- if (not args.seriallog) and closeNow:
- interface.close() # after running command then exit
-
- except Exception as ex:
- print(f"Aborting due to: {ex}")
- interface.close() # close the connection now, so that our app exits
-
-
-def onNode(node):
- """Callback invoked when the node DB changes"""
- print(f"Node changed: {node}")
-
-
-def subscribe():
- """Subscribe to the topics the user probably wants to see, prints output to stdout"""
- pub.subscribe(onReceive, "meshtastic.receive")
- # pub.subscribe(onConnection, "meshtastic.connection")
-
- # We now call onConnected from main
- # pub.subscribe(onConnected, "meshtastic.connection.established")
-
- # pub.subscribe(onNode, "meshtastic.node")
-
-
-def export_config(interface):
- """used in--export-config"""
- owner = interface.getLongName()
- channel_url = interface.localNode.getURL()
- myinfo = interface.getMyNodeInfo()
- pos = myinfo.get('position')
- lat = None
- lon = None
- alt = None
- if pos:
- lat = pos.get('latitude')
- lon = pos.get('longitude')
- alt = pos.get('altitude')
-
- config = "# start of Meshtastic configure yaml\n"
- if owner:
- config += f"owner: {owner}\n\n"
- if channel_url:
- config += f"channel_url: {channel_url}\n\n"
- if lat or lon or alt:
- config += "location:\n"
- if lat:
- config += f" lat: {lat}\n"
- if lon:
- config += f" lon: {lon}\n"
- if alt:
- config += f" alt: {alt}\n"
- config += "\n"
- preferences = f'{interface.localNode.radioConfig.preferences}'
- prefs = preferences.splitlines()
- if prefs:
- config += "user_prefs:\n"
- for pref in prefs:
- config += f" {meshtastic.util.quoteBooleans(pref)}\n"
- print(config)
- return config
-
-
-def common():
- """Shared code for all of our command line wrappers"""
- our_globals = Globals.getInstance()
- args = our_globals.get_args()
- parser = our_globals.get_parser()
- logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO,
- format='%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s')
-
- if len(sys.argv) == 1:
- parser.print_help(sys.stderr)
- meshtastic.util.our_exit("", 1)
- else:
- if args.support:
- meshtastic.util.support_info()
- meshtastic.util.our_exit("", 0)
-
- if args.ch_index is not None:
- channelIndex = int(args.ch_index)
- our_globals.set_channel_index(channelIndex)
-
- # Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
- if not args.dest:
- args.destOrAll = "^all"
- args.destOrLocal = "^local"
- else:
- args.destOrAll = args.dest
- args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
-
- if not args.seriallog:
- if args.noproto:
- args.seriallog = "stdout"
- else:
- args.seriallog = "none" # assume no debug output in this case
-
- if args.deprecated is not None:
- logging.error(
- 'This option has been deprecated, see help below for the correct replacement...')
- parser.print_help(sys.stderr)
- meshtastic.util.our_exit('', 1)
- elif args.test:
- result = meshtastic.test.testAll()
- if not result:
- meshtastic.util.our_exit("Warning: Test was not successful.")
- else:
- meshtastic.util.our_exit("Test was a success.", 0)
- else:
- if args.seriallog == "stdout":
- logfile = sys.stdout
- elif args.seriallog == "none":
- args.seriallog = None
- logging.debug("Not logging serial output")
- logfile = None
- else:
- logging.info(f"Logging serial output to {args.seriallog}")
- # Note: using "line buffering"
- # pylint: disable=R1732
- logfile = open(args.seriallog, 'w+',
- buffering=1, encoding='utf8')
-
- subscribe()
- if args.ble:
- client = meshtastic.ble_interface.BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
- elif args.host:
- client = meshtastic.tcp_interface.TCPInterface(
- args.host, debugOut=logfile, noProto=args.noproto)
- else:
- client = meshtastic.serial_interface.SerialInterface(
- args.port, debugOut=logfile, noProto=args.noproto)
-
- # We assume client is fully connected now
- onConnected(client)
-
- if args.noproto or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
- while True:
- time.sleep(1000)
-
- # don't call exit, background threads might be running still
- # sys.exit(0)
-
-
-def initParser():
- """Initialize the command line argument parsing."""
- our_globals = Globals.getInstance()
- parser = our_globals.get_parser()
- args = our_globals.get_args()
-
- parser.add_argument(
- "--configure",
- help="Specify a path to a yaml(.yml) file containing the desired settings for the connected device.",
- action='append')
-
- parser.add_argument(
- "--export-config",
- help="Export the configuration in yaml(.yml) format.",
- action='store_true')
-
- parser.add_argument(
- "--port",
- help="The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we'll try to find it.",
- default=None)
-
- parser.add_argument(
- "--host",
- help="The hostname/ipaddr of the device to connect to (over TCP)",
- default=None)
-
- parser.add_argument(
- "--seriallog",
- help="Log device serial output to either 'stdout', 'none' or a filename to append to.")
-
- parser.add_argument("--info", help="Read and display the radio config information",
- action="store_true")
-
- parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
- action="store_true")
-
- parser.add_argument("--qr", help="Display the QR code that corresponds to the current channel",
- action="store_true")
-
- parser.add_argument(
- "--get", help="Get a preferences field. Use an invalid field such as '0' to get a list of all fields.", nargs=1, action='append')
-
- parser.add_argument(
- "--set", help="Set a preferences field", nargs=2, action='append')
-
- parser.add_argument(
- "--seturl", help="Set a channel URL", action="store")
-
- parser.add_argument(
- "--ch-index", help="Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).", action="store")
-
- parser.add_argument(
- "--ch-add", help="Add a secondary channel, you must specify a channel name", default=None)
-
- parser.add_argument(
- "--ch-del", help="Delete the ch-index channel", action='store_true')
-
- parser.add_argument(
- "--ch-enable", help="Enable the specified channel", action="store_true", dest="ch_enable", default=False)
-
- # Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.)
- parser.add_argument(
- "--ch-disable", help="Disable the specified channel", action="store_true", dest="ch_disable", default=False)
-
- parser.add_argument(
- "--ch-set", help="Set a channel parameter", nargs=2, action='append')
-
- parser.add_argument(
- "--ch-longslow", help="Change to the long-range and slow channel", action='store_true')
-
- parser.add_argument(
- "--ch-longfast", help="Change to the long-range and fast channel", action='store_true')
-
- parser.add_argument(
- "--ch-mediumslow", help="Change to the medium-range and slow channel", action='store_true')
-
- parser.add_argument(
- "--ch-mediumfast", help="Change to the medium-range and fast channel", action='store_true')
-
- parser.add_argument(
- "--ch-shortslow", help="Change to the short-range and slow channel", action='store_true')
-
- parser.add_argument(
- "--ch-shortfast", help="Change to the short-range and fast channel", action='store_true')
-
-
- parser.add_argument(
- "--set-owner", help="Set device owner name", action="store")
-
- parser.add_argument(
- "--set-team", help="Set team affiliation (an invalid team will list valid values)", action="store")
-
- parser.add_argument(
- "--set-ham", help="Set licensed Ham ID and turn off encryption", action="store")
-
- parser.add_argument(
- "--dest", help="The destination node id for any sent commands, if not set '^all' or '^local' is assumed as appropriate", default=None)
-
- parser.add_argument(
- "--sendtext", help="Send a text message. Can specify a destination '--dest' and/or channel index '--ch-index'.")
-
- parser.add_argument(
- "--sendping", help="Send a ping message (which requests a reply)", action="store_true")
-
- parser.add_argument(
- "--reboot", help="Tell the destination node to reboot", action="store_true")
-
- parser.add_argument(
- "--reply", help="Reply to received messages",
- action="store_true")
-
- parser.add_argument(
- "--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action='append')
-
- parser.add_argument(
- "--gpio-rd", help="Read from a GPIO mask (ex: '0x10')")
-
- parser.add_argument(
- "--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')")
-
- parser.add_argument(
- "--no-time", help="Suppress sending the current time to the mesh", action="store_true")
-
- parser.add_argument(
- "--setalt", help="Set device altitude (allows use without GPS)")
-
- parser.add_argument(
- "--setlat", help="Set device latitude (allows use without GPS)")
-
- parser.add_argument(
- "--setlon", help="Set device longitude (allows use without GPS)")
-
- parser.add_argument(
- "--pos-fields", help="Specify fields to send when sending a position. Use no argument for a list of valid values. "\
- "Can pass multiple values as a space separated list like "\
- "this: '--pos-fields POS_ALTITUDE POS_ALT_MSL'",
- nargs="*", action="store")
-
- parser.add_argument("--debug", help="Show API library debug log messages",
- action="store_true")
-
- parser.add_argument("--test", help="Run stress test against all connected Meshtastic devices",
- action="store_true")
-
- parser.add_argument("--ble", help="BLE mac address to connect to (BLE is not yet supported for this tool)",
- default=None)
-
- parser.add_argument("--noproto", help="Don't start the API, just function as a dumb serial terminal.",
- action="store_true")
-
- parser.add_argument('--setchan', dest='deprecated', nargs=2, action='append',
- help='Deprecated, use "--ch-set param value" instead')
- parser.add_argument('--set-router', dest='deprecated',
- action='store_true', help='Deprecated, use "--set is_router true" instead')
- parser.add_argument('--unset-router', dest='deprecated',
- action='store_false', help='Deprecated, use "--set is_router false" instead')
-
- if have_tunnel:
- parser.add_argument('--tunnel',
- action='store_true', help="Create a TUN tunnel device for forwarding IP packets over the mesh")
- parser.add_argument(
- "--subnet", dest='tunnel_net', help="Sets the local-end subnet address for the TUN IP bridge", default=None)
-
- parser.set_defaults(deprecated=None)
-
- parser.add_argument('--version', action='version',
- version=f"{pkg_resources.require('meshtastic')[0].version}")
-
- parser.add_argument(
- "--support", action='store_true', help="Show support info (useful when troubleshooting an issue)")
-
- args = parser.parse_args()
- our_globals.set_args(args)
- our_globals.set_parser(parser)
-
-
-def main():
- """Perform command line meshtastic operations"""
- our_globals = Globals.getInstance()
- parser = argparse.ArgumentParser()
- our_globals.set_parser(parser)
- initParser()
- common()
-
-
-def tunnelMain():
- """Run a meshtastic IP tunnel"""
- our_globals = Globals.getInstance()
- parser = argparse.ArgumentParser()
- our_globals.set_parser(parser)
- initParser()
- args = our_globals.get_args()
- args.tunnel = True
- our_globals.set_args(args)
- common()
-
-
-if __name__ == "__main__":
- main()
-Global variables
--
-
var have_tunnel
--
--
We only import the tunnel code if we are on a platform that can run it.
-
Functions
--
-
-def common() -
--
--
Shared code for all of our command line wrappers
---Expand source code -
-
-def common(): - """Shared code for all of our command line wrappers""" - our_globals = Globals.getInstance() - args = our_globals.get_args() - parser = our_globals.get_parser() - logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO, - format='%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s') - - if len(sys.argv) == 1: - parser.print_help(sys.stderr) - meshtastic.util.our_exit("", 1) - else: - if args.support: - meshtastic.util.support_info() - meshtastic.util.our_exit("", 0) - - if args.ch_index is not None: - channelIndex = int(args.ch_index) - our_globals.set_channel_index(channelIndex) - - # Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands - if not args.dest: - args.destOrAll = "^all" - args.destOrLocal = "^local" - else: - args.destOrAll = args.dest - args.destOrLocal = args.dest # FIXME, temp hack for debugging remove - - if not args.seriallog: - if args.noproto: - args.seriallog = "stdout" - else: - args.seriallog = "none" # assume no debug output in this case - - if args.deprecated is not None: - logging.error( - 'This option has been deprecated, see help below for the correct replacement...') - parser.print_help(sys.stderr) - meshtastic.util.our_exit('', 1) - elif args.test: - result = meshtastic.test.testAll() - if not result: - meshtastic.util.our_exit("Warning: Test was not successful.") - else: - meshtastic.util.our_exit("Test was a success.", 0) - else: - if args.seriallog == "stdout": - logfile = sys.stdout - elif args.seriallog == "none": - args.seriallog = None - logging.debug("Not logging serial output") - logfile = None - else: - logging.info(f"Logging serial output to {args.seriallog}") - # Note: using "line buffering" - # pylint: disable=R1732 - logfile = open(args.seriallog, 'w+', - buffering=1, encoding='utf8') - - subscribe() - if args.ble: - client = meshtastic.ble_interface.BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto) - elif args.host: - client = meshtastic.tcp_interface.TCPInterface( - args.host, debugOut=logfile, noProto=args.noproto) - else: - client = meshtastic.serial_interface.SerialInterface( - args.port, debugOut=logfile, noProto=args.noproto) - - # We assume client is fully connected now - onConnected(client) - - if args.noproto or (have_tunnel and args.tunnel): # loop until someone presses ctrlc - while True: - time.sleep(1000) - - # don't call exit, background threads might be running still - # sys.exit(0)
- -def export_config(interface) -
--
--
used in–export-config
---Expand source code -
-
-def export_config(interface): - """used in--export-config""" - owner = interface.getLongName() - channel_url = interface.localNode.getURL() - myinfo = interface.getMyNodeInfo() - pos = myinfo.get('position') - lat = None - lon = None - alt = None - if pos: - lat = pos.get('latitude') - lon = pos.get('longitude') - alt = pos.get('altitude') - - config = "# start of Meshtastic configure yaml\n" - if owner: - config += f"owner: {owner}\n\n" - if channel_url: - config += f"channel_url: {channel_url}\n\n" - if lat or lon or alt: - config += "location:\n" - if lat: - config += f" lat: {lat}\n" - if lon: - config += f" lon: {lon}\n" - if alt: - config += f" alt: {alt}\n" - config += "\n" - preferences = f'{interface.localNode.radioConfig.preferences}' - prefs = preferences.splitlines() - if prefs: - config += "user_prefs:\n" - for pref in prefs: - config += f" {meshtastic.util.quoteBooleans(pref)}\n" - print(config) - return config
- -def getPref(attributes, name) -
--
--
Get a channel or preferences value
---Expand source code -
-
-def getPref(attributes, name): - """Get a channel or preferences value""" - - objDesc = attributes.DESCRIPTOR - field = objDesc.fields_by_name.get(name) - if not field: - print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not get it.") - print(f"Choices in sorted order are:") - names = [] - for f in objDesc.fields: - names.append(f'{f.name}') - for temp_name in sorted(names): - print(f" {temp_name}") - return - - # okay - try to read the value - try: - try: - val = getattr(attributes, name) - except TypeError: - # The getter didn't like our arg type guess try again as a string - val = getattr(attributes, name) - - # succeeded! - print(f"{name}: {str(val)}") - except Exception as ex: - print(f"Can't get {name} due to {ex}")
- -def initParser() -
--
--
Initialize the command line argument parsing.
---Expand source code -
-
-def initParser(): - """Initialize the command line argument parsing.""" - our_globals = Globals.getInstance() - parser = our_globals.get_parser() - args = our_globals.get_args() - - parser.add_argument( - "--configure", - help="Specify a path to a yaml(.yml) file containing the desired settings for the connected device.", - action='append') - - parser.add_argument( - "--export-config", - help="Export the configuration in yaml(.yml) format.", - action='store_true') - - parser.add_argument( - "--port", - help="The port the Meshtastic device is connected to, i.e. /dev/ttyUSB0. If unspecified, we'll try to find it.", - default=None) - - parser.add_argument( - "--host", - help="The hostname/ipaddr of the device to connect to (over TCP)", - default=None) - - parser.add_argument( - "--seriallog", - help="Log device serial output to either 'stdout', 'none' or a filename to append to.") - - parser.add_argument("--info", help="Read and display the radio config information", - action="store_true") - - parser.add_argument("--nodes", help="Print Node List in a pretty formatted table", - action="store_true") - - parser.add_argument("--qr", help="Display the QR code that corresponds to the current channel", - action="store_true") - - parser.add_argument( - "--get", help="Get a preferences field. Use an invalid field such as '0' to get a list of all fields.", nargs=1, action='append') - - parser.add_argument( - "--set", help="Set a preferences field", nargs=2, action='append') - - parser.add_argument( - "--seturl", help="Set a channel URL", action="store") - - parser.add_argument( - "--ch-index", help="Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).", action="store") - - parser.add_argument( - "--ch-add", help="Add a secondary channel, you must specify a channel name", default=None) - - parser.add_argument( - "--ch-del", help="Delete the ch-index channel", action='store_true') - - parser.add_argument( - "--ch-enable", help="Enable the specified channel", action="store_true", dest="ch_enable", default=False) - - # Note: We are doing a double negative here (Do we want to disable? If ch_disable==True, then disable.) - parser.add_argument( - "--ch-disable", help="Disable the specified channel", action="store_true", dest="ch_disable", default=False) - - parser.add_argument( - "--ch-set", help="Set a channel parameter", nargs=2, action='append') - - parser.add_argument( - "--ch-longslow", help="Change to the long-range and slow channel", action='store_true') - - parser.add_argument( - "--ch-longfast", help="Change to the long-range and fast channel", action='store_true') - - parser.add_argument( - "--ch-mediumslow", help="Change to the medium-range and slow channel", action='store_true') - - parser.add_argument( - "--ch-mediumfast", help="Change to the medium-range and fast channel", action='store_true') - - parser.add_argument( - "--ch-shortslow", help="Change to the short-range and slow channel", action='store_true') - - parser.add_argument( - "--ch-shortfast", help="Change to the short-range and fast channel", action='store_true') - - - parser.add_argument( - "--set-owner", help="Set device owner name", action="store") - - parser.add_argument( - "--set-team", help="Set team affiliation (an invalid team will list valid values)", action="store") - - parser.add_argument( - "--set-ham", help="Set licensed Ham ID and turn off encryption", action="store") - - parser.add_argument( - "--dest", help="The destination node id for any sent commands, if not set '^all' or '^local' is assumed as appropriate", default=None) - - parser.add_argument( - "--sendtext", help="Send a text message. Can specify a destination '--dest' and/or channel index '--ch-index'.") - - parser.add_argument( - "--sendping", help="Send a ping message (which requests a reply)", action="store_true") - - parser.add_argument( - "--reboot", help="Tell the destination node to reboot", action="store_true") - - parser.add_argument( - "--reply", help="Reply to received messages", - action="store_true") - - parser.add_argument( - "--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action='append') - - parser.add_argument( - "--gpio-rd", help="Read from a GPIO mask (ex: '0x10')") - - parser.add_argument( - "--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')") - - parser.add_argument( - "--no-time", help="Suppress sending the current time to the mesh", action="store_true") - - parser.add_argument( - "--setalt", help="Set device altitude (allows use without GPS)") - - parser.add_argument( - "--setlat", help="Set device latitude (allows use without GPS)") - - parser.add_argument( - "--setlon", help="Set device longitude (allows use without GPS)") - - parser.add_argument( - "--pos-fields", help="Specify fields to send when sending a position. Use no argument for a list of valid values. "\ - "Can pass multiple values as a space separated list like "\ - "this: '--pos-fields POS_ALTITUDE POS_ALT_MSL'", - nargs="*", action="store") - - parser.add_argument("--debug", help="Show API library debug log messages", - action="store_true") - - parser.add_argument("--test", help="Run stress test against all connected Meshtastic devices", - action="store_true") - - parser.add_argument("--ble", help="BLE mac address to connect to (BLE is not yet supported for this tool)", - default=None) - - parser.add_argument("--noproto", help="Don't start the API, just function as a dumb serial terminal.", - action="store_true") - - parser.add_argument('--setchan', dest='deprecated', nargs=2, action='append', - help='Deprecated, use "--ch-set param value" instead') - parser.add_argument('--set-router', dest='deprecated', - action='store_true', help='Deprecated, use "--set is_router true" instead') - parser.add_argument('--unset-router', dest='deprecated', - action='store_false', help='Deprecated, use "--set is_router false" instead') - - if have_tunnel: - parser.add_argument('--tunnel', - action='store_true', help="Create a TUN tunnel device for forwarding IP packets over the mesh") - parser.add_argument( - "--subnet", dest='tunnel_net', help="Sets the local-end subnet address for the TUN IP bridge", default=None) - - parser.set_defaults(deprecated=None) - - parser.add_argument('--version', action='version', - version=f"{pkg_resources.require('meshtastic')[0].version}") - - parser.add_argument( - "--support", action='store_true', help="Show support info (useful when troubleshooting an issue)") - - args = parser.parse_args() - our_globals.set_args(args) - our_globals.set_parser(parser)
- -def main() -
--
--
Perform command line meshtastic operations
---Expand source code -
-
-def main(): - """Perform command line meshtastic operations""" - our_globals = Globals.getInstance() - parser = argparse.ArgumentParser() - our_globals.set_parser(parser) - initParser() - common()
- -def onConnected(interface) -
--
--
Callback invoked when we connect to a radio
---Expand source code -
-
-def onConnected(interface): - """Callback invoked when we connect to a radio""" - closeNow = False # Should we drop the connection after we finish? - try: - our_globals = Globals.getInstance() - args = our_globals.get_args() - - print("Connected to radio") - - def getNode(): - """This operation could be expensive, so we try to cache the results""" - targetNode = our_globals.get_target_node() - if not targetNode: - targetNode = interface.getNode(args.destOrLocal) - our_globals.set_target_node(targetNode) - return targetNode - - if args.setlat or args.setlon or args.setalt: - closeNow = True - - alt = 0 - lat = 0.0 - lon = 0.0 - prefs = interface.localNode.radioConfig.preferences - if args.setalt: - alt = int(args.setalt) - prefs.fixed_position = True - print(f"Fixing altitude at {alt} meters") - if args.setlat: - lat = float(args.setlat) - prefs.fixed_position = True - print(f"Fixing latitude at {lat} degrees") - if args.setlon: - lon = float(args.setlon) - prefs.fixed_position = True - print(f"Fixing longitude at {lon} degrees") - - print("Setting device position") - # can include lat/long/alt etc: latitude = 37.5, longitude = -122.1 - interface.sendPosition(lat, lon, alt) - interface.localNode.writeConfig() - elif not args.no_time: - # We normally provide a current time to the mesh when we connect - interface.sendPosition() - - if args.set_owner: - closeNow = True - print(f"Setting device owner to {args.set_owner}") - getNode().setOwner(args.set_owner) - - if args.pos_fields: - # If --pos-fields invoked with args, set position fields - closeNow = True - prefs = getNode().radioConfig.preferences - allFields = 0 - - try: - for field in args.pos_fields: - v_field = radioconfig_pb2.PositionFlags.Value(field) - allFields |= v_field - - except ValueError: - print("ERROR: supported position fields are:") - print(radioconfig_pb2.PositionFlags.keys()) - print("If no fields are specified, will read and display current value.") - - else: - print(f"Setting position fields to {allFields}") - setPref(prefs, 'position_flags', ('%d' % allFields)) - print("Writing modified preferences to device") - getNode().writeConfig() - - elif args.pos_fields is not None: - # If --pos-fields invoked without args, read and display current value - closeNow = True - prefs = getNode().radioConfig.preferences - - fieldNames = [] - for bit in radioconfig_pb2.PositionFlags.values(): - if prefs.position_flags & bit: - fieldNames.append(radioconfig_pb2.PositionFlags.Name(bit)) - print(' '.join(fieldNames)) - - if args.set_team: - closeNow = True - try: - v_team = meshtastic.mesh_pb2.Team.Value(args.set_team.upper()) - except ValueError: - v_team = 0 - print(f"ERROR: Team \'{args.set_team}\' not found.") - print("Try a team name from the sorted list below, or use 'CLEAR' for unaffiliated:") - print(sorted(meshtastic.mesh_pb2.Team.keys())) - else: - print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}") - getNode().setOwner(team=v_team) - - if args.set_ham: - closeNow = True - print(f"Setting Ham ID to {args.set_ham} and turning off encryption") - getNode().setOwner(args.set_ham, is_licensed=True) - # Must turn off encryption on primary channel - getNode().turnOffEncryptionOnPrimaryChannel() - - if args.reboot: - closeNow = True - getNode().reboot() - - if args.sendtext: - closeNow = True - channelIndex = 0 - if args.ch_index is not None: - channelIndex = int(args.ch_index) - ch = getNode().getChannelByChannelIndex(channelIndex) - if ch and ch.role != channel_pb2.Channel.Role.DISABLED: - print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}") - interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex) - else: - meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.") - - if args.sendping: - payload = str.encode("test string") - print(f"Sending ping message to {args.destOrAll}") - interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP, - wantAck=True, wantResponse=True) - - if args.gpio_wrb or args.gpio_rd or args.gpio_watch: - rhc = remote_hardware.RemoteHardwareClient(interface) - - if args.gpio_wrb: - bitmask = 0 - bitval = 0 - for wrpair in (args.gpio_wrb or []): - bitmask |= 1 << int(wrpair[0]) - bitval |= int(wrpair[1]) << int(wrpair[0]) - print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}") - rhc.writeGPIOs(args.dest, bitmask, bitval) - closeNow = True - - if args.gpio_rd: - bitmask = int(args.gpio_rd, 16) - print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}") - interface.mask = bitmask - rhc.readGPIOs(args.dest, bitmask, None) - if not interface.noProto: - # wait up to X seconds for a response - for _ in range(10): - time.sleep(1) - if interface.gotResponse: - break - logging.debug(f'end of gpio_rd') - - if args.gpio_watch: - bitmask = int(args.gpio_watch, 16) - print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit") - while True: - rhc.watchGPIOs(args.dest, bitmask) - time.sleep(1) - - # handle settings - if args.set: - closeNow = True - prefs = getNode().radioConfig.preferences - - # Handle the int/float/bool arguments - for pref in args.set: - setPref(prefs, pref[0], pref[1]) - - print("Writing modified preferences to device") - getNode().writeConfig() - - if args.configure: - with open(args.configure[0], encoding='utf8') as file: - configuration = yaml.safe_load(file) - closeNow = True - - if 'owner' in configuration: - print(f"Setting device owner to {configuration['owner']}") - getNode().setOwner(configuration['owner']) - - if 'channel_url' in configuration: - print("Setting channel url to", configuration['channel_url']) - getNode().setURL(configuration['channel_url']) - - if 'location' in configuration: - alt = 0 - lat = 0.0 - lon = 0.0 - prefs = interface.localNode.radioConfig.preferences - - if 'alt' in configuration['location']: - alt = int(configuration['location']['alt']) - prefs.fixed_position = True - print(f"Fixing altitude at {alt} meters") - if 'lat' in configuration['location']: - lat = float(configuration['location']['lat']) - prefs.fixed_position = True - print(f"Fixing latitude at {lat} degrees") - if 'lon' in configuration['location']: - lon = float(configuration['location']['lon']) - prefs.fixed_position = True - print(f"Fixing longitude at {lon} degrees") - print("Setting device position") - interface.sendPosition(lat, lon, alt) - interface.localNode.writeConfig() - - if 'user_prefs' in configuration: - prefs = getNode().radioConfig.preferences - for pref in configuration['user_prefs']: - setPref(prefs, pref, str(configuration['user_prefs'][pref])) - print("Writing modified preferences to device") - getNode().writeConfig() - - if args.export_config: - # export the configuration (the opposite of '--configure') - closeNow = True - export_config(interface) - - if args.seturl: - closeNow = True - getNode().setURL(args.seturl) - - # handle changing channels - - if args.ch_add: - closeNow = True - if len(args.ch_add) > 10: - meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.") - n = getNode() - ch = n.getChannelByName(args.ch_add) - if ch: - meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.") - else: - # get the first channel that is disabled (i.e., available) - ch = n.getDisabledChannel() - if not ch: - meshtastic.util.our_exit("Warning: No free channels were found") - chs = channel_pb2.ChannelSettings() - chs.psk = meshtastic.util.genPSK256() - chs.name = args.ch_add - ch.settings.CopyFrom(chs) - ch.role = channel_pb2.Channel.Role.SECONDARY - print(f"Writing modified channels to device") - n.writeChannel(ch.index) - - if args.ch_del: - closeNow = True - - channelIndex = our_globals.get_channel_index() - if channelIndex is None: - meshtastic.util.our_exit("Warning: Need to specify '--ch-index' for '--ch-del'.", 1) - else: - if channelIndex == 0: - meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1) - else: - print(f"Deleting channel {channelIndex}") - ch = getNode().deleteChannel(channelIndex) - - ch_changes = [args.ch_longslow, args.ch_longfast, - args.ch_mediumslow, args.ch_mediumfast, - args.ch_shortslow, args.ch_shortfast] - any_primary_channel_changes = any(x for x in ch_changes) - if args.ch_set or any_primary_channel_changes or args.ch_enable or args.ch_disable: - closeNow = True - - channelIndex = our_globals.get_channel_index() - if channelIndex is None: - if any_primary_channel_changes: - # we assume that they want the primary channel if they're setting range values - channelIndex = 0 - else: - meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1) - ch = getNode().channels[channelIndex] - - if any_primary_channel_changes or args.ch_enable or args.ch_disable: - - if channelIndex == 0 and not any_primary_channel_changes: - meshtastic.util.our_exit("Warning: Cannot enable/disable PRIMARY channel.") - - if channelIndex != 0: - if any_primary_channel_changes: - meshtastic.util.our_exit("Warning: Standard channel settings can only be applied to the PRIMARY channel") - - enable = True # default to enable - if args.ch_enable: - enable = True - if args.ch_disable: - enable = False - - def setSimpleChannel(modem_config): - """Set one of the simple modem_config only based channels""" - - # Completely new channel settings - chs = channel_pb2.ChannelSettings() - chs.modem_config = modem_config - chs.psk = bytes([1]) # Use default channel psk 1 - - ch.settings.CopyFrom(chs) - - # handle the simple channel set commands - if args.ch_longslow: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096) - - if args.ch_longfast: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw31_25Cr48Sf512) - - if args.ch_mediumslow: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr46Sf2048) - - if args.ch_mediumfast: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw250Cr47Sf1024) - - if args.ch_shortslow: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw125Cr45Sf128) - - if args.ch_shortfast: - setSimpleChannel(channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128) - - # Handle the channel settings - for pref in (args.ch_set or []): - if pref[0] == "psk": - ch.settings.psk = meshtastic.util.fromPSK(pref[1]) - else: - setPref(ch.settings, pref[0], pref[1]) - enable = True # If we set any pref, assume the user wants to enable the channel - - if enable: - ch.role = channel_pb2.Channel.Role.PRIMARY if ( - channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY - else: - ch.role = channel_pb2.Channel.Role.DISABLED - - print(f"Writing modified channels to device") - getNode().writeChannel(channelIndex) - - if args.info: - print("") - if not args.dest: # If we aren't trying to talk to our local node, don't show it - interface.showInfo() - - print("") - getNode().showInfo() - closeNow = True # FIXME, for now we leave the link up while talking to remote nodes - print("") - - if args.get: - closeNow = True - prefs = getNode().radioConfig.preferences - - # Handle the int/float/bool arguments - for pref in args.get: - getPref(prefs, pref[0]) - - print("Completed getting preferences") - - if args.nodes: - closeNow = True - interface.showNodes() - - if args.qr: - closeNow = True - url = interface.localNode.getURL(includeAll=False) - print(f"Primary channel URL {url}") - qr = pyqrcode.create(url) - print(qr.terminal()) - - if have_tunnel and args.tunnel: - # pylint: disable=C0415 - from . import tunnel - # Even if others said we could close, stay open if the user asked for a tunnel - closeNow = False - tunnel.Tunnel(interface, subnet=args.tunnel_net) - - # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation - if (not args.seriallog) and closeNow: - interface.close() # after running command then exit - - except Exception as ex: - print(f"Aborting due to: {ex}") - interface.close() # close the connection now, so that our app exits
- -def onConnection(interface, topic=pubsub.core.callables.AUTO_TOPIC) -
--
--
Callback invoked when we connect/disconnect from a radio
---Expand source code -
-
-def onConnection(interface, topic=pub.AUTO_TOPIC): - """Callback invoked when we connect/disconnect from a radio""" - print(f"Connection changed: {topic.getName()}")
- -def onNode(node) -
--
--
Callback invoked when the node DB changes
---Expand source code -
-
-def onNode(node): - """Callback invoked when the node DB changes""" - print(f"Node changed: {node}")
- -def onReceive(packet, interface) -
--
--
Callback invoked when a packet arrives
---Expand source code -
-
-def onReceive(packet, interface): - """Callback invoked when a packet arrives""" - our_globals = Globals.getInstance() - args = our_globals.get_args() - try: - d = packet.get('decoded') - logging.debug(f'in onReceive() d:{d}') - - # Exit once we receive a reply - if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP: - interface.close() # after running command then exit - - # Reply to every received message with some stats - if args and args.reply: - msg = d.get('text') - if msg: - rxSnr = packet['rxSnr'] - hopLimit = packet['hopLimit'] - print(f"message: {msg}") - reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit) - print("Sending reply: ", reply) - interface.sendText(reply) - - except Exception as ex: - print(ex)
- -def setPref(attributes, name, valStr) -
--
--
Set a channel or preferences value
---Expand source code -
-
-def setPref(attributes, name, valStr): - """Set a channel or preferences value""" - - objDesc = attributes.DESCRIPTOR - field = objDesc.fields_by_name.get(name) - if not field: - print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not set it.") - print(f"Choices in sorted order are:") - names = [] - for f in objDesc.fields: - names.append(f'{f.name}') - for temp_name in sorted(names): - print(f" {temp_name}") - return - - val = meshtastic.util.fromStr(valStr) - - enumType = field.enum_type - # pylint: disable=C0123 - if enumType and type(val) == str: - # We've failed so far to convert this string into an enum, try to find it by reflection - e = enumType.values_by_name.get(val) - if e: - val = e.number - else: - print(f"{name} does not have an enum called {val}, so you can not set it.") - print(f"Choices in sorted order are:") - names = [] - for f in enumType.values: - names.append(f'{f.name}') - for temp_name in sorted(names): - print(f" {temp_name}") - return - - # okay - try to read the value - try: - try: - setattr(attributes, name, val) - except TypeError: - # The setter didn't like our arg type guess try again as a string - setattr(attributes, name, valStr) - - # succeeded! - print(f"Set {name} to {valStr}") - except Exception as ex: - print(f"Can't set {name} due to {ex}")
- -def subscribe() -
--
--
Subscribe to the topics the user probably wants to see, prints output to stdout
---Expand source code -
-
-def subscribe(): - """Subscribe to the topics the user probably wants to see, prints output to stdout""" - pub.subscribe(onReceive, "meshtastic.receive") - # pub.subscribe(onConnection, "meshtastic.connection") - - # We now call onConnected from main - # pub.subscribe(onConnected, "meshtastic.connection.established") - - # pub.subscribe(onNode, "meshtastic.node")
- -def tunnelMain() -
--
--
Run a meshtastic IP tunnel
---Expand source code -
-
-def tunnelMain(): - """Run a meshtastic IP tunnel""" - our_globals = Globals.getInstance() - parser = argparse.ArgumentParser() - our_globals.set_parser(parser) - initParser() - args = our_globals.get_args() - args.tunnel = True - our_globals.set_args(args) - common()
-