diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index d9e3517..b7b86a2 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -13,24 +13,30 @@ import sys import time from typing import Optional -import pyqrcode # type: ignore[import-untyped] +import pyqrcode # type: ignore[import-untyped] import yaml from google.protobuf.json_format import MessageToDict -from pubsub import pub # type: ignore[import-untyped] +from pubsub import pub # type: ignore[import-untyped] import meshtastic.test import meshtastic.util -from meshtastic import mt_config -from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2 -from meshtastic import remote_hardware, BROADCAST_ADDR -from meshtastic.version import get_active_version +from meshtastic import BROADCAST_ADDR, mt_config, remote_hardware from meshtastic.ble_interface import BLEInterface from meshtastic.mesh_interface import MeshInterface -from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply, SimPowerSupply, PowerStress, PowerMeter +from meshtastic.powermon import ( + PowerMeter, + PowerStress, + PPK2PowerSupply, + RidenPowerSupply, + SimPowerSupply, +) +from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2 from meshtastic.slog import LogSet +from meshtastic.version import get_active_version meter: Optional[PowerMeter] = None + def onReceive(packet, interface): """Callback invoked when a packet arrives""" args = mt_config.args @@ -66,11 +72,13 @@ def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613 """Callback invoked when we connect/disconnect from a radio""" print(f"Connection changed: {topic.getName()}") + def checkChannel(interface: MeshInterface, channelIndex: int) -> bool: """Given an interface and channel index, return True if that channel is non-disabled on the local node""" ch = interface.localNode.getChannelByChannelIndex(channelIndex) logging.debug(f"ch:{ch}") - return (ch and ch.role != channel_pb2.Channel.Role.DISABLED) + return ch and ch.role != channel_pb2.Channel.Role.DISABLED + def getPref(node, comp_name): """Get a channel or preferences value""" @@ -146,6 +154,7 @@ def splitCompoundName(comp_name): name.append(comp_name) return name + def traverseConfig(config_root, config, interface_config): """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" snake_name = meshtastic.util.camel_to_snake(config_root) @@ -154,14 +163,11 @@ def traverseConfig(config_root, config, interface_config): if isinstance(config[pref], dict): traverseConfig(pref_name, config[pref], interface_config) else: - setPref( - interface_config, - pref_name, - str(config[pref]) - ) + setPref(interface_config, pref_name, str(config[pref])) return True + def setPref(config, comp_name, valStr) -> bool: """Set a channel or preferences value""" @@ -275,7 +281,9 @@ def onConnected(interface): interface.localNode.removeFixedPosition() elif args.setlat or args.setlon or args.setalt: if args.dest != BROADCAST_ADDR: - print("Setting latitude, longitude, and altitude of remote nodes is not supported.") + print( + "Setting latitude, longitude, and altitude of remote nodes is not supported." + ) return closeNow = True @@ -303,10 +311,17 @@ def onConnected(interface): interface.localNode.setFixedPosition(lat, lon, alt) elif not args.no_time: # We normally provide a current time to the mesh when we connect - if interface.localNode.nodeNum in interface.nodesByNum and "position" in interface.nodesByNum[interface.localNode.nodeNum]: + if ( + interface.localNode.nodeNum in interface.nodesByNum + and "position" in interface.nodesByNum[interface.localNode.nodeNum] + ): # send the same position the node already knows, just to update time position = interface.nodesByNum[interface.localNode.nodeNum]["position"] - interface.sendPosition(position.get("latitude", 0.0), position.get("longitude", 0.0), position.get("altitude", 0.0)) + interface.sendPosition( + position.get("latitude", 0.0), + position.get("longitude", 0.0), + position.get("altitude", 0.0), + ) else: interface.sendPosition() @@ -454,7 +469,9 @@ def onConnected(interface): dest = str(args.traceroute) channelIndex = mt_config.channel_index or 0 if checkChannel(interface, channelIndex): - print(f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)") + print( + f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)" + ) interface.sendTraceRoute(dest, hopLimit, channelIndex=channelIndex) if args.request_telemetry: @@ -463,8 +480,14 @@ def onConnected(interface): else: channelIndex = mt_config.channel_index or 0 if checkChannel(interface, channelIndex): - print(f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)") - interface.sendTelemetry(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex) + print( + f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)" + ) + interface.sendTelemetry( + destinationId=args.dest, + wantResponse=True, + channelIndex=channelIndex, + ) if args.request_position: if args.dest == BROADCAST_ADDR: @@ -472,8 +495,14 @@ def onConnected(interface): else: channelIndex = mt_config.channel_index or 0 if checkChannel(interface, channelIndex): - print(f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)") - interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex) + print( + f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)" + ) + interface.sendPosition( + destinationId=args.dest, + wantResponse=True, + channelIndex=channelIndex, + ) if args.gpio_wrb or args.gpio_rd or args.gpio_watch: if args.dest == BROADCAST_ADDR: @@ -615,7 +644,9 @@ def onConnected(interface): if "config" in configuration: localConfig = interface.getNode(args.dest).localConfig for section in configuration["config"]: - traverseConfig(section, configuration["config"][section], localConfig) + traverseConfig( + section, configuration["config"][section], localConfig + ) interface.getNode(args.dest).writeConfig( meshtastic.util.camel_to_snake(section) ) @@ -623,7 +654,11 @@ def onConnected(interface): if "module_config" in configuration: moduleConfig = interface.getNode(args.dest).moduleConfig for section in configuration["module_config"]: - traverseConfig(section, configuration["module_config"][section], moduleConfig) + traverseConfig( + section, + configuration["module_config"][section], + moduleConfig, + ) interface.getNode(args.dest).writeConfig( meshtastic.util.camel_to_snake(section) ) @@ -676,7 +711,9 @@ def onConnected(interface): print(f"Writing modified channels to device") n.writeChannel(ch.index) if channelIndex is None: - print(f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications") + print( + f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications" + ) mt_config.channel_index = ch.index if args.ch_del: @@ -762,7 +799,7 @@ def onConnected(interface): else: found = setPref(ch.settings, pref[0], pref[1]) if not found: - category_settings = ['module_settings'] + category_settings = ["module_settings"] print( f"{ch.settings.__class__.__name__} does not have an attribute {pref[0]}." ) @@ -772,7 +809,9 @@ def onConnected(interface): print(f"{field.name}") else: print(f"{field.name}:") - config = ch.settings.DESCRIPTOR.fields_by_name.get(field.name) + config = ch.settings.DESCRIPTOR.fields_by_name.get( + field.name + ) names = [] for sub_field in config.message_type.fields: tmp_name = f"{field.name}.{sub_field.name}" @@ -852,16 +891,20 @@ def onConnected(interface): qr = pyqrcode.create(url) print(qr.terminal()) - log_set: Optional[LogSet] = None # we need to keep a reference to the logset so it doesn't get GCed early + log_set: Optional[LogSet] = None # type: ignore[annotation-unchecked] + # we need to keep a reference to the logset so it doesn't get GCed early + if args.slog or args.power_stress: # Setup loggers global meter # pylint: disable=global-variable-not-assigned - log_set = LogSet(interface, args.slog if args.slog != 'default' else None, meter) + log_set = LogSet( + interface, args.slog if args.slog != "default" else None, meter + ) if args.power_stress: stress = PowerStress(interface) stress.run() - closeNow = True # exit immediately after stress test + closeNow = True # exit immediately after stress test if args.listen: closeNow = False @@ -891,7 +934,7 @@ def onConnected(interface): interface.getNode(args.dest, False).iface.waitForAckNak() if args.wait_to_disconnect: - print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting" ) + print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting") time.sleep(int(args.wait_to_disconnect)) # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation @@ -1004,6 +1047,7 @@ def export_config(interface): print(config) return config + def create_power_meter(): """Setup the power meter.""" @@ -1038,6 +1082,7 @@ def create_power_meter(): logging.info("Powered-on, waiting for device to boot") time.sleep(5) + def common(): """Shared code for all of our command line wrappers.""" logfile = None @@ -1104,20 +1149,29 @@ def common(): print(f"Found: name='{x.name}' address='{x.address}'") meshtastic.util.our_exit("BLE scan finished", 0) elif args.ble: - client = BLEInterface(args.ble if args.ble != "any" else None, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes) + client = BLEInterface( + args.ble if args.ble != "any" else None, + debugOut=logfile, + noProto=args.noproto, + noNodes=args.no_nodes, + ) elif args.host: try: client = meshtastic.tcp_interface.TCPInterface( - args.host, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes + args.host, + debugOut=logfile, + noProto=args.noproto, + noNodes=args.no_nodes, ) except Exception as ex: - meshtastic.util.our_exit( - f"Error connecting to {args.host}:{ex}", 1 - ) + meshtastic.util.our_exit(f"Error connecting to {args.host}:{ex}", 1) else: try: client = meshtastic.serial_interface.SerialInterface( - args.port, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes + args.port, + debugOut=logfile, + noProto=args.noproto, + noNodes=args.no_nodes, ) except PermissionError as ex: username = os.getlogin() @@ -1132,7 +1186,10 @@ def common(): if client.devPath is None: try: client = meshtastic.tcp_interface.TCPInterface( - "localhost", debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes + "localhost", + debugOut=logfile, + noProto=args.noproto, + noNodes=args.no_nodes, ) except Exception as ex: meshtastic.util.our_exit( @@ -1144,7 +1201,10 @@ def common(): have_tunnel = platform.system() == "Linux" if ( - args.noproto or args.reply or (have_tunnel and args.tunnel) or args.listen + args.noproto + or args.reply + or (have_tunnel and args.tunnel) + or args.listen ): # loop until someone presses ctrlc try: while True: @@ -1155,13 +1215,19 @@ def common(): # don't call exit, background threads might be running still # sys.exit(0) + def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: """Add connection specifiation arguments""" - outer = parser.add_argument_group('Connection', 'Optional arguments that specify how to connect to a Meshtastic device.') + outer = parser.add_argument_group( + "Connection", + "Optional arguments that specify how to connect to a Meshtastic device.", + ) group = outer.add_mutually_exclusive_group() group.add_argument( - "--port", "--serial", "-s", + "--port", + "--serial", + "-s", help="The port of the device to connect to using serial, e.g. /dev/ttyUSB0. (defaults to trying to detect a port)", nargs="?", const=None, @@ -1169,19 +1235,22 @@ def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParse ) group.add_argument( - "--host", "--tcp", "-t", + "--host", + "--tcp", + "-t", help="Connect to a device using TCP, optionally passing hostname or IP address to use. (defaults to '%(const)s')", nargs="?", default=None, - const="localhost" + const="localhost", ) group.add_argument( - "--ble", "-b", + "--ble", + "-b", help="Connect to a BLE device, optionally specifying a device name (defaults to '%(const)s')", nargs="?", default=None, - const="any" + const="any", ) return parser @@ -1193,9 +1262,11 @@ def initParser(): args = mt_config.args # The "Help" group includes the help option and other informational stuff about the CLI itself - outerHelpGroup = parser.add_argument_group('Help') + outerHelpGroup = parser.add_argument_group("Help") helpGroup = outerHelpGroup.add_mutually_exclusive_group() - helpGroup.add_argument("-h", "--help", action="help", help="show this help message and exit") + helpGroup.add_argument( + "-h", "--help", action="help", help="show this help message and exit" + ) the_version = get_active_version() helpGroup.add_argument("--version", action="version", version=f"{the_version}") @@ -1232,9 +1303,9 @@ def initParser(): group.add_argument( "--seriallog", help="Log device serial output to either 'none' or a filename to append to. Defaults to 'stdout' if no filename specified.", - nargs='?', + nargs="?", const="stdout", - default=None + default=None, ) group.add_argument( @@ -1490,7 +1561,7 @@ def initParser(): group.add_argument( "--remove-node", - help="Tell the destination node to remove a specific node from its DB, by node number or ID" + help="Tell the destination node to remove a specific node from its DB, by node number or ID", ) group.add_argument( "--reset-nodedb", @@ -1555,7 +1626,9 @@ def initParser(): action="store_true", ) - power_group = parser.add_argument_group('Power Testing', 'Options for power testing/logging.') + power_group = parser.add_argument_group( + "Power Testing", "Options for power testing/logging." + ) power_supply_group = power_group.add_mutually_exclusive_group() @@ -1604,7 +1677,7 @@ def initParser(): help="Store structured-logs (slogs) for this run, optionally you can specifiy a destination directory", nargs="?", default=None, - const="default" + const="default", ) group.add_argument( @@ -1633,7 +1706,9 @@ def initParser(): action="store_true", ) - remoteHardwareArgs = parser.add_argument_group('Remote Hardware', 'Arguments related to the Remote Hardware module') + remoteHardwareArgs = parser.add_argument_group( + "Remote Hardware", "Arguments related to the Remote Hardware module" + ) remoteHardwareArgs.add_argument( "--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action="append" @@ -1647,10 +1722,11 @@ def initParser(): "--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')" ) - have_tunnel = platform.system() == "Linux" if have_tunnel: - tunnelArgs = parser.add_argument_group('Tunnel', 'Arguments related to establishing a tunnel device over the mesh.') + tunnelArgs = parser.add_argument_group( + "Tunnel", "Arguments related to establishing a tunnel device over the mesh." + ) tunnelArgs.add_argument( "--tunnel", action="store_true", @@ -1665,7 +1741,6 @@ def initParser(): parser.set_defaults(deprecated=None) - args = parser.parse_args() mt_config.args = args mt_config.parser = parser @@ -1676,7 +1751,8 @@ def main(): parser = argparse.ArgumentParser( add_help=False, epilog="If no connection arguments are specified, we search for a compatible serial device, " - "and if none is found, then attempt a TCP connection to localhost.") + "and if none is found, then attempt a TCP connection to localhost.", + ) mt_config.parser = parser initParser() common() diff --git a/meshtastic/analysis/__main__.py b/meshtastic/analysis/__main__.py index 2fcc7bf..993f04b 100644 --- a/meshtastic/analysis/__main__.py +++ b/meshtastic/analysis/__main__.py @@ -2,15 +2,16 @@ import argparse import logging +from typing import cast -import dash_bootstrap_components as dbc +import dash_bootstrap_components as dbc # type: ignore[import-untyped] import numpy as np import pandas as pd -import plotly.express as px -import plotly.graph_objects as go +import plotly.express as px # type: ignore[import-untyped] +import plotly.graph_objects as go # type: ignore[import-untyped] import pyarrow as pa -import pyarrow.feather as feather -from dash import Dash, Input, Output, callback, dash_table, dcc, html +from dash import Dash, dcc, html # type: ignore[import-untyped] +from pyarrow import feather from .. import mesh_pb2, powermon_pb2 from ..slog import root_dir @@ -60,7 +61,8 @@ def read_pandas(filepath: str) -> pd.DataFrame: pa.float64(): pd.Float64Dtype(), pa.string(): pd.StringDtype(), } - return feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get) + + return cast(pd.DataFrame, feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get)) # type: ignore[arg-type] def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame: @@ -87,6 +89,7 @@ def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame: pmon_raises = pmon_events[pmon_events["pm_raises"].notnull()][["time", "pm_raises"]] pmon_falls = pmon_events[pmon_events["pm_falls"].notnull()] + # pylint: disable=unused-variable def get_endtime(row): """Find the corresponding fall event.""" following = pmon_falls[ @@ -134,13 +137,8 @@ def create_dash(slog_path: str) -> Dash: """ app = Dash(external_stylesheets=[dbc.themes.BOOTSTRAP]) - parser = create_argparser() - args = parser.parse_args() - if not args.slog: - args.slog = f"{root_dir()}/latest" - - dpwr = read_pandas(f"{args.slog}/power.feather") - dslog = read_pandas(f"{args.slog}/slog.feather") + dpwr = read_pandas(f"{slog_path}/power.feather") + dslog = read_pandas(f"{slog_path}/slog.feather") pmon_raises = get_pmon_raises(dslog) @@ -167,7 +165,9 @@ def create_dash(slog_path: str) -> Dash: fig = go.Figure(data=max_pwr_points.data + avg_pwr_lines.data + pmon_points.data) - fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01)) + fig.update_layout( + legend={"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.01} + ) # App layout app.layout = [ @@ -180,11 +180,16 @@ def create_dash(slog_path: str) -> Dash: def main(): """Entry point of the script.""" - app = create_dash(slog_path="/home/kevinh/.local/share/meshtastic/slogs/latest") + + parser = create_argparser() + args = parser.parse_args() + if not args.slog: + args.slog = f"{root_dir()}/latest" + + app = create_dash(slog_path=args.slog) port = 8051 - logging.info( - f"Running Dash visualization webapp on port {port} (publicly accessible)" - ) + logging.info(f"Running Dash visualization of {args.slog} (publicly accessible)") + app.run_server(debug=True, host="0.0.0.0", port=port)