Files
python/meshtastic/analysis/__main__.py
2024-07-31 13:40:29 -07:00

163 lines
5.3 KiB
Python

"""Post-run analysis tools for meshtastic."""
import logging
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import pyarrow as pa
import pyarrow.feather as feather
from dash import Dash, Input, Output, callback, dash_table, dcc, html
import dash_bootstrap_components as dbc
from .. import mesh_pb2, powermon_pb2
# per https://arrow.apache.org/docs/python/pandas.html#reducing-memory-use-in-table-to-pandas
# use this to get nullable int fields treated as ints rather than floats in pandas
dtype_mapping = {
pa.int8(): pd.Int8Dtype(),
pa.int16(): pd.Int16Dtype(),
pa.int32(): pd.Int32Dtype(),
pa.int64(): pd.Int64Dtype(),
pa.uint8(): pd.UInt8Dtype(),
pa.uint16(): pd.UInt16Dtype(),
pa.uint32(): pd.UInt32Dtype(),
pa.uint64(): pd.UInt64Dtype(),
pa.bool_(): pd.BooleanDtype(),
pa.float32(): pd.Float32Dtype(),
pa.float64(): pd.Float64Dtype(),
pa.string(): pd.StringDtype(),
}
# Configure panda options
pd.options.mode.copy_on_write = True
def to_pmon_names(arr) -> list[str]:
"""Convert the power monitor state numbers to their corresponding names.
arr (list): List of power monitor state numbers.
Returns the List of corresponding power monitor state names.
"""
def to_pmon_name(n):
try:
s = powermon_pb2.PowerMon.State.Name(int(n))
return s if s != "None" else None
except ValueError:
return None
return [to_pmon_name(x) for x in arr]
def read_pandas(filepath: str) -> pd.DataFrame:
"""Read a feather file and convert it to a pandas DataFrame.
filepath (str): Path to the feather file.
Returns the pandas DataFrame.
"""
return feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get)
def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame:
"""Get the power monitor raises from the slog DataFrame.
dslog (pd.DataFrame): The slog DataFrame.
Returns the DataFrame containing the power monitor raises.
"""
pmon_events = dslog[dslog["pm_mask"].notnull()]
pm_masks = pd.Series(pmon_events["pm_mask"]).to_numpy()
# possible to do this with pandas rolling windows if I was smarter?
pm_changes = [(pm_masks[i - 1] ^ x if i != 0 else x) for i, x in enumerate(pm_masks)]
pm_raises = [(pm_masks[i] & x) for i, x in enumerate(pm_changes)]
pm_falls = [(~pm_masks[i] & x if i != 0 else 0) for i, x in enumerate(pm_changes)]
pmon_events["pm_raises"] = to_pmon_names(pm_raises)
pmon_events["pm_falls"] = to_pmon_names(pm_falls)
pmon_raises = pmon_events[pmon_events["pm_raises"].notnull()][["time", "pm_raises"]]
pmon_falls = pmon_events[pmon_events["pm_falls"].notnull()]
def get_endtime(row):
"""Find the corresponding fall event."""
following = pmon_falls[(pmon_falls["pm_falls"] == row["pm_raises"]) &
(pmon_falls["time"] > row["time"])]
return following.iloc[0] if not following.empty else None
# HMM - setting end_time doesn't work yet - leave off for now
# pmon_raises['end_time'] = pmon_raises.apply(get_endtime, axis=1)
return pmon_raises
def get_board_info(dslog: pd.DataFrame) -> tuple:
"""Get the board information from the slog DataFrame.
dslog (pd.DataFrame): The slog DataFrame.
Returns a tuple containing the board ID and software version.
"""
board_info = dslog[dslog["sw_version"].notnull()]
sw_version = board_info.iloc[0]["sw_version"]
board_id = mesh_pb2.HardwareModel.Name(board_info.iloc[0]["board_id"])
return (board_id, sw_version)
def create_dash(slog_path: str) -> Dash:
"""Create a Dash application for visualizing power consumption data.
slog_path (str): Path to the slog directory.
Returns the Dash application.
"""
app = Dash(
external_stylesheets=[dbc.themes.BOOTSTRAP]
)
dpwr = read_pandas(f"{slog_path}/power.feather")
dslog = read_pandas(f"{slog_path}/slog.feather")
pmon_raises = get_pmon_raises(dslog)
def set_legend(f, name):
f["data"][0]["showlegend"] = True
f["data"][0]["name"] = name
return f
avg_pwr_lines = px.line(dpwr, x="time", y="average_mW").update_traces(
line_color="red"
)
set_legend(avg_pwr_lines, "avg power")
max_pwr_points = px.scatter(dpwr, x="time", y="max_mW").update_traces(
marker_color="blue"
)
set_legend(max_pwr_points, "max power")
min_pwr_points = px.scatter(dpwr, x="time", y="min_mW").update_traces(
marker_color="green"
)
set_legend(min_pwr_points, "min power")
fake_y = np.full(len(pmon_raises), 10.0)
pmon_points = px.scatter(pmon_raises, x="time", y=fake_y, text="pm_raises")
fig = go.Figure(data=max_pwr_points.data + avg_pwr_lines.data + pmon_points.data)
fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01))
# App layout
app.layout = [
html.Div(children="Early Meshtastic power analysis tool testing..."),
dcc.Graph(figure=fig),
]
return app
def main():
"""Entry point of the script."""
app = create_dash(slog_path="/home/kevinh/.local/share/meshtastic/slogs/latest")
port = 8051
logging.info(f"Running Dash visualization webapp on port {port} (publicly accessible)")
app.run_server(debug=True, host='0.0.0.0', port=port)
if __name__ == "__main__":
main()