diff --git a/meshtastic/analysis/__main__.py b/meshtastic/analysis/__main__.py index 7d080b5..b15b893 100644 --- a/meshtastic/analysis/__main__.py +++ b/meshtastic/analysis/__main__.py @@ -29,42 +29,16 @@ dtype_mapping = { pa.string(): pd.StringDtype(), } -# sdir = '/home/kevinh/.local/share/meshtastic/slogs/20240626-152804' -sdir = "/home/kevinh/.local/share/meshtastic/slogs/latest" -dpwr = feather.read_table(f"{sdir}/power.feather").to_pandas( - types_mapper=dtype_mapping.get -) -dslog = feather.read_table(f"{sdir}/slog.feather").to_pandas( - types_mapper=dtype_mapping.get -) - - -def get_board_info(): - """Get the board information from the slog dataframe. - - tuple: A tuple containing the board ID and software version. - """ - board_info = dslog[dslog["sw_version"].notnull()] - sw_version = board_info.iloc[0]["sw_version"] - board_id = mesh_pb2.HardwareModel.Name(board_info.iloc[0]["board_id"]) - return (board_id, sw_version) - - -pmon_events = dslog[dslog["pm_mask"].notnull()] - - -pm_masks = pd.Series(pmon_events["pm_mask"]).to_numpy() - -# possible to do this with pandas rolling windows if I was smarter? -pm_changes = [(pm_masks[i - 1] ^ x if i != 0 else x) for i, x in enumerate(pm_masks)] -pm_raises = [(pm_masks[i] & x) for i, x in enumerate(pm_changes)] -pm_falls = [(~pm_masks[i] & x if i != 0 else 0) for i, x in enumerate(pm_changes)] - +# Configure panda options +pd.options.mode.copy_on_write = True def to_pmon_names(arr) -> list[str]: """Convert the power monitor state numbers to their corresponding names. - """ + arr (list): List of power monitor state numbers. + + Returns the List of corresponding power monitor state names. + """ def to_pmon_name(n): try: s = powermon_pb2.PowerMon.State.Name(int(n)) @@ -74,46 +48,97 @@ def to_pmon_names(arr) -> list[str]: return [to_pmon_name(x) for x in arr] +def read_pandas(filepath: str) -> pd.DataFrame: + """Read a feather file and convert it to a pandas DataFrame. -pd.options.mode.copy_on_write = True -pmon_events["pm_raises"] = to_pmon_names(pm_raises) -pmon_events["pm_falls"] = to_pmon_names(pm_falls) + filepath (str): Path to the feather file. -pmon_raises = pmon_events[pmon_events["pm_raises"].notnull()] + Returns the pandas DataFrame. + """ + return feather.read_table(filepath).to_pandas(types_mapper=dtype_mapping.get) +def get_pmon_raises(dslog: pd.DataFrame) -> pd.DataFrame: + """Get the power monitor raises from the slog DataFrame. -def create_dash(): - """Create a Dash application for visualizing power consumption data.""" + dslog (pd.DataFrame): The slog DataFrame. + + Returns the DataFrame containing the power monitor raises. + """ + pmon_events = dslog[dslog["pm_mask"].notnull()] + + pm_masks = pd.Series(pmon_events["pm_mask"]).to_numpy() + + # possible to do this with pandas rolling windows if I was smarter? + pm_changes = [(pm_masks[i - 1] ^ x if i != 0 else x) for i, x in enumerate(pm_masks)] + pm_raises = [(pm_masks[i] & x) for i, x in enumerate(pm_changes)] + pm_falls = [(~pm_masks[i] & x if i != 0 else 0) for i, x in enumerate(pm_changes)] + + pmon_events["pm_raises"] = to_pmon_names(pm_raises) + pmon_events["pm_falls"] = to_pmon_names(pm_falls) + + pmon_raises = pmon_events[pmon_events["pm_raises"].notnull()][["time", "pm_raises"]] + pmon_falls = pmon_events[pmon_events["pm_falls"].notnull()] + + def get_endtime(row): + """Find the corresponding fall event.""" + following = pmon_falls[(pmon_falls["pm_falls"] == row["pm_raises"]) & + (pmon_falls["time"] > row["time"])] + return following.iloc[0] if not following.empty else None + + # HMM - setting end_time doesn't work yet - leave off for now + # pmon_raises['end_time'] = pmon_raises.apply(get_endtime, axis=1) + + return pmon_raises + +def get_board_info(dslog: pd.DataFrame) -> tuple: + """Get the board information from the slog DataFrame. + + dslog (pd.DataFrame): The slog DataFrame. + + Returns a tuple containing the board ID and software version. + """ + board_info = dslog[dslog["sw_version"].notnull()] + sw_version = board_info.iloc[0]["sw_version"] + board_id = mesh_pb2.HardwareModel.Name(board_info.iloc[0]["board_id"]) + return (board_id, sw_version) + +def create_dash(slog_path: str) -> Dash: + """Create a Dash application for visualizing power consumption data. + + slog_path (str): Path to the slog directory. + + Returns the Dash application. + """ app = Dash( external_stylesheets=[dbc.themes.BOOTSTRAP] ) + dpwr = read_pandas(f"{slog_path}/power.feather") + dslog = read_pandas(f"{slog_path}/slog.feather") + + pmon_raises = get_pmon_raises(dslog) + def set_legend(f, name): f["data"][0]["showlegend"] = True f["data"][0]["name"] = name return f - df = dpwr - avg_pwr_lines = px.line(df, x="time", y="average_mW").update_traces( + avg_pwr_lines = px.line(dpwr, x="time", y="average_mW").update_traces( line_color="red" ) set_legend(avg_pwr_lines, "avg power") - max_pwr_points = px.scatter(df, x="time", y="max_mW").update_traces( + max_pwr_points = px.scatter(dpwr, x="time", y="max_mW").update_traces( marker_color="blue" ) set_legend(max_pwr_points, "max power") - min_pwr_points = px.scatter(df, x="time", y="min_mW").update_traces( + min_pwr_points = px.scatter(dpwr, x="time", y="min_mW").update_traces( marker_color="green" ) set_legend(min_pwr_points, "min power") - pmon = pmon_raises - fake_y = np.full(len(pmon), 10.0) - pmon_points = px.scatter(pmon, x="time", y=fake_y, text="pm_raises") + fake_y = np.full(len(pmon_raises), 10.0) + pmon_points = px.scatter(pmon_raises, x="time", y=fake_y, text="pm_raises") - # fig = avg_pwr_lines - # fig.add_trace(max_pwr_points) - # don't show minpower because not that interesting: min_pwr_points.data fig = go.Figure(data=max_pwr_points.data + avg_pwr_lines.data + pmon_points.data) fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01)) @@ -121,20 +146,17 @@ def create_dash(): # App layout app.layout = [ html.Div(children="Early Meshtastic power analysis tool testing..."), - # dash_table.DataTable(data=df.to_dict('records'), page_size=10), dcc.Graph(figure=fig), ] return app - def main(): """Entry point of the script.""" - app = create_dash() + app = create_dash(slog_path="/home/kevinh/.local/share/meshtastic/slogs/latest") port = 8051 logging.info(f"Running Dash visualization webapp on port {port} (publicly accessible)") app.run_server(debug=True, host='0.0.0.0', port=port) - if __name__ == "__main__": main()