Switch from pandas to apache arrow for live data logging (better streaming)

This commit is contained in:
Kevin Hester
2024-06-24 15:26:05 -07:00
parent 43e1f65a75
commit 7ce7d73e89
5 changed files with 170 additions and 71 deletions

View File

@@ -4,6 +4,7 @@
"boardid",
"Meshtastic",
"powermon",
"pyarrow",
"TORADIO",
"Vids"
],

View File

@@ -21,8 +21,8 @@ from meshtastic import channel_pb2, config_pb2, portnums_pb2, remote_hardware, B
from meshtastic.version import get_active_version
from meshtastic.ble_interface import BLEInterface
from meshtastic.mesh_interface import MeshInterface
from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply
from meshtastic.slog import StructuredLogger
from meshtastic.powermon import RidenPowerSupply, PPK2PowerSupply, SimPowerSupply
from meshtastic.slog import LogSet
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
@@ -1090,21 +1090,26 @@ def common():
# We assume client is fully connected now
onConnected(client)
# Setup power meters
meter = None # assume no power meter
if args.power_riden:
meter = RidenPowerSupply(args.power_riden)
elif args.power_ppk2:
elif args.power_ppk2_supply or args.power_ppk2_meter:
meter = PPK2PowerSupply()
meter.setIsSupply(args.power_ppk2_supply)
elif args.power_sim:
meter = SimPowerSupply()
if meter and args.power_voltage:
v = float(args.power_voltage)
if v < 1.0 or v >5.0:
if v < 0.5 or v >5.0:
meshtastic.util.our_exit("Voltage must be between 1.0 and 5.0")
logging.info(f"Setting power supply to {v} volts")
meter.v = v
meter.powerOn()
StructuredLogger(client, meter)
# Setup loggers
LogSet(client, meter)
have_tunnel = platform.system() == "Linux"
if (
@@ -1520,14 +1525,28 @@ def initParser():
action="store_true",
)
group.add_argument(
power_supply_group = group.add_mutually_exclusive_group()
power_supply_group.add_argument(
"--power-riden",
help="Talk to a Riden power-supply. You must specify the device path, i.e. /dev/ttyUSBxxx",
)
group.add_argument(
"--power-ppk2",
help="Talk to a Nordic Power Profiler Kit 2",
power_supply_group.add_argument(
"--power-ppk2-meter",
help="Talk to a Nordic Power Profiler Kit 2 (in meter mode)",
action="store_true",
)
power_supply_group.add_argument(
"--power-ppk2-supply",
help="Talk to a Nordic Power Profiler Kit 2 (in supply mode)",
action="store_true",
)
power_supply_group.add_argument(
"--power-sim",
help="Use a simulated power meter (for development)",
action="store_true",
)

View File

@@ -1,3 +1,3 @@
"""Structured logging framework (see dev docs for more info)"""
"""Structured logging framework (see dev docs for more info)."""
from .slog import StructuredLogger
from .slog import LogSet

42
meshtastic/slog/arrow.py Normal file
View File

@@ -0,0 +1,42 @@
import pyarrow as pa
chunk_size = 10 # disk writes are batched based on this number of rows
class ArrowWriter:
"""Writes an arrow file in a streaming fashion"""
def __init__(self, file_name: str):
"""Create a new ArrowWriter object.
file_name (str): The name of the file to write to.
"""
self.sink = pa.OSFile(file_name, "wb")
self.new_rows: list[dict] = []
self.schema: pa.Schema | None = None # haven't yet learned the schema
self.writer: pa.RecordBatchFileWriter | None = None
def close(self):
"""Close the stream and writes the file as needed."""
self._write()
if self.writer:
self.writer.close()
self.sink.close()
def _write(self):
"""Write the new rows to the file."""
if len(self.new_rows) > 0:
if self.schema is None:
self.schema = pa.Table.from_pylist(self.new_rows).schema
self.writer = pa.ipc.new_stream(self.sink, self.schema)
self.writer.write_batch(pa.RecordBatch.from_pylist(self.new_rows))
self.new_rows = []
def add_row(self, row_dict: dict):
"""Add a row to the arrow file.
We will automatically learn the schema from the first row. But all rows must use that schema.
"""
self.new_rows.append(row_dict)
if len(self.new_rows) >= chunk_size:
self._write()

View File

@@ -1,96 +1,104 @@
"""code logging power consumption of meshtastic devices."""
import atexit
import logging
import re
import atexit
from datetime import datetime
import threading
import time
from dataclasses import dataclass
from datetime import datetime
import parse
import pandas as pd
from pubsub import pub # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
from meshtastic.mesh_interface import MeshInterface
from meshtastic.powermon import PowerMeter
from .arrow import ArrowWriter
@dataclass(init=False)
class LogDef:
"""Log definition."""
code: str # i.e. PM or B or whatever... see meshtastic slog documentation
format: str # A format string that can be used to parse the arguments
code: str # i.e. PM or B or whatever... see meshtastic slog documentation
format: str # A format string that can be used to parse the arguments
def __init__(self, code: str, fmt: str) -> None:
"""Initialize the LogDef object.
code (str): The code.
format (str): The format.
code (str): The code.
format (str): The format.
"""
self.code = code
self.format = parse.compile(fmt)
"""A dictionary mapping from logdef code to logdef"""
log_defs = {d.code: d for d in [
LogDef("B", "{boardid:d},{version}"),
LogDef("PM", "{bitmask:d},{reason}")
]}
log_defs = {
d.code: d
for d in [
LogDef("B", "{boardid:d},{version}"),
LogDef("PM", "{bitmask:d},{reason}"),
]
}
log_regex = re.compile(".*S:([0-9A-Za-z]+):(.*)")
class PowerLogger:
"""Logs current watts reading periodically using PowerMeter and ArrowWriter."""
def __init__(self, pMeter: PowerMeter, file_path: str, interval=0.2) -> None:
"""Initialize the PowerLogger object."""
self.pMeter = pMeter
self.writer = ArrowWriter(file_path)
self.interval = interval
self.is_logging = True
self.thread = threading.Thread(target=self._logging_thread, name="PowerLogger")
self.thread.start()
def _logging_thread(self) -> None:
"""Background thread for logging the current watts reading."""
while self.is_logging:
watts = self.pMeter.getWatts()
d = {"time": datetime.now(), "watts": watts}
self.writer.add_row(d)
time.sleep(self.interval)
def close(self) -> None:
"""Close the PowerLogger and stop logging."""
if self.is_logging:
self.is_logging = False
self.thread.join()
self.writer.close()
# FIXME move these defs somewhere else
TOPIC_MESHTASTIC_LOG_LINE = "meshtastic.log.line"
class StructuredLogger:
"""Sniffs device logs for structured log messages, extracts those into pandas/CSV format."""
def __init__(self, client: MeshInterface, pMeter: PowerMeter = None) -> None:
def __init__(self, client: MeshInterface, file_path: str) -> None:
"""Initialize the PowerMonClient object.
power (PowerSupply): The power supply object.
client (MeshInterface): The MeshInterface object to monitor.
power (PowerSupply): The power supply object.
client (MeshInterface): The MeshInterface object to monitor.
"""
self.client = client
self.pMeter = pMeter
self.columns = ["time", "power"]
self.rawData = pd.DataFrame(columns=self.columns) # use time as the index
# self.rawData.set_index("time", inplace=True)
self.writer = ArrowWriter(file_path)
self.listener = pub.subscribe(self._onLogMessage, TOPIC_MESHTASTIC_LOG_LINE)
# for efficiency reasons we keep new data in a list - only adding to rawData when needed
self.newData: list[dict] = []
def close(self) -> None:
"""Stop logging."""
pub.unsubscribe(self.listener, TOPIC_MESHTASTIC_LOG_LINE)
self.writer.close()
atexit.register(self._exitHandler)
pub.subscribe(self._onLogMessage, "meshtastic.log.line")
def getRawData(self) -> pd.DataFrame:
"""Get the raw data.
Returns
-------
pd.DataFrame: The raw data.
"""
df = pd.DataFrame(self.newData)
# We prefer some columns to be integers
intcols = [ "bitmask" ]
for c in intcols:
if c in df:
df[c] = df[c].astype('Int64')
# df.set_index("time")
# Add new data, creating new columns as needed (an outer join)
self.rawData = pd.concat([self.rawData, df], axis=0, ignore_index=True)
self.newData = []
return self.rawData
def _exitHandler(self) -> None:
"""Exit handler."""
fn = "/tmp/powermon.slog" # Find a better place
logging.info(f"Storing slog in {fn}")
self.getRawData().to_csv(fn)
def _onLogMessage(self, line: str, interface: MeshInterface) -> None: # pylint: disable=unused-argument
def _onLogMessage(
self, line: str, interface: MeshInterface
) -> None: # pylint: disable=unused-argument
"""Handle log messages.
line (str): the line of log output
line (str): the line of log output
"""
m = log_regex.match(line)
if m:
@@ -101,15 +109,44 @@ class StructuredLogger:
logging.debug(f"SLog {src}, reason: {args}")
d = log_defs.get(src)
if d:
r = d.format.parse(args) # get the values with the correct types
r = d.format.parse(args) # get the values with the correct types
if r:
di = r.named
di["time"] = datetime.now()
if self.pMeter: # if we have a power meter include a fresh power reading
di["power"] = self.pMeter.getWatts()
self.newData.append(di)
self.getRawData()
self.writer.add_row(di)
else:
logging.warning(f"Failed to parse slog {line} with {d.format}")
else:
logging.warning(f"Unknown Structured Log: {line}")
class LogSet:
"""A complete set of meshtastic log/metadata for a particular run."""
def __init__(self, client: MeshInterface, power_meter: PowerMeter = None) -> None:
"""Initialize the PowerMonClient object.
power (PowerSupply): The power supply object.
client (MeshInterface): The MeshInterface object to monitor.
"""
self.dir_name = "/tmp" # FIXME
self.slog_logger = StructuredLogger(client, f"{self.dir_name}/slog.arrow")
if power_meter:
self.power_logger = PowerLogger(power_meter, f"{self.dir_name}/power.arrow")
else:
self.power_logger = None
atexit.register(self._exitHandler)
def close(self) -> None:
"""Close the log set."""
logging.info(f"Storing slog in {self.dir_name}")
self.slog_logger.close()
if self.power_logger:
self.power_logger.close()
def _exitHandler(self) -> None:
"""Exit handler."""
self.close()