diff --git a/.vscode/settings.json b/.vscode/settings.json index a1fd574..ba9060a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,6 +4,7 @@ "boardid", "Meshtastic", "powermon", + "pyarrow", "TORADIO", "Vids" ], diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 323d13e..1287f4f 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -21,8 +21,8 @@ from meshtastic import channel_pb2, config_pb2, portnums_pb2, remote_hardware, B from meshtastic.version import get_active_version from meshtastic.ble_interface import BLEInterface from meshtastic.mesh_interface import MeshInterface -from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply -from meshtastic.slog import StructuredLogger +from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply, SimPowerSupply +from meshtastic.slog import LogSet def onReceive(packet, interface): """Callback invoked when a packet arrives""" @@ -1090,21 +1090,26 @@ def common(): # We assume client is fully connected now onConnected(client) + # Setup power meters meter = None # assume no power meter if args.power_riden: meter = RidenPowerSupply(args.power_riden) - elif args.power_ppk2: + elif args.power_ppk2_supply or args.power_ppk2_meter: meter = PPK2PowerSupply() + meter.setIsSupply(args.power_ppk2_supply) + elif args.power_sim: + meter = SimPowerSupply() if meter and args.power_voltage: v = float(args.power_voltage) - if v < 1.0 or v >5.0: + if v < 0.5 or v >5.0: meshtastic.util.our_exit("Voltage must be between 1.0 and 5.0") logging.info(f"Setting power supply to {v} volts") meter.v = v meter.powerOn() - StructuredLogger(client, meter) + # Setup loggers + LogSet(client, meter) have_tunnel = platform.system() == "Linux" if ( @@ -1520,14 +1525,28 @@ def initParser(): action="store_true", ) - group.add_argument( + power_supply_group = group.add_mutually_exclusive_group() + + power_supply_group.add_argument( "--power-riden", help="Talk to a Riden power-supply. You must specify the device path, i.e. /dev/ttyUSBxxx", ) - group.add_argument( - "--power-ppk2", - help="Talk to a Nordic Power Profiler Kit 2", + power_supply_group.add_argument( + "--power-ppk2-meter", + help="Talk to a Nordic Power Profiler Kit 2 (in meter mode)", + action="store_true", + ) + + power_supply_group.add_argument( + "--power-ppk2-supply", + help="Talk to a Nordic Power Profiler Kit 2 (in supply mode)", + action="store_true", + ) + + power_supply_group.add_argument( + "--power-sim", + help="Use a simulated power meter (for development)", action="store_true", ) diff --git a/meshtastic/slog/__init__.py b/meshtastic/slog/__init__.py index a96ae8e..acd5d21 100644 --- a/meshtastic/slog/__init__.py +++ b/meshtastic/slog/__init__.py @@ -1,3 +1,3 @@ -"""Structured logging framework (see dev docs for more info)""" +"""Structured logging framework (see dev docs for more info).""" -from .slog import StructuredLogger +from .slog import LogSet diff --git a/meshtastic/slog/arrow.py b/meshtastic/slog/arrow.py new file mode 100644 index 0000000..2342c9b --- /dev/null +++ b/meshtastic/slog/arrow.py @@ -0,0 +1,42 @@ +import pyarrow as pa + +chunk_size = 10 # disk writes are batched based on this number of rows + + +class ArrowWriter: + """Writes an arrow file in a streaming fashion""" + + def __init__(self, file_name: str): + """Create a new ArrowWriter object. + + file_name (str): The name of the file to write to. + """ + self.sink = pa.OSFile(file_name, "wb") + self.new_rows: list[dict] = [] + self.schema: pa.Schema | None = None # haven't yet learned the schema + self.writer: pa.RecordBatchFileWriter | None = None + + def close(self): + """Close the stream and writes the file as needed.""" + self._write() + if self.writer: + self.writer.close() + self.sink.close() + + def _write(self): + """Write the new rows to the file.""" + if len(self.new_rows) > 0: + if self.schema is None: + self.schema = pa.Table.from_pylist(self.new_rows).schema + self.writer = pa.ipc.new_stream(self.sink, self.schema) + + self.writer.write_batch(pa.RecordBatch.from_pylist(self.new_rows)) + self.new_rows = [] + + def add_row(self, row_dict: dict): + """Add a row to the arrow file. + We will automatically learn the schema from the first row. But all rows must use that schema. + """ + self.new_rows.append(row_dict) + if len(self.new_rows) >= chunk_size: + self._write() diff --git a/meshtastic/slog/slog.py b/meshtastic/slog/slog.py index ab47414..0c480e7 100644 --- a/meshtastic/slog/slog.py +++ b/meshtastic/slog/slog.py @@ -1,96 +1,104 @@ """code logging power consumption of meshtastic devices.""" +import atexit import logging import re -import atexit -from datetime import datetime +import threading +import time from dataclasses import dataclass +from datetime import datetime import parse -import pandas as pd -from pubsub import pub # type: ignore[import-untyped] +from pubsub import pub # type: ignore[import-untyped] from meshtastic.mesh_interface import MeshInterface from meshtastic.powermon import PowerMeter +from .arrow import ArrowWriter + @dataclass(init=False) class LogDef: """Log definition.""" - code: str # i.e. PM or B or whatever... see meshtastic slog documentation - format: str # A format string that can be used to parse the arguments + + code: str # i.e. PM or B or whatever... see meshtastic slog documentation + format: str # A format string that can be used to parse the arguments def __init__(self, code: str, fmt: str) -> None: """Initialize the LogDef object. - code (str): The code. - format (str): The format. + code (str): The code. + format (str): The format. """ self.code = code self.format = parse.compile(fmt) + """A dictionary mapping from logdef code to logdef""" -log_defs = {d.code: d for d in [ - LogDef("B", "{boardid:d},{version}"), - LogDef("PM", "{bitmask:d},{reason}") - ]} +log_defs = { + d.code: d + for d in [ + LogDef("B", "{boardid:d},{version}"), + LogDef("PM", "{bitmask:d},{reason}"), + ] +} log_regex = re.compile(".*S:([0-9A-Za-z]+):(.*)") +class PowerLogger: + """Logs current watts reading periodically using PowerMeter and ArrowWriter.""" + + def __init__(self, pMeter: PowerMeter, file_path: str, interval=0.2) -> None: + """Initialize the PowerLogger object.""" + self.pMeter = pMeter + self.writer = ArrowWriter(file_path) + self.interval = interval + self.is_logging = True + self.thread = threading.Thread(target=self._logging_thread, name="PowerLogger") + self.thread.start() + + def _logging_thread(self) -> None: + """Background thread for logging the current watts reading.""" + while self.is_logging: + watts = self.pMeter.getWatts() + d = {"time": datetime.now(), "watts": watts} + self.writer.add_row(d) + time.sleep(self.interval) + + def close(self) -> None: + """Close the PowerLogger and stop logging.""" + if self.is_logging: + self.is_logging = False + self.thread.join() + self.writer.close() + +# FIXME move these defs somewhere else +TOPIC_MESHTASTIC_LOG_LINE = "meshtastic.log.line" + class StructuredLogger: """Sniffs device logs for structured log messages, extracts those into pandas/CSV format.""" - def __init__(self, client: MeshInterface, pMeter: PowerMeter = None) -> None: + def __init__(self, client: MeshInterface, file_path: str) -> None: """Initialize the PowerMonClient object. - power (PowerSupply): The power supply object. - client (MeshInterface): The MeshInterface object to monitor. + power (PowerSupply): The power supply object. + client (MeshInterface): The MeshInterface object to monitor. """ self.client = client - self.pMeter = pMeter - self.columns = ["time", "power"] - self.rawData = pd.DataFrame(columns=self.columns) # use time as the index - # self.rawData.set_index("time", inplace=True) + self.writer = ArrowWriter(file_path) + self.listener = pub.subscribe(self._onLogMessage, TOPIC_MESHTASTIC_LOG_LINE) - # for efficiency reasons we keep new data in a list - only adding to rawData when needed - self.newData: list[dict] = [] + def close(self) -> None: + """Stop logging.""" + pub.unsubscribe(self.listener, TOPIC_MESHTASTIC_LOG_LINE) + self.writer.close() - atexit.register(self._exitHandler) - pub.subscribe(self._onLogMessage, "meshtastic.log.line") - - def getRawData(self) -> pd.DataFrame: - """Get the raw data. - - Returns - ------- - pd.DataFrame: The raw data. - """ - - df = pd.DataFrame(self.newData) - - # We prefer some columns to be integers - intcols = [ "bitmask" ] - for c in intcols: - if c in df: - df[c] = df[c].astype('Int64') - - # df.set_index("time") - # Add new data, creating new columns as needed (an outer join) - self.rawData = pd.concat([self.rawData, df], axis=0, ignore_index=True) - self.newData = [] - - return self.rawData - - def _exitHandler(self) -> None: - """Exit handler.""" - fn = "/tmp/powermon.slog" # Find a better place - logging.info(f"Storing slog in {fn}") - self.getRawData().to_csv(fn) - - def _onLogMessage(self, line: str, interface: MeshInterface) -> None: # pylint: disable=unused-argument + def _onLogMessage( + self, line: str, interface: MeshInterface + ) -> None: # pylint: disable=unused-argument """Handle log messages. - line (str): the line of log output + line (str): the line of log output """ m = log_regex.match(line) if m: @@ -101,15 +109,44 @@ class StructuredLogger: logging.debug(f"SLog {src}, reason: {args}") d = log_defs.get(src) if d: - r = d.format.parse(args) # get the values with the correct types + r = d.format.parse(args) # get the values with the correct types if r: di = r.named di["time"] = datetime.now() - if self.pMeter: # if we have a power meter include a fresh power reading - di["power"] = self.pMeter.getWatts() - self.newData.append(di) - self.getRawData() + self.writer.add_row(di) else: logging.warning(f"Failed to parse slog {line} with {d.format}") else: logging.warning(f"Unknown Structured Log: {line}") + + +class LogSet: + """A complete set of meshtastic log/metadata for a particular run.""" + + def __init__(self, client: MeshInterface, power_meter: PowerMeter = None) -> None: + """Initialize the PowerMonClient object. + + power (PowerSupply): The power supply object. + client (MeshInterface): The MeshInterface object to monitor. + """ + self.dir_name = "/tmp" # FIXME + + self.slog_logger = StructuredLogger(client, f"{self.dir_name}/slog.arrow") + if power_meter: + self.power_logger = PowerLogger(power_meter, f"{self.dir_name}/power.arrow") + else: + self.power_logger = None + + atexit.register(self._exitHandler) + + def close(self) -> None: + """Close the log set.""" + + logging.info(f"Storing slog in {self.dir_name}") + self.slog_logger.close() + if self.power_logger: + self.power_logger.close() + + def _exitHandler(self) -> None: + """Exit handler.""" + self.close()