diff --git a/.vscode/launch.json b/.vscode/launch.json index 3b82225..aac37db 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -204,7 +204,7 @@ "request": "launch", "module": "meshtastic", "justMyCode": false, - "args": ["--slog", "--power-ppk2-meter", "--power-stress", "--power-voltage", "3.3"] + "args": ["--slog", "--power-ppk2-meter", "--power-stress", "--power-voltage", "3.3", "--seriallog"] }, { "name": "meshtastic test", diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 3873e44..1bcfd56 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -897,8 +897,10 @@ def onConnected(interface): # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation if (not args.seriallog) and closeNow: interface.close() # after running command then exit - if log_set: - log_set.close() + + # Close any structured logs after we've done all of our API operations + if log_set: + log_set.close() except Exception as ex: print(f"Aborting due to: {ex}") diff --git a/meshtastic/powermon/stress.py b/meshtastic/powermon/stress.py index 2a07c63..4e82508 100644 --- a/meshtastic/powermon/stress.py +++ b/meshtastic/powermon/stress.py @@ -95,11 +95,13 @@ class PowerStress: num_seconds = 5.0 states = [ powermon_pb2.PowerStressMessage.LED_ON, + powermon_pb2.PowerStressMessage.LED_OFF, powermon_pb2.PowerStressMessage.BT_OFF, powermon_pb2.PowerStressMessage.BT_ON, powermon_pb2.PowerStressMessage.CPU_FULLON, powermon_pb2.PowerStressMessage.CPU_IDLE, - powermon_pb2.PowerStressMessage.CPU_DEEPSLEEP, + # FIXME - can't test deepsleep yet because the ttyACM device disappears. Fix the python code to retry connections + # powermon_pb2.PowerStressMessage.CPU_DEEPSLEEP, ] for s in states: s_name = powermon_pb2.PowerStressMessage.Opcode.Name(s) diff --git a/meshtastic/slog/arrow.py b/meshtastic/slog/arrow.py index 0833704..be908ce 100644 --- a/meshtastic/slog/arrow.py +++ b/meshtastic/slog/arrow.py @@ -29,13 +29,22 @@ class ArrowWriter: self.writer.close() self.sink.close() + def set_schema(self, schema: pa.Schema): + """Set the schema for the file. + Only needed for datasets where we can't learn it from the first record written. + + schema (pa.Schema): The schema to use. + """ + assert self.schema is None + self.schema = schema + self.writer = pa.ipc.new_stream(self.sink, schema) + def _write(self): """Write the new rows to the file.""" if len(self.new_rows) > 0: if self.schema is None: # only need to look at the first row to learn the schema - self.schema = pa.Table.from_pylist([self.new_rows[0]]).schema - self.writer = pa.ipc.new_stream(self.sink, self.schema) + self.set_schema(pa.Table.from_pylist([self.new_rows[0]]).schema) self.writer.write_batch(pa.RecordBatch.from_pylist(self.new_rows)) self.new_rows = [] diff --git a/meshtastic/slog/slog.py b/meshtastic/slog/slog.py index d5c1998..91c6350 100644 --- a/meshtastic/slog/slog.py +++ b/meshtastic/slog/slog.py @@ -9,10 +9,12 @@ import threading import time from dataclasses import dataclass from datetime import datetime +from functools import reduce from typing import Optional import parse # type: ignore[import-untyped] import platformdirs +import pyarrow as pa from pubsub import pub # type: ignore[import-untyped] from meshtastic.mesh_interface import MeshInterface @@ -26,15 +28,29 @@ class LogDef: """Log definition.""" code: str # i.e. PM or B or whatever... see meshtastic slog documentation + fields: list[tuple[str, pa.DataType]] # A list of field names and their arrow types format: parse.Parser # A format string that can be used to parse the arguments - def __init__(self, code: str, fmt: str) -> None: + def __init__(self, code: str, fields: list[tuple[str, pa.DataType]]) -> None: """Initialize the LogDef object. code (str): The code. format (str): The format. + """ self.code = code + self.fields = fields + + fmt = "" + for idx, f in enumerate(fields): + if idx != 0: + fmt += "," + + # make the format string + suffix = ( + "" if f[1] == pa.string() else ":d" + ) # treat as a string or an int (the only types we have so far) + fmt += "{" + f[0] + suffix + "}" self.format = parse.compile(fmt) @@ -42,8 +58,9 @@ class LogDef: log_defs = { d.code: d for d in [ - LogDef("B", "{boardid:d},{version}"), - LogDef("PM", "{bitmask:d},{reason}"), + LogDef("B", [("board_id", pa.uint32()), ("sw_version", pa.string())]), + LogDef("PM", [("pm_mask", pa.uint64()), ("pm_reason", pa.string())]), + LogDef("PS", [("ps_state", pa.uint64())]), ] } log_regex = re.compile(".*S:([0-9A-Za-z]+):(.*)") @@ -99,7 +116,15 @@ class StructuredLogger: client (MeshInterface): The MeshInterface object to monitor. """ self.client = client + + # Setup the arrow writer (and its schema) self.writer = FeatherWriter(f"{dir_path}/slog") + all_fields = reduce( + (lambda x, y: x + y), map(lambda x: x.fields, log_defs.values()) + ) + + self.writer.set_schema(pa.schema(all_fields)) + self.raw_file: Optional[ io.TextIOWrapper ] = open( # pylint: disable=consider-using-with @@ -131,21 +156,20 @@ class StructuredLogger: src = m.group(1) args = m.group(2) args += " " # append a space so that if the last arg is an empty str it will still be accepted as a match - logging.debug(f"SLog {src}, reason: {args}") - if src != "PM": - logging.warning(f"Not yet handling structured log {src} (FIXME)") - else: - d = log_defs.get(src) - if d: - r = d.format.parse(args) # get the values with the correct types - if r: - di = r.named - di["time"] = datetime.now() - self.writer.add_row(di) - else: - logging.warning(f"Failed to parse slog {line} with {d.format}") + logging.debug(f"SLog {src}, args: {args}") + + d = log_defs.get(src) + if d: + r = d.format.parse(args) # get the values with the correct types + if r: + di = r.named + di["time"] = datetime.now() + self.writer.add_row(di) else: - logging.warning(f"Unknown Structured Log: {line}") + logging.warning(f"Failed to parse slog {line} with {d.format}") + else: + logging.warning(f"Unknown Structured Log: {line}") + if self.raw_file: self.raw_file.write(line + "\n") # Write the raw log