fix linter warnings (and alas: reformat __main__.py)

main.py's only real change is
        log_set: Optional[LogSet] = None  # type: ignore[annotation-unchecked]
Everything else is the automated reformatting to match our trunk formatting
rules.
This commit is contained in:
Kevin Hester
2024-07-31 15:19:16 -07:00
parent a4715171e4
commit 4906f79be5
2 changed files with 155 additions and 74 deletions

View File

@@ -13,24 +13,30 @@ import sys
import time
from typing import Optional
import pyqrcode # type: ignore[import-untyped]
import pyqrcode # type: ignore[import-untyped]
import yaml
from google.protobuf.json_format import MessageToDict
from pubsub import pub # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
import meshtastic.test
import meshtastic.util
from meshtastic import mt_config
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2
from meshtastic import remote_hardware, BROADCAST_ADDR
from meshtastic.version import get_active_version
from meshtastic import BROADCAST_ADDR, mt_config, remote_hardware
from meshtastic.ble_interface import BLEInterface
from meshtastic.mesh_interface import MeshInterface
from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply, SimPowerSupply, PowerStress, PowerMeter
from meshtastic.powermon import (
PowerMeter,
PowerStress,
PPK2PowerSupply,
RidenPowerSupply,
SimPowerSupply,
)
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2
from meshtastic.slog import LogSet
from meshtastic.version import get_active_version
meter: Optional[PowerMeter] = None
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
args = mt_config.args
@@ -66,11 +72,13 @@ def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def checkChannel(interface: MeshInterface, channelIndex: int) -> bool:
"""Given an interface and channel index, return True if that channel is non-disabled on the local node"""
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f"ch:{ch}")
return (ch and ch.role != channel_pb2.Channel.Role.DISABLED)
return ch and ch.role != channel_pb2.Channel.Role.DISABLED
def getPref(node, comp_name):
"""Get a channel or preferences value"""
@@ -146,6 +154,7 @@ def splitCompoundName(comp_name):
name.append(comp_name)
return name
def traverseConfig(config_root, config, interface_config):
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = meshtastic.util.camel_to_snake(config_root)
@@ -154,14 +163,11 @@ def traverseConfig(config_root, config, interface_config):
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(
interface_config,
pref_name,
str(config[pref])
)
setPref(interface_config, pref_name, str(config[pref]))
return True
def setPref(config, comp_name, valStr) -> bool:
"""Set a channel or preferences value"""
@@ -275,7 +281,9 @@ def onConnected(interface):
interface.localNode.removeFixedPosition()
elif args.setlat or args.setlon or args.setalt:
if args.dest != BROADCAST_ADDR:
print("Setting latitude, longitude, and altitude of remote nodes is not supported.")
print(
"Setting latitude, longitude, and altitude of remote nodes is not supported."
)
return
closeNow = True
@@ -303,10 +311,17 @@ def onConnected(interface):
interface.localNode.setFixedPosition(lat, lon, alt)
elif not args.no_time:
# We normally provide a current time to the mesh when we connect
if interface.localNode.nodeNum in interface.nodesByNum and "position" in interface.nodesByNum[interface.localNode.nodeNum]:
if (
interface.localNode.nodeNum in interface.nodesByNum
and "position" in interface.nodesByNum[interface.localNode.nodeNum]
):
# send the same position the node already knows, just to update time
position = interface.nodesByNum[interface.localNode.nodeNum]["position"]
interface.sendPosition(position.get("latitude", 0.0), position.get("longitude", 0.0), position.get("altitude", 0.0))
interface.sendPosition(
position.get("latitude", 0.0),
position.get("longitude", 0.0),
position.get("altitude", 0.0),
)
else:
interface.sendPosition()
@@ -454,7 +469,9 @@ def onConnected(interface):
dest = str(args.traceroute)
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)")
print(
f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendTraceRoute(dest, hopLimit, channelIndex=channelIndex)
if args.request_telemetry:
@@ -463,8 +480,14 @@ def onConnected(interface):
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendTelemetry(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
print(
f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendTelemetry(
destinationId=args.dest,
wantResponse=True,
channelIndex=channelIndex,
)
if args.request_position:
if args.dest == BROADCAST_ADDR:
@@ -472,8 +495,14 @@ def onConnected(interface):
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
print(
f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)"
)
interface.sendPosition(
destinationId=args.dest,
wantResponse=True,
channelIndex=channelIndex,
)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
if args.dest == BROADCAST_ADDR:
@@ -615,7 +644,9 @@ def onConnected(interface):
if "config" in configuration:
localConfig = interface.getNode(args.dest).localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode(args.dest).writeConfig(
meshtastic.util.camel_to_snake(section)
)
@@ -623,7 +654,11 @@ def onConnected(interface):
if "module_config" in configuration:
moduleConfig = interface.getNode(args.dest).moduleConfig
for section in configuration["module_config"]:
traverseConfig(section, configuration["module_config"][section], moduleConfig)
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode(args.dest).writeConfig(
meshtastic.util.camel_to_snake(section)
)
@@ -676,7 +711,9 @@ def onConnected(interface):
print(f"Writing modified channels to device")
n.writeChannel(ch.index)
if channelIndex is None:
print(f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications")
print(
f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications"
)
mt_config.channel_index = ch.index
if args.ch_del:
@@ -762,7 +799,7 @@ def onConnected(interface):
else:
found = setPref(ch.settings, pref[0], pref[1])
if not found:
category_settings = ['module_settings']
category_settings = ["module_settings"]
print(
f"{ch.settings.__class__.__name__} does not have an attribute {pref[0]}."
)
@@ -772,7 +809,9 @@ def onConnected(interface):
print(f"{field.name}")
else:
print(f"{field.name}:")
config = ch.settings.DESCRIPTOR.fields_by_name.get(field.name)
config = ch.settings.DESCRIPTOR.fields_by_name.get(
field.name
)
names = []
for sub_field in config.message_type.fields:
tmp_name = f"{field.name}.{sub_field.name}"
@@ -852,16 +891,20 @@ def onConnected(interface):
qr = pyqrcode.create(url)
print(qr.terminal())
log_set: Optional[LogSet] = None # we need to keep a reference to the logset so it doesn't get GCed early
log_set: Optional[LogSet] = None # type: ignore[annotation-unchecked]
# we need to keep a reference to the logset so it doesn't get GCed early
if args.slog or args.power_stress:
# Setup loggers
global meter # pylint: disable=global-variable-not-assigned
log_set = LogSet(interface, args.slog if args.slog != 'default' else None, meter)
log_set = LogSet(
interface, args.slog if args.slog != "default" else None, meter
)
if args.power_stress:
stress = PowerStress(interface)
stress.run()
closeNow = True # exit immediately after stress test
closeNow = True # exit immediately after stress test
if args.listen:
closeNow = False
@@ -891,7 +934,7 @@ def onConnected(interface):
interface.getNode(args.dest, False).iface.waitForAckNak()
if args.wait_to_disconnect:
print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting" )
print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting")
time.sleep(int(args.wait_to_disconnect))
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
@@ -1004,6 +1047,7 @@ def export_config(interface):
print(config)
return config
def create_power_meter():
"""Setup the power meter."""
@@ -1038,6 +1082,7 @@ def create_power_meter():
logging.info("Powered-on, waiting for device to boot")
time.sleep(5)
def common():
"""Shared code for all of our command line wrappers."""
logfile = None
@@ -1104,20 +1149,29 @@ def common():
print(f"Found: name='{x.name}' address='{x.address}'")
meshtastic.util.our_exit("BLE scan finished", 0)
elif args.ble:
client = BLEInterface(args.ble if args.ble != "any" else None, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes)
client = BLEInterface(
args.ble if args.ble != "any" else None,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
)
elif args.host:
try:
client = meshtastic.tcp_interface.TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes
args.host,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
)
except Exception as ex:
meshtastic.util.our_exit(
f"Error connecting to {args.host}:{ex}", 1
)
meshtastic.util.our_exit(f"Error connecting to {args.host}:{ex}", 1)
else:
try:
client = meshtastic.serial_interface.SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes
args.port,
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
)
except PermissionError as ex:
username = os.getlogin()
@@ -1132,7 +1186,10 @@ def common():
if client.devPath is None:
try:
client = meshtastic.tcp_interface.TCPInterface(
"localhost", debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes
"localhost",
debugOut=logfile,
noProto=args.noproto,
noNodes=args.no_nodes,
)
except Exception as ex:
meshtastic.util.our_exit(
@@ -1144,7 +1201,10 @@ def common():
have_tunnel = platform.system() == "Linux"
if (
args.noproto or args.reply or (have_tunnel and args.tunnel) or args.listen
args.noproto
or args.reply
or (have_tunnel and args.tunnel)
or args.listen
): # loop until someone presses ctrlc
try:
while True:
@@ -1155,13 +1215,19 @@ def common():
# don't call exit, background threads might be running still
# sys.exit(0)
def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Add connection specifiation arguments"""
outer = parser.add_argument_group('Connection', 'Optional arguments that specify how to connect to a Meshtastic device.')
outer = parser.add_argument_group(
"Connection",
"Optional arguments that specify how to connect to a Meshtastic device.",
)
group = outer.add_mutually_exclusive_group()
group.add_argument(
"--port", "--serial", "-s",
"--port",
"--serial",
"-s",
help="The port of the device to connect to using serial, e.g. /dev/ttyUSB0. (defaults to trying to detect a port)",
nargs="?",
const=None,
@@ -1169,19 +1235,22 @@ def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParse
)
group.add_argument(
"--host", "--tcp", "-t",
"--host",
"--tcp",
"-t",
help="Connect to a device using TCP, optionally passing hostname or IP address to use. (defaults to '%(const)s')",
nargs="?",
default=None,
const="localhost"
const="localhost",
)
group.add_argument(
"--ble", "-b",
"--ble",
"-b",
help="Connect to a BLE device, optionally specifying a device name (defaults to '%(const)s')",
nargs="?",
default=None,
const="any"
const="any",
)
return parser
@@ -1193,9 +1262,11 @@ def initParser():
args = mt_config.args
# The "Help" group includes the help option and other informational stuff about the CLI itself
outerHelpGroup = parser.add_argument_group('Help')
outerHelpGroup = parser.add_argument_group("Help")
helpGroup = outerHelpGroup.add_mutually_exclusive_group()
helpGroup.add_argument("-h", "--help", action="help", help="show this help message and exit")
helpGroup.add_argument(
"-h", "--help", action="help", help="show this help message and exit"
)
the_version = get_active_version()
helpGroup.add_argument("--version", action="version", version=f"{the_version}")
@@ -1232,9 +1303,9 @@ def initParser():
group.add_argument(
"--seriallog",
help="Log device serial output to either 'none' or a filename to append to. Defaults to 'stdout' if no filename specified.",
nargs='?',
nargs="?",
const="stdout",
default=None
default=None,
)
group.add_argument(
@@ -1490,7 +1561,7 @@ def initParser():
group.add_argument(
"--remove-node",
help="Tell the destination node to remove a specific node from its DB, by node number or ID"
help="Tell the destination node to remove a specific node from its DB, by node number or ID",
)
group.add_argument(
"--reset-nodedb",
@@ -1555,7 +1626,9 @@ def initParser():
action="store_true",
)
power_group = parser.add_argument_group('Power Testing', 'Options for power testing/logging.')
power_group = parser.add_argument_group(
"Power Testing", "Options for power testing/logging."
)
power_supply_group = power_group.add_mutually_exclusive_group()
@@ -1604,7 +1677,7 @@ def initParser():
help="Store structured-logs (slogs) for this run, optionally you can specifiy a destination directory",
nargs="?",
default=None,
const="default"
const="default",
)
group.add_argument(
@@ -1633,7 +1706,9 @@ def initParser():
action="store_true",
)
remoteHardwareArgs = parser.add_argument_group('Remote Hardware', 'Arguments related to the Remote Hardware module')
remoteHardwareArgs = parser.add_argument_group(
"Remote Hardware", "Arguments related to the Remote Hardware module"
)
remoteHardwareArgs.add_argument(
"--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action="append"
@@ -1647,10 +1722,11 @@ def initParser():
"--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')"
)
have_tunnel = platform.system() == "Linux"
if have_tunnel:
tunnelArgs = parser.add_argument_group('Tunnel', 'Arguments related to establishing a tunnel device over the mesh.')
tunnelArgs = parser.add_argument_group(
"Tunnel", "Arguments related to establishing a tunnel device over the mesh."
)
tunnelArgs.add_argument(
"--tunnel",
action="store_true",
@@ -1665,7 +1741,6 @@ def initParser():
parser.set_defaults(deprecated=None)
args = parser.parse_args()
mt_config.args = args
mt_config.parser = parser
@@ -1676,7 +1751,8 @@ def main():
parser = argparse.ArgumentParser(
add_help=False,
epilog="If no connection arguments are specified, we search for a compatible serial device, "
"and if none is found, then attempt a TCP connection to localhost.")
"and if none is found, then attempt a TCP connection to localhost.",
)
mt_config.parser = parser
initParser()
common()

View File

@@ -2,15 +2,16 @@
import argparse
import logging
from typing import cast
import dash_bootstrap_components as dbc
import dash_bootstrap_components as dbc # type: ignore[import-untyped]
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.express as px # type: ignore[import-untyped]
import plotly.graph_objects as go # type: ignore[import-untyped]
import pyarrow as pa
import pyarrow.feather as feather
from dash import Dash, Input, Output, callback, dash_table, dcc, html
from dash import Dash, dcc, html # type: ignore[import-untyped]
from pyarrow import feather
from .. import mesh_pb2, powermon_pb2
from ..slog import root_dir
@@ -60,7 +61,8 @@ def read_pandas(filepath: str) -> pd.DataFrame:
pa.float64(): pd.Float64Dtype(),
pa.string(): pd.StringDtype(),
}
return feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get)
return cast(pd.DataFrame, feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get)) # type: ignore[arg-type]
def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame:
@@ -87,6 +89,7 @@ def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame:
pmon_raises = pmon_events[pmon_events["pm_raises"].notnull()][["time", "pm_raises"]]
pmon_falls = pmon_events[pmon_events["pm_falls"].notnull()]
# pylint: disable=unused-variable
def get_endtime(row):
"""Find the corresponding fall event."""
following = pmon_falls[
@@ -134,13 +137,8 @@ def create_dash(slog_path: str) -> Dash:
"""
app = Dash(external_stylesheets=[dbc.themes.BOOTSTRAP])
parser = create_argparser()
args = parser.parse_args()
if not args.slog:
args.slog = f"{root_dir()}/latest"
dpwr = read_pandas(f"{args.slog}/power.feather")
dslog = read_pandas(f"{args.slog}/slog.feather")
dpwr = read_pandas(f"{slog_path}/power.feather")
dslog = read_pandas(f"{slog_path}/slog.feather")
pmon_raises = get_pmon_raises(dslog)
@@ -167,7 +165,9 @@ def create_dash(slog_path: str) -> Dash:
fig = go.Figure(data=max_pwr_points.data + avg_pwr_lines.data + pmon_points.data)
fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01))
fig.update_layout(
legend={"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.01}
)
# App layout
app.layout = [
@@ -180,11 +180,16 @@ def create_dash(slog_path: str) -> Dash:
def main():
"""Entry point of the script."""
app = create_dash(slog_path="/home/kevinh/.local/share/meshtastic/slogs/latest")
parser = create_argparser()
args = parser.parse_args()
if not args.slog:
args.slog = f"{root_dir()}/latest"
app = create_dash(slog_path=args.slog)
port = 8051
logging.info(
f"Running Dash visualization webapp on port {port} (publicly accessible)"
)
logging.info(f"Running Dash visualization of {args.slog} (publicly accessible)")
app.run_server(debug=True, host="0.0.0.0", port=port)