Use .feather files as our long-term representation

This commit is contained in:
Kevin Hester
2024-06-26 11:12:02 -07:00
parent f2c427430c
commit 320bb30d29
2 changed files with 37 additions and 4 deletions

View File

@@ -1,6 +1,10 @@
"""Utilities for Apache Arrow serialization."""
import logging
import os
import pyarrow as pa
import pyarrow.feather as feather
# Flush threshold: once at least this many rows are buffered, the writer calls _write().
chunk_size = 1000 # disk writes are batched based on this number of rows
@@ -42,3 +46,32 @@ class ArrowWriter:
self.new_rows.append(row_dict)
if len(self.new_rows) >= chunk_size:
self._write()
class FeatherWriter(ArrowWriter):
    """A smaller, more interoperable version of arrow files.

    Rows are first written by the parent ``ArrowWriter`` into a temporary
    ``<file_name>.arrow`` file (which could be huge).  On ``close()`` that
    file is converted to a much smaller (but still fast) zstd-compressed
    ``<file_name>.feather`` file, and the temporary file is deleted.
    """

    def __init__(self, file_name: str):
        """Start writing to the temporary ``file_name + ".arrow"`` file.

        Args:
            file_name: Output path *without* an extension; the ``.arrow``
                and ``.feather`` suffixes are appended by this class.
        """
        super().__init__(file_name + ".arrow")
        self.base_file_name = file_name

    def close(self):
        """Close the temporary file and convert it into a feather file.

        An empty temporary file is simply discarded (with a warning) and no
        feather file is produced.  Otherwise the data is re-written as
        ``<base_file_name>.feather`` and the temporary ``.arrow`` file is
        removed once the feather file has been written successfully.  If the
        conversion raises, the temporary file is left in place so that no
        data is lost.
        """
        super().close()
        src_name = self.base_file_name + ".arrow"
        dest_name = self.base_file_name + ".feather"

        if os.path.getsize(src_name) == 0:
            logging.warning("Discarding empty file: %s", src_name)
            os.remove(src_name)
            return

        logging.info("Compressing log data into %s", dest_name)

        # note: must use open_stream, not open_file / feather.read_table,
        # because the streaming layout is different
        with pa.memory_map(src_name) as source:
            table = pa.ipc.open_stream(source).read_all()

        # See https://stackoverflow.com/a/72406099 for more info and performance testing measurements
        feather.write_feather(table, dest_name, compression="zstd")

        # Only reached when write_feather() did not raise, so the data now
        # lives in the feather file and the (possibly huge) temporary file
        # can go.  Drop our reference to the table first: it may still point
        # into the memory-mapped source, and some platforms presumably refuse
        # to delete a file that is still mapped.
        del table
        os.remove(src_name)

View File

@@ -17,7 +17,7 @@ from pubsub import pub # type: ignore[import-untyped]
from meshtastic.mesh_interface import MeshInterface
from meshtastic.powermon import PowerMeter
from .arrow import ArrowWriter
from .arrow import FeatherWriter
@dataclass(init=False)
@@ -54,7 +54,7 @@ class PowerLogger:
def __init__(self, pMeter: PowerMeter, file_path: str, interval=0.2) -> None:
"""Initialize the PowerLogger object."""
self.pMeter = pMeter
self.writer = ArrowWriter(file_path)
self.writer = FeatherWriter(file_path)
self.interval = interval
self.is_logging = True
self.thread = threading.Thread(
@@ -98,7 +98,7 @@ class StructuredLogger:
client (MeshInterface): The MeshInterface object to monitor.
"""
self.client = client
self.writer = ArrowWriter(f"{dir_path}/slog.arrow")
self.writer = FeatherWriter(f"{dir_path}/slog")
self.raw_file = open( # pylint: disable=consider-using-with
f"{dir_path}/raw.txt", "w", encoding="utf8"
)
@@ -175,7 +175,7 @@ class LogSet:
self.power_logger: Optional[PowerLogger] = (
None
if not power_meter
else PowerLogger(power_meter, f"{self.dir_name}/power.arrow")
else PowerLogger(power_meter, f"{self.dir_name}/power")
)
# Store a lambda so we can find it again to unregister