From f6a954b97e8bd45887267a56366f4418050bd484 Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Sun, 10 May 2026 09:22:40 -0500 Subject: [PATCH] Implement rotating JSONL recorder for persistent logging (#10428) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement rotating JSONL recorder for persistent logging * Fixes * Update documentation and clean up imports in command files * Address remaining recorder review feedback Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/2541773c-869a-463f-9fae-8505272c06ff Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * recorder: fix lock re-entry deadlock on start() and force_rotate_all() The previous "Fixes" commit added `_files_snapshot()` which acquires `self._lock` so handlers don't race with `stop()` clearing `_files`. But two callers were already holding `self._lock` when they invoked methods that go through the snapshot: - `start()` writes the `recorder_start` event from inside its `with self._lock:` block. `_write_event` -> `_files_snapshot` re-acquires the same non-reentrant `threading.Lock`, freezing process startup. - `force_rotate_all()` calls `self.status()` (which also acquires `self._lock`) while still holding the lock from rotating each file. Both fixes release the lock before the call. The recorder_start marker still lands in events.jsonl because the started/started_at flags are already set when we write it. Verified end-to-end against the standalone /tmp/verify_pr_fixes.py harness — all 9 PR review-comment fixes pass, including pause/resume event ordering and concurrent start/stop without KeyError. 
Co-Authored-By: Claude Opus 4.7 (1M context) * Fix markdown linting issues in leakhunt.md and repro.md * Handle recorder startup and query review fixes Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Tighten recorder follow-up tests Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Stabilize recorder startup tests Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Remove brittle recorder startup test Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Polish recorder follow-up errors Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Refine recorder startup and regex errors Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Clean up recorder follow-up nits Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Trunk --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) --- .claude/commands/diagnose.md | 8 +- .claude/commands/leakhunt.md | 103 ++++ .claude/commands/repro.md | 4 + mcp-server/.gitignore | 6 + mcp-server/scripts/datadog-dashboard.json | 217 +++++++ mcp-server/scripts/mtlog_to_datadog.py | 389 +++++++++++++ 
mcp-server/src/meshtastic_mcp/flash.py | 45 +- mcp-server/src/meshtastic_mcp/log_query.py | 410 +++++++++++++ mcp-server/src/meshtastic_mcp/pio.py | 15 + .../src/meshtastic_mcp/recorder/__init__.py | 19 + .../src/meshtastic_mcp/recorder/parsers.py | 309 ++++++++++ .../src/meshtastic_mcp/recorder/recorder.py | 467 +++++++++++++++ .../src/meshtastic_mcp/recorder/rotating.py | 163 ++++++ .../src/meshtastic_mcp/serial_session.py | 28 +- mcp-server/src/meshtastic_mcp/server.py | 233 +++++++- mcp-server/tests/unit/test_build_flags.py | 88 +++ mcp-server/tests/unit/test_recorder.py | 548 ++++++++++++++++++ 17 files changed, 3047 insertions(+), 5 deletions(-) create mode 100644 .claude/commands/leakhunt.md create mode 100644 mcp-server/scripts/datadog-dashboard.json create mode 100755 mcp-server/scripts/mtlog_to_datadog.py create mode 100644 mcp-server/src/meshtastic_mcp/log_query.py create mode 100644 mcp-server/src/meshtastic_mcp/recorder/__init__.py create mode 100644 mcp-server/src/meshtastic_mcp/recorder/parsers.py create mode 100644 mcp-server/src/meshtastic_mcp/recorder/recorder.py create mode 100644 mcp-server/src/meshtastic_mcp/recorder/rotating.py create mode 100644 mcp-server/tests/unit/test_build_flags.py create mode 100644 mcp-server/tests/unit/test_recorder.py diff --git a/.claude/commands/diagnose.md b/.claude/commands/diagnose.md index 749668956..d664f6312 100644 --- a/.claude/commands/diagnose.md +++ b/.claude/commands/diagnose.md @@ -49,11 +49,17 @@ Call the meshtastic MCP tool bundle and format a structured health report for on - Do the LoRa configs match? (region, channel_num, modem_preset should all agree; mismatch = no mesh) - Do the primary channel NAMES match? Mismatch = different PSK = no decode. -7. **Suggest next actions only for specific, recognisable failure modes**: +7. **Recorder slice (cheap, always available).** The mcp-server runs an autouse log recorder that's been collecting from every connected device. 
Pull two short slices to surface anything weird that's already happened: + - `mcp__meshtastic__logs_window(start="-2m", level="WARN|ERROR|CRIT", max_lines=20)` — recent firmware errors. If empty, say "no recent errors"; don't manufacture concern. + - `mcp__meshtastic__telemetry_timeline(window="1h", field="free_heap", max_points=60)` — heap trend. If `slope_per_min < -50`, flag it and recommend `/leakhunt window=6h` for a deeper read; otherwise just note the current free heap. + - If `recorder_status` shows `running:false` or `files.telemetry.last_ts` is null, note "recorder has no telemetry yet — enable `set_debug_log_api(True)` to populate" and skip this step gracefully. + +8. **Suggest next actions only for specific, recognisable failure modes**: - Stale PKI pubkey one-way → "run `/test tests/mesh/test_direct_with_ack.py` — the retry + nodeinfo-ping heals this in the test path." - Region mismatch → "re-bake one side via `./mcp-server/run-tests.sh --force-bake`." - Device unreachable, reachable via DFU → `touch_1200bps(port=...)` + `pio_flash`. If not even DFU responds AND the device is on a PPPS hub, escalate to `uhubctl_cycle(role=..., confirm=True)`. - CP2102-wedged-driver on macOS → see the note in `run-tests.sh`. + - Heap slope strongly negative → "run `/leakhunt window=6h` for a full timeline + classification." 
## What NOT to do diff --git a/.claude/commands/leakhunt.md b/.claude/commands/leakhunt.md new file mode 100644 index 000000000..ef90b133e --- /dev/null +++ b/.claude/commands/leakhunt.md @@ -0,0 +1,103 @@ +--- +description: Hunt for memory leaks (and other slow degradations) by reading the persistent recorder's heap timeline + log slice over a window +argument-hint: [window=1h] [field=free_heap] [variant=local] +--- + + + +# `/leakhunt` — read the recorder, classify a memory leak + +Use the always-on recorder (`mcp-server/.mtlog/`) to read a heap timeline plus the matching log slice and produce a one-page verdict: **steady / slow leak / fragmentation / OOM-imminent**. No firmware changes, no special build flags — the LocalStats telemetry packet that the firmware already broadcasts every ~60 s carries `heap_free_bytes` and `heap_total_bytes`. + +## Two signal paths — pick the right one + +| Path | Build flag | Cadence | Per-thread attribution | Cost | +| --------------------- | ---------------- | -------------- | ---------------------- | ------------------------- | +| LocalStats packet | (default) | ~60 s | No | Free — always on | +| `[heap N]` log prefix | `-DDEBUG_HEAP=1` | every log line | Yes (Thread X leaked) | Bigger flash + log volume | + +Both feed the same `telemetry_timeline(field="free_heap")` query — when DEBUG_HEAP is on, the recorder synthesizes telemetry rows from log prefixes (tagged `source: debug_heap`), so a single timeline call gets whichever signal is available. **For a slow leak diagnosis, the default path is plenty** (60 s cadence over 6 h = 360 points; linear regression over that nails sub-100-byte/min slopes). **DEBUG_HEAP is for attribution** — when the slope is real and you need to know which thread is leaking. + +## What to do + +1. 
**Parse `$ARGUMENTS`**: optional `window` (default `1h`, accepts `30m`/`6h`/`-3d`/etc.), optional `field` (default `free_heap`; alternates: `total_heap`, `battery_level`, anything in the LocalStats variant), optional `variant` (default `local`; alternates: `device`, `environment`, `power`, `airQuality`, `health`). + +2. **Verify the recorder is alive** — call `mcp__meshtastic__recorder_status`. Check: + - `running == True` + - `files.telemetry.lines > 0` (at least one telemetry packet recorded — if zero, the device hasn't broadcast LocalStats yet OR `set_debug_log_api` has never been on; tell the operator to run `mcp__meshtastic__set_debug_log_api(enabled=True)` and wait one device-update interval) + - `files.telemetry.last_ts` within the last 5 minutes (if older, the device is silent — log that, not "leak detected") + +3. **Detect whether DEBUG_HEAP is active** — `mcp__meshtastic__logs_window(start="-2m", grep=r"\[heap \d+\]", max_lines=3)`. If any line matches, the firmware has the prefix → DEBUG_HEAP is on, expect higher-cadence data and `heap_event` rows. If zero matches over the last 2 minutes, you're on the LocalStats-only path. + +4. **Pull the timeline** — `mcp__meshtastic__telemetry_timeline(window=$window, variant=$variant, field=$field, max_points=200)`. Read: + - `samples` — how many raw points contributed + - `min`, `max` — total swing + - `slope_per_min` — units per minute (linear regression over the whole window) + +5. **Pull the log context for the same window** — `mcp__meshtastic__logs_window(start="-${window}", grep="Heap status|leaked heap|freed heap|out of memory|Alloc an err|panic|abort", max_lines=200)`. These are the strings the firmware emits when something memory-related happens (`DEBUG_HEAP` builds emit `"Heap status:"` and `"leaked heap"` lines; production builds emit `"Alloc an err"` on failure and `"out of memory"` on OOM). + +6. 
**Pull marker events** so we know if the operator labeled phases — `mcp__meshtastic__events_window(start="-${window}", kind="mark|connection_lost|connection_established")`. If a `connection_lost` overlaps a sharp drop, that's not a leak; that's a reboot. + +6a. **(DEBUG_HEAP only) Per-thread attribution** — `mcp__meshtastic__logs_window(start="-${window}", grep="leaked heap", max_lines=200)`. Each row has a structured `heap_event` field with `{kind, thread, before, after, delta}`. Aggregate by thread: sum the `delta` over the window per thread name. The thread with the largest cumulative negative delta is your suspect. Note the count too — a thread with 50× small leaks is different from 1× big leak. + +7. **Classify** based on what the data says, NOT on what you wish it said. Use these rules in order: + - **Insufficient data** (< 5 samples): say so. Suggest a longer window or longer wait. Stop. + - **Reboot mid-window**: if any `connection_lost` event is present AND `free_heap` jumped UP at that timestamp, the device rebooted. Note it; pre-reboot trend may be a leak but you only have part of the curve. + - **OOM-imminent**: any `out of memory` line in the log slice. This trumps everything; flag urgently. (`Alloc an err=` lines on their own are allocation failures, not OOM — list them as error lines and weigh them under the slow-leak and fragmentation rules below.) + - **Slow leak**: `slope_per_min < -50` AND `max - min > 1000` AND no reboot. The heap is monotonically (or near-monotonically) declining. Estimate time-to-zero: `min / -slope_per_min` minutes. Surface it. + - **Fragmentation suspect**: `slope_per_min` close to zero (|x| < 50) BUT min trends down across the window AND the log slice shows `Alloc an err` warnings WITHOUT total OOM. Means free total is OK but largest contiguous block is shrinking. Recommend a `DEBUG_HEAP` build to confirm. + - **Steady**: |slope_per_min| < 50, no error lines. Heap is fine. + - **Recovery curve**: slope is POSITIVE — heap recovered. Either a workload completed or GC fired. Note it; not a leak. + +8. 
**Report**: + + ```text + /leakhunt window=6h field=free_heap variant=local + ──────────────────────────────────────────────────── + recorder : running, telem last_ts 8s ago + build : DEBUG_HEAP=ON (per-line prefix detected) + samples : 14,200 over 6h (cadence ~1.5s, log-line synth) + free_heap : min 92,344 / max 124,008 / range 31,664 + slope : -82 bytes/min (negative — heap declining) + reboots : none in window + OOM events : none + error lines : 3× "Alloc an err=ESP_ERR_NO_MEM" at +4h12m, +5h08m, +5h44m + thread leaks : (DEBUG_HEAP) MeshPacket -3,124 B over 18 events + Router -1,408 B over 4 events + others -240 B + verdict : SLOW LEAK — primary suspect MeshPacket thread + est. time-to-OOM: ~1,127 min (~18.8 h) at current slope + evidence : (3 log line citations with uptimes) + ``` + + Then: **what to do next.** + - SLOW LEAK, **DEBUG_HEAP off** → recommend rebuilding with the flag and re-running this skill. Concrete one-liner the operator can copy: + ```text + mcp__meshtastic__build(env="", build_flags={"DEBUG_HEAP": 1}) + mcp__meshtastic__pio_flash(env="", port="", confirm=True) + ``` + After flash, set debug_log_api back on and wait one window; re-run `/leakhunt`. + - SLOW LEAK, **DEBUG_HEAP on** → cite the top-leaking thread name from step 6a. Point at the corresponding source file (`grep -rn "ThreadName(\"\")" src/`); the operator decides what to fix. + - FRAGMENTATION SUSPECT → propose pre-allocating any per-packet buffers; or rebuilding with `CONFIG_HEAP_TASK_TRACKING=y` on ESP32 to see who's holding the largest blocks. + - OOM-IMMINENT → flag for immediate attention; don't wait for the next telemetry interval. + - STEADY → say so; stop. Don't invent problems. + +## What NOT to do + +- Don't assume a leak from a single dip. LocalStats fires every ~60 s and the firmware naturally allocates+frees on each broadcast cycle; one packet sees the trough. Look at the slope, not the deltas. +- Don't recommend code changes. 
This skill diagnoses; the operator decides what to fix. +- Don't enable `set_debug_log_api` automatically — if it's off, telemetry isn't reaching pubsub anyway, and the recorder will be empty. Tell the operator to flip it on and wait, then re-run. +- Don't run heavy workloads to "trigger the leak." The recorder is passive; we read what's there. + +## Companion: `mark_event` for stress runs + +If the operator wants to test under stimulus (e.g. blast 50 broadcasts and see what the heap does), they can frame the experiment with markers: + +```text +mark_event("burst-start") +… run the workload … +mark_event("burst-end") +/leakhunt window=15m +``` + +The markers land in both `events.jsonl` and `logs.jsonl`, so the report can show "free_heap dipped 8 KB during the burst window, recovered to baseline within 2 LocalStats cycles" → not a leak. diff --git a/.claude/commands/repro.md b/.claude/commands/repro.md index c5f466ce6..84513e45b 100644 --- a/.claude/commands/repro.md +++ b/.claude/commands/repro.md @@ -3,6 +3,8 @@ description: Re-run a specific test N times in isolation to triage flakes, diff argument-hint: [count=5] --- + + # `/repro` — flakiness triage for one test Re-run a single pytest node ID N times in isolation, track pass rate, and surface what's _different_ in the firmware logs between the passing attempts and the failing ones. Turns "it's flaky, I guess" into "it fails when X, passes when Y." @@ -40,6 +42,8 @@ Re-run a single pytest node ID N times in isolation, track pass rate, and surfac Surface the top 3 differences as a "passes when / fails when" table. Don't dump full logs — pull specific lines with uptime timestamps. +5a. **Archive recorder slices per attempt** (no extra device interaction; the recorder runs autouse). Right after each attempt finishes, capture its `(start_ts, end_ts)` and call `mcp__meshtastic__recorder_export(start=<start_ts>, end=<end_ts>, dest_dir="mcp-server/tests/repro_artifacts/<test_id>/attempt_<n>/")`. 
This drops a `logs.jsonl`, `telemetry.jsonl`, `packets.jsonl`, and `events.jsonl` snapshot scoped to the attempt window. Use these for cross-attempt diffs in step 5: `jq '.line' logs.jsonl` is faster than re-running the test, and the telemetry slice lets you compare heap behavior across attempts. + 6. **Classify the flake** into one of: - **LoRa airtime collision** → pass rate improves with fewer concurrent transmitters; propose a `time.sleep` gap or retry bump in the test body. - **PKI key staleness** → fails on first attempt, passes after self-heal; existing retry loop in `test_direct_with_ack.py` handles this. diff --git a/mcp-server/.gitignore b/mcp-server/.gitignore index 4cc892b2a..744a4401d 100644 --- a/mcp-server/.gitignore +++ b/mcp-server/.gitignore @@ -7,6 +7,12 @@ __pycache__/ dist/ build/ +# Persistent device-log capture (recorder + Datadog cursor). +# Cross-session JSONL streams written by the autouse Recorder singleton +# (see src/meshtastic_mcp/recorder/). Lives outside tests/ so the pytest +# fixture truncate doesn't touch it. +.mtlog/ + # Test harness artifacts tests/report.html tests/junit.xml diff --git a/mcp-server/scripts/datadog-dashboard.json b/mcp-server/scripts/datadog-dashboard.json new file mode 100644 index 000000000..73aa35201 --- /dev/null +++ b/mcp-server/scripts/datadog-dashboard.json @@ -0,0 +1,217 @@ +{ + "title": "Meshtastic Firmware — Recorder Stream", + "description": "Live view of `.mtlog/` streams shipped by `mtlog_to_datadog.py`. Heap, packet volume, log levels, errors. 
One row per port.", + "widgets": [ + { + "definition": { + "title": "Free heap (bytes)", + "type": "timeseries", + "show_legend": true, + "requests": [ + { + "queries": [ + { + "name": "free_heap", + "data_source": "metrics", + "query": "avg:mesh.local.heap_free_bytes{service:meshtastic-firmware} by {port}" + } + ], + "response_format": "timeseries", + "display_type": "line" + } + ], + "yaxis": { "label": "bytes" } + } + }, + { + "definition": { + "title": "Heap slope (bytes/min) — last 1h", + "type": "query_value", + "precision": 0, + "requests": [ + { + "queries": [ + { + "name": "slope", + "data_source": "metrics", + "query": "derivative(avg:mesh.local.heap_free_bytes{service:meshtastic-firmware})", + "aggregator": "avg" + } + ], + "response_format": "scalar" + } + ], + "conditional_formats": [ + { "comparator": "<", "value": -100, "palette": "white_on_red" }, + { "comparator": "<", "value": 0, "palette": "white_on_yellow" }, + { "comparator": ">=", "value": 0, "palette": "white_on_green" } + ] + } + }, + { + "definition": { + "title": "Total heap (bytes)", + "type": "timeseries", + "requests": [ + { + "queries": [ + { + "name": "total_heap", + "data_source": "metrics", + "query": "avg:mesh.local.heap_total_bytes{service:meshtastic-firmware} by {port}" + } + ], + "response_format": "timeseries", + "display_type": "line" + } + ] + } + }, + { + "definition": { + "title": "Battery level (%)", + "type": "timeseries", + "requests": [ + { + "queries": [ + { + "name": "battery", + "data_source": "metrics", + "query": "avg:mesh.device.battery_level{service:meshtastic-firmware} by {port}" + } + ], + "response_format": "timeseries", + "display_type": "line" + } + ], + "yaxis": { "min": "0", "max": "105" } + } + }, + { + "definition": { + "title": "Air utilization (TX %)", + "type": "timeseries", + "requests": [ + { + "queries": [ + { + "name": "airutil", + "data_source": "metrics", + "query": "avg:mesh.device.air_util_tx{service:meshtastic-firmware} by {port}" + } + ], + 
"response_format": "timeseries", + "display_type": "line" + } + ] + } + }, + { + "definition": { + "title": "Channel utilization (%)", + "type": "timeseries", + "requests": [ + { + "queries": [ + { + "name": "chutil", + "data_source": "metrics", + "query": "avg:mesh.device.channel_utilization{service:meshtastic-firmware} by {port}" + } + ], + "response_format": "timeseries", + "display_type": "line" + } + ] + } + }, + { + "definition": { + "title": "Log volume by level", + "type": "timeseries", + "show_legend": true, + "requests": [ + { + "response_format": "timeseries", + "display_type": "bars", + "queries": [ + { + "name": "log_count", + "data_source": "logs", + "indexes": ["*"], + "compute": { "aggregation": "count" }, + "search": { "query": "service:meshtastic-firmware" }, + "group_by": [ + { + "facet": "@level", + "limit": 10, + "sort": { "order": "desc", "aggregation": "count" } + } + ] + } + ] + } + ] + } + }, + { + "definition": { + "title": "Recent ERROR / CRIT firmware logs", + "type": "list_stream", + "requests": [ + { + "response_format": "event_list", + "query": { + "data_source": "logs_stream", + "query_string": "service:meshtastic-firmware (status:error OR @level:ERROR OR @level:CRIT)", + "indexes": [], + "sort": { "column": "timestamp", "order": "desc" } + }, + "columns": [ + { "field": "timestamp", "width": "auto" }, + { "field": "host", "width": "auto" }, + { "field": "@port", "width": "auto" }, + { "field": "@level", "width": "auto" }, + { "field": "@thread", "width": "auto" }, + { "field": "message", "width": "stretch" } + ] + } + ] + } + }, + { + "definition": { + "title": "Recorder marker events", + "type": "list_stream", + "requests": [ + { + "response_format": "event_list", + "query": { + "data_source": "logs_stream", + "query_string": "service:meshtastic-firmware @level:MARK", + "indexes": [], + "sort": { "column": "timestamp", "order": "desc" } + }, + "columns": [ + { "field": "timestamp", "width": "auto" }, + { "field": "host", "width": 
"auto" }, + { "field": "message", "width": "stretch" } + ] + } + ] + } + } + ], + "template_variables": [ + { + "name": "port", + "prefix": "port", + "available_values": [], + "default": "*" + }, + { "name": "host", "prefix": "host", "available_values": [], "default": "*" } + ], + "layout_type": "ordered", + "notify_list": [], + "reflow_type": "auto" +} diff --git a/mcp-server/scripts/mtlog_to_datadog.py b/mcp-server/scripts/mtlog_to_datadog.py new file mode 100755 index 000000000..51496adc4 --- /dev/null +++ b/mcp-server/scripts/mtlog_to_datadog.py @@ -0,0 +1,389 @@ +#!/usr/bin/env python3 +"""Forward selected recorder JSONL streams to Datadog. + +Reads `.mtlog/logs.jsonl` and `.mtlog/telemetry.jsonl`, ships logs to the +Logs Intake API and telemetry numerics to the Metrics v2 series API. +Resumes from `.mtlog/.dd-cursor.json` so a daemon restart doesn't +duplicate rows already shipped from the current live files. + +This forwarder does not currently backfill rotated `.jsonl.gz` archives. +If the recorder rotates before this process drains the live file, or the +forwarder is down across a rotation, those older rows are skipped. + +Usage: + DD_API_KEY=... ./scripts/mtlog_to_datadog.py --tail + ./scripts/mtlog_to_datadog.py --once # catch up + exit + ./scripts/mtlog_to_datadog.py --since 3600 # backfill last hour from start + +Default `DD_SITE` is `us5.datadoghq.com` — the team's Datadog instance. +Override via `DD_SITE=...` env var or `--site` flag for one-offs. + +The forwarder is a separate process by design — a Datadog outage or +auth failure must not backpressure the recorder. We exit non-zero on +fatal config errors (missing API key) and keep retrying on transient +network/HTTP errors. +""" + +from __future__ import annotations + +import argparse +import json +import os +import socket +import sys +import time +from pathlib import Path +from typing import Any, Iterator + +try: + import requests +except ImportError: + print( + "requests is required. 
Install it in the mcp-server venv: " + "uv pip install requests", + file=sys.stderr, + ) + sys.exit(2) + + +_DEFAULT_LOG_DIR = Path(__file__).resolve().parents[1] / ".mtlog" +_LOG_INTAKE_TPL = "https://http-intake.logs.{site}/api/v2/logs" +_METRICS_TPL = "https://api.{site}/api/v2/series" +_LOG_BATCH = 50 +_METRICS_BATCH = 100 +_MAX_RETRIES = 5 +_RETRY_BASE_S = 1.5 + + +# --- streaming JSONL with byte-position cursor ------------------------- + + +class _StreamReader: + """Reads a single rotating JSONL with cursor-based resume. + + This tails only the live `.jsonl` file. The recorder rotates files + (live `.jsonl` → `.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`), which means + the live file shrinks abruptly. We detect that via inode change OR live + size < cursor position, and reset the live-file cursor to 0. + """ + + def __init__(self, path: Path, cursor: dict[str, Any]): + self.path = path + self.cursor = cursor + + def _state(self) -> tuple[int, int]: + """Return (inode, size) for the live file. (0, 0) if missing.""" + try: + st = self.path.stat() + return (st.st_ino, st.st_size) + except FileNotFoundError: + return (0, 0) + + def iter_new_records(self) -> Iterator[dict[str, Any]]: + ino, size = self._state() + last_ino = self.cursor.get("ino") + last_pos = int(self.cursor.get("pos") or 0) + if ino == 0: + return + if last_ino is not None and last_ino != ino: + # Rotation happened. Start over. + last_pos = 0 + if last_pos > size: + # Live file truncated/shrunk under us — recorder rotated. 
+ last_pos = 0 + try: + with self.path.open("r", encoding="utf-8") as fh: + fh.seek(last_pos) + for line in fh: + line = line.rstrip("\n") + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError: + continue + last_pos = fh.tell() + except FileNotFoundError: + return + self.cursor["ino"] = ino + self.cursor["pos"] = last_pos + + +def _load_cursor(path: Path) -> dict[str, Any]: + if not path.exists(): + return {} + try: + return json.loads(path.read_text()) + except (OSError, json.JSONDecodeError): + return {} + + +def _save_cursor(path: Path, data: dict[str, Any]) -> None: + tmp = path.with_suffix(".json.tmp") + tmp.write_text(json.dumps(data, separators=(",", ":"))) + tmp.replace(path) + + +# --- Datadog clients --------------------------------------------------- + + +class _DDSession: + """Pool one HTTPS session, share retry logic.""" + + def __init__(self, api_key: str, site: str, hostname: str) -> None: + self.api_key = api_key + self.site = site + self.hostname = hostname + self.session = requests.Session() + self.session.headers.update( + { + "DD-API-KEY": api_key, + "Content-Type": "application/json", + } + ) + + def _post(self, url: str, payload: Any) -> bool: + for attempt in range(_MAX_RETRIES): + try: + resp = self.session.post(url, json=payload, timeout=30) + except requests.RequestException as e: + _wait_retry(attempt, f"network error: {e}") + continue + if 200 <= resp.status_code < 300: + return True + if resp.status_code in (408, 429, 500, 502, 503, 504): + _wait_retry( + attempt, + f"HTTP {resp.status_code} retrying", + ) + continue + print( + f"datadog refused: {resp.status_code} {resp.text[:200]}", + file=sys.stderr, + ) + return False + return False + + def send_logs(self, records: list[dict[str, Any]]) -> int: + if not records: + return 0 + url = _LOG_INTAKE_TPL.format(site=self.site) + sent = 0 + for i in range(0, len(records), _LOG_BATCH): + batch = records[i : i + _LOG_BATCH] + if self._post(url, batch): + sent 
+= len(batch) + return sent + + def send_metrics(self, series: list[dict[str, Any]]) -> int: + if not series: + return 0 + url = _METRICS_TPL.format(site=self.site) + sent = 0 + for i in range(0, len(series), _METRICS_BATCH): + batch = series[i : i + _METRICS_BATCH] + if self._post(url, {"series": batch}): + sent += len(batch) + return sent + + +def _wait_retry(attempt: int, reason: str) -> None: + wait = _RETRY_BASE_S * (2**attempt) + print( + f" retry {attempt + 1}/{_MAX_RETRIES} in {wait:.1f}s ({reason})", + file=sys.stderr, + ) + time.sleep(wait) + + +# --- record → datadog payload ------------------------------------------ + + +def _log_record_to_dd(rec: dict[str, Any], host: str) -> dict[str, Any]: + line = rec.get("line") or "" + tags = [ + f"role:{rec.get('role')}", + f"port:{rec.get('port')}", + ] + level = rec.get("level") + if level: + tags.append(f"level:{level}") + tag = rec.get("tag") + if tag: + tags.append(f"thread:{tag}") + return { + "ddsource": "meshtastic-firmware", + "service": "meshtastic-firmware", + "hostname": host, + "message": line, + "ddtags": ",".join(t for t in tags if t and "None" not in t), + "timestamp": int((rec.get("ts") or time.time()) * 1000), + "level": level, + } + + +def _telemetry_record_to_metrics( + rec: dict[str, Any], host: str +) -> list[dict[str, Any]]: + fields = rec.get("fields") or {} + if not isinstance(fields, dict): + return [] + variant = rec.get("variant") or "unknown" + ts = int(rec.get("ts") or time.time()) + out: list[dict[str, Any]] = [] + tags = [] + if rec.get("port"): + tags.append(f"port:{rec['port']}") + if rec.get("role"): + tags.append(f"role:{rec['role']}") + if rec.get("from_node"): + tags.append(f"from_node:{rec['from_node']}") + tags.append(f"variant:{variant}") + for field, value in fields.items(): + if not isinstance(value, (int, float)) or isinstance(value, bool): + continue + metric = f"mesh.{variant}.{_metric_safe(field)}" + out.append( + { + "metric": metric, + "type": 3, # GAUGE + 
"points": [{"timestamp": ts, "value": float(value)}], + "tags": tags, + "resources": [{"type": "host", "name": host}], + } + ) + return out + + +def _metric_safe(name: str) -> str: + # Lowercase, replace non-alnum with underscore for safe metric names. + return "".join(c.lower() if c.isalnum() else "_" for c in name) + + +# --- main loop --------------------------------------------------------- + + +def run( + log_dir: Path, + *, + once: bool, + since_seconds: float | None, + poll_interval: float, + dd: _DDSession, +) -> int: + cursor_path = log_dir / ".dd-cursor.json" + cursors = _load_cursor(cursor_path) + + # `--since` overrides cursor: rewind to (now-since) timestamp. + # We can't seek by timestamp directly (cursor is byte position), so + # we just reset cursors to 0 and let the time filter in iter_new + # drop older records. + cutoff_ts: float | None = None + if since_seconds is not None: + cursors = {} + cutoff_ts = time.time() - since_seconds + + sent_total = {"logs": 0, "telemetry": 0} + + while True: + # logs.jsonl → DD logs + log_cursor = cursors.setdefault("logs", {}) + log_batch: list[dict[str, Any]] = [] + for rec in _StreamReader(log_dir / "logs.jsonl", log_cursor).iter_new_records(): + if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts: + continue + log_batch.append(_log_record_to_dd(rec, dd.hostname)) + if log_batch: + n = dd.send_logs(log_batch) + sent_total["logs"] += n + print(f"logs: sent {n}/{len(log_batch)}") + + # telemetry.jsonl → DD metrics + telem_cursor = cursors.setdefault("telemetry", {}) + metric_series: list[dict[str, Any]] = [] + for rec in _StreamReader( + log_dir / "telemetry.jsonl", telem_cursor + ).iter_new_records(): + if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts: + continue + metric_series.extend(_telemetry_record_to_metrics(rec, dd.hostname)) + if metric_series: + n = dd.send_metrics(metric_series) + sent_total["telemetry"] += n + print(f"telemetry: sent {n}/{len(metric_series)} metric points") + + 
def run(
    log_dir: Path,
    *,
    once: bool,
    since_seconds: float | None,
    poll_interval: float,
    dd: _DDSession,
) -> int:
    """Ship new recorder rows from `log_dir` to Datadog.

    Each pass reads new `logs.jsonl` rows (sent with `dd.send_logs`) and
    new `telemetry.jsonl` rows (converted to metric series and sent with
    `dd.send_metrics`), then saves the cursors to `.dd-cursor.json`.

    Args:
        log_dir: directory holding the JSONL streams and the cursor file.
        once: return after a single pass instead of polling.
        since_seconds: when not None, discard the stored cursors and skip
            records whose `ts` is older than `now - since_seconds`.
        poll_interval: seconds to sleep between passes when polling.
        dd: session providing `hostname`, `send_logs` and `send_metrics`.

    Returns:
        0 after the single pass when `once` is true; otherwise never returns.
    """
    cursor_path = log_dir / ".dd-cursor.json"
    cursors = _load_cursor(cursor_path)

    # `--since` overrides the cursor. The cursor is a byte position, so we
    # cannot seek by timestamp: restart from the beginning and let the
    # timestamp filter below drop the older records.
    cutoff_ts: float | None = None
    if since_seconds is not None:
        cursors = {}
        cutoff_ts = time.time() - since_seconds

    sent_total = {"logs": 0, "telemetry": 0}

    while True:
        # logs.jsonl → DD logs
        log_cursor = cursors.setdefault("logs", {})
        log_batch: list[dict[str, Any]] = []
        for rec in _StreamReader(log_dir / "logs.jsonl", log_cursor).iter_new_records():
            if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
                continue
            log_batch.append(_log_record_to_dd(rec, dd.hostname))
        if log_batch:
            n = dd.send_logs(log_batch)
            sent_total["logs"] += n
            print(f"logs: sent {n}/{len(log_batch)}")

        # telemetry.jsonl → DD metrics
        telem_cursor = cursors.setdefault("telemetry", {})
        metric_series: list[dict[str, Any]] = []
        for rec in _StreamReader(
            log_dir / "telemetry.jsonl", telem_cursor
        ).iter_new_records():
            if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
                continue
            metric_series.extend(_telemetry_record_to_metrics(rec, dd.hostname))
        if metric_series:
            n = dd.send_metrics(metric_series)
            sent_total["telemetry"] += n
            print(f"telemetry: sent {n}/{len(metric_series)} metric points")

        # NOTE(review): the cursors are persisted even when `n` is smaller
        # than the batch; whether rows can be lost on a partial send depends
        # on `_DDSession` — confirm against its implementation.
        _save_cursor(cursor_path, cursors)

        if once:
            print(f"done. logs={sent_total['logs']} metrics={sent_total['telemetry']}")
            return 0
        time.sleep(poll_interval)


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse arguments, validate, call `run()`.

    Returns 2 when `DD_API_KEY` is unset or the log directory is missing,
    otherwise whatever `run()` returns. Invalid `--poll-interval` or
    `--since` values are rejected through `parser.error()` (argparse exits
    with status 2) before anything is shipped.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--log-dir",
        default=str(_DEFAULT_LOG_DIR),
        help="Path to .mtlog/ (default: mcp-server/.mtlog)",
    )
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--once", action="store_true", help="Catch up then exit")
    mode.add_argument(
        "--tail",
        action="store_true",
        help="Daemon: poll forever (default)",
    )
    parser.add_argument(
        "--since",
        type=float,
        default=None,
        help="Backfill last N seconds. Resets cursor.",
    )
    parser.add_argument(
        "--poll-interval",
        type=float,
        default=5.0,
        help="Seconds between tail polls (default 5)",
    )
    parser.add_argument(
        "--site",
        default=os.environ.get("DD_SITE", "us5.datadoghq.com"),
        help=(
            "Datadog site. Default is the team's instance (us5.datadoghq.com). "
            "Override via DD_SITE env var or this flag."
        ),
    )
    parser.add_argument(
        "--host",
        default=socket.gethostname(),
        help="Hostname tag (default: socket.gethostname())",
    )
    args = parser.parse_args(argv)

    # `not x > 0` (rather than `x <= 0`) also rejects NaN. A negative or NaN
    # interval would make time.sleep() raise after the first pass, and zero
    # would poll in a busy loop.
    if not args.poll_interval > 0:
        parser.error("--poll-interval must be a positive number of seconds")
    # A negative --since puts the cutoff in the future, which would silently
    # drop every record.
    if args.since is not None and not args.since >= 0:
        parser.error("--since must be a non-negative number of seconds")

    api_key = os.environ.get("DD_API_KEY")
    if not api_key:
        print("DD_API_KEY env var required.", file=sys.stderr)
        return 2

    log_dir = Path(args.log_dir)
    if not log_dir.exists():
        print(
            f"log dir {log_dir} does not exist — start the mcp-server first.",
            file=sys.stderr,
        )
        return 2

    dd = _DDSession(api_key=api_key, site=args.site, hostname=args.host)
    once = args.once and not args.tail
    return run(
        log_dir,
        once=once,
        since_seconds=args.since,
        poll_interval=args.poll_interval,
        dd=dd,
    )


if __name__ == "__main__":
    sys.exit(main())
def _build_flags_env(build_flags: dict[str, Any]) -> dict[str, str]:
    """Render a macro dict as a `PLATFORMIO_BUILD_FLAGS` environment entry.

    `{"DEBUG_HEAP": 1, "FOO": "bar"}` becomes
    `{"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP=1 -DFOO=bar"}`.

    A value that is exactly `True` yields a bare `-DNAME`; a value that is
    exactly `False` or `None` drops the macro; anything else is formatted
    into `-DNAME=value`. Returns `{}` when no macro survives.
    """
    defines = [
        f"-D{name}" if setting is True else f"-D{name}={setting}"
        for name, setting in build_flags.items()
        # Identity checks on purpose: 0 and "" are real values, not "off".
        if setting is not False and setting is not None
    ]
    return {"PLATFORMIO_BUILD_FLAGS": " ".join(defines)} if defines else {}
Without this propagation, `pio run -t upload` + would relink without the env var and silently drop them. Common use: + `build_flags={"DEBUG_HEAP": 1}` for the leak-hunt path. """ _require_confirm(confirm, "flash") _reject_native_env(env, "flash") connection.reject_if_tcp(port, "flash") + extra_env = _build_flags_env(build_flags) if build_flags else None with userprefs.temporary_overrides(userprefs_overrides) as effective: result = pio.run( ["run", "-e", env, "-t", "upload", "--upload-port", port], timeout=pio.TIMEOUT_UPLOAD, check=False, + extra_env=extra_env, ) return { "exit_code": result.returncode, @@ -167,6 +209,7 @@ def flash( "stderr_tail": pio.tail_lines(result.stderr, 200), "duration_s": round(result.duration_s, 2), "userprefs": _userprefs_summary(effective), + "build_flags": dict(build_flags) if build_flags else None, } diff --git a/mcp-server/src/meshtastic_mcp/log_query.py b/mcp-server/src/meshtastic_mcp/log_query.py new file mode 100644 index 000000000..0f3ad7b69 --- /dev/null +++ b/mcp-server/src/meshtastic_mcp/log_query.py @@ -0,0 +1,410 @@ +"""Read-side queries over the recorder's JSONL streams. + +Pure functions over `mcp-server/.mtlog/`. Streaming JSONL reader: never +loads a whole file. Time-bound queries short-circuit as soon as `ts` +exceeds the requested end. The recorder writes monotonically, so a +forward scan is cheap; we don't need an index. + +All time arguments accept: + - epoch seconds (int/float) + - relative strings: "-15m", "-2h", "-3d", "now" + - ISO-ish absolute strings: "2026-05-07T14:30:00" (naive timestamps are + treated as UTC) + +Tools that return data ALWAYS cap their output (max_lines / max_points +/ max), and report whether more matched than was returned. 
+""" + +from __future__ import annotations + +import gzip +import json +import re +import statistics +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterator + +from .recorder.recorder import get_recorder + +_REL_RE = re.compile(r"^\s*-\s*(\d+(?:\.\d+)?)\s*([smhd])\s*$") +_REGEX_PREVIEW_MAX = 100 +_REGEX_PREVIEW_TRUNCATE = 97 + + +def _parse_time(value: Any, *, now: float | None = None) -> float: + """Coerce to epoch seconds. Defaults `now` to `time.time()`.""" + if value is None: + return time.time() + if isinstance(value, (int, float)): + return float(value) + if not isinstance(value, str): + raise ValueError(f"invalid time: {value!r}") + s = value.strip().lower() + if s in ("", "now"): + return time.time() if now is None else now + m = _REL_RE.match(s) + if m: + n = float(m.group(1)) + unit = m.group(2) + secs = n * {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit] + base = time.time() if now is None else now + return base - secs + # Try ISO 8601. Accept naive (assume UTC) and Z-suffixed. + try: + if s.endswith("z"): + s = s[:-1] + "+00:00" + dt = datetime.fromisoformat(s) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except ValueError as e: + raise ValueError(f"unparseable time: {value!r}") from e + + +def _iter_jsonl(path: Path, *, since: float, until: float) -> Iterator[dict[str, Any]]: + """Stream records in chronological order: rotated archives first + (oldest → newest by lex sort, which is chronological for our + `YYYYMMDD-HHMMSS-uuuuuu-NNNNN` archive naming), then the live file + last. The "keep last N" pop-front logic in the window queries + relies on records arriving in time order across files. + """ + files: list[Path] = [] + # Gzipped archives are named ".YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz". 
+ for archive in sorted(path.parent.glob(f"{path.stem}.*.jsonl.gz")): + files.append(archive) + if path.exists(): + files.append(path) + for f in files: + opener = gzip.open if f.suffix == ".gz" else open + try: + with opener(f, "rt", encoding="utf-8") as fh: # type: ignore[arg-type] + for line in fh: + line = line.strip() + if not line: + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + ts = rec.get("ts") + if not isinstance(ts, (int, float)): + continue + if ts < since: + continue + if ts > until: + # Records are append-monotonic within a file, so + # the rest of this file is also past `until`. + # Archives can still overlap each other, so only + # short-circuit this file, not the whole scan. + break + yield rec + except (FileNotFoundError, OSError): + continue + + +# -- queries ------------------------------------------------------------ + + +def logs_window( + start: Any = "-15m", + end: Any = "now", + *, + grep: str | None = None, + level: str | None = None, + tag: str | None = None, + port: str | None = None, + max_lines: int = 200, +) -> dict[str, Any]: + """Recent firmware log lines, filtered. + + `level` accepts a single level name or pipe-separated set + ("WARN|ERROR|CRIT"). `grep` is a regex (Python re) over the raw + `line` field. Returns the last `max_lines` matches. + """ + s = _parse_time(start) + e = _parse_time(end) + levels = _split_set(level) + if grep: + try: + grep_re = re.compile(grep) + except re.error as exc: + preview = ( + grep + if len(grep) <= _REGEX_PREVIEW_MAX + else f"{grep[:_REGEX_PREVIEW_TRUNCATE]}..." 
+ ) + raise ValueError(f"invalid grep regex {preview!r}: {exc}") from exc + else: + grep_re = None + + base = get_recorder().base_dir + matched = 0 + out: list[dict[str, Any]] = [] + for rec in _iter_jsonl(base / "logs.jsonl", since=s, until=e): + if levels and rec.get("level") not in levels: + continue + if tag and rec.get("tag") != tag: + continue + if port and rec.get("port") != port: + continue + if grep_re and not grep_re.search(rec.get("line") or ""): + continue + matched += 1 + out.append(rec) + if len(out) > max_lines: + out.pop(0) # keep the most recent N + return { + "lines": out, + "total_matched": matched, + "dropped": max(0, matched - max_lines), + "window": {"start": s, "end": e}, + } + + +def telemetry_timeline( + window: Any = "1h", + *, + variant: str = "local", + field: str = "free_heap", + port: str | None = None, + max_points: int = 200, +) -> dict[str, Any]: + """Timeseries of one telemetry field, downsampled. + + `field` matches both the protobuf snake_case name (`free_heap`, + `heap_free_bytes`, `battery_level`) and camelCase (`freeHeap`). + Server-side bucket-mean downsamples to ≤ `max_points`. Returns + `slope_per_min` (linear regression slope, units/min) so a leak + detector can read one number. + """ + end = time.time() + if isinstance(window, (int, float)): + # Numeric `window` is a duration in seconds — "last N seconds". + # Without this branch, `_parse_time(-N)` would treat -N as an + # absolute epoch timestamp (i.e., Jan 1 1970 minus N seconds), + # producing a wildly negative `start` and matching nothing. + start = end - float(window) + elif isinstance(window, str) and not window.startswith("-"): + # Bare string like "1h" is sugar for "-1h". 
+ start = _parse_time(f"-{window}", now=end) + else: + start = _parse_time(window, now=end) + + base = get_recorder().base_dir + raw: list[tuple[float, float]] = [] + field_aliases = _field_aliases(field) + for rec in _iter_jsonl(base / "telemetry.jsonl", since=start, until=end): + if rec.get("variant") != variant: + continue + if port and rec.get("port") != port: + continue + fields = rec.get("fields") or {} + value: Any = None + for alias in field_aliases: + if alias in fields: + value = fields[alias] + break + if not isinstance(value, (int, float)): + continue + raw.append((float(rec["ts"]), float(value))) + + if not raw: + return { + "points": [], + "samples": 0, + "min": None, + "max": None, + "slope_per_min": None, + "window": {"start": start, "end": end, "variant": variant, "field": field}, + } + + points = _downsample(raw, max_points=max_points) + values = [v for _, v in raw] + return { + "points": [{"ts": ts, "value": v} for ts, v in points], + "samples": len(raw), + "min": min(values), + "max": max(values), + "slope_per_min": _slope_per_min(raw), + "window": {"start": start, "end": end, "variant": variant, "field": field}, + } + + +def packets_window( + start: Any = "-5m", + end: Any = "now", + *, + portnum: str | None = None, + from_node: str | None = None, + to_node: str | None = None, + max: int = 200, +) -> dict[str, Any]: + s = _parse_time(start) + e = _parse_time(end) + portnums = _split_set(portnum) + base = get_recorder().base_dir + matched = 0 + out: list[dict[str, Any]] = [] + for rec in _iter_jsonl(base / "packets.jsonl", since=s, until=e): + if portnums and rec.get("portnum") not in portnums: + continue + if from_node and str(rec.get("from_node")) != str(from_node): + continue + if to_node and str(rec.get("to_node")) != str(to_node): + continue + matched += 1 + out.append(rec) + if len(out) > max: + out.pop(0) + return { + "packets": out, + "total_matched": matched, + "dropped": matched - max if matched > max else 0, + "window": {"start": s, 
"end": e}, + } + + +def events_window( + start: Any = "-1h", + end: Any = "now", + *, + kind: str | None = None, + max: int = 200, +) -> dict[str, Any]: + s = _parse_time(start) + e = _parse_time(end) + kinds = _split_set(kind) + base = get_recorder().base_dir + matched = 0 + out: list[dict[str, Any]] = [] + for rec in _iter_jsonl(base / "events.jsonl", since=s, until=e): + if kinds and rec.get("kind") not in kinds: + continue + matched += 1 + out.append(rec) + if len(out) > max: + out.pop(0) + return { + "events": out, + "total_matched": matched, + "dropped": matched - max if matched > max else 0, + "window": {"start": s, "end": e}, + } + + +def export( + start: Any, + end: Any, + dest_dir: str, + *, + streams: list[str] | None = None, +) -> dict[str, Any]: + """Bundle a slice of each requested stream into `dest_dir`. + + For a notebook, a bug report, or a Datadog backfill. Output files + are uncompressed JSONL (callers gzip themselves if they want to). + """ + s = _parse_time(start) + e = _parse_time(end) + selected = streams or ["logs", "telemetry", "packets", "events"] + dest = Path(dest_dir) + dest.mkdir(parents=True, exist_ok=True) + + base = get_recorder().base_dir + paths: dict[str, str] = {} + for stream in selected: + src = base / f"{stream}.jsonl" + if not src.exists() and not list(base.glob(f"{stream}.*.jsonl.gz")): + continue + out_path = dest / f"{stream}.jsonl" + n = 0 + with out_path.open("w", encoding="utf-8") as fh: + for rec in _iter_jsonl(src, since=s, until=e): + fh.write(json.dumps(rec, separators=(",", ":")) + "\n") + n += 1 + paths[stream] = str(out_path) + paths[f"{stream}_count"] = str(n) + return {"dest_dir": str(dest), "paths": paths, "window": {"start": s, "end": e}} + + +# -- helpers ------------------------------------------------------------ + + +def _split_set(value: str | None) -> set[str] | None: + if not value: + return None + return {v.strip() for v in value.split("|") if v.strip()} + + +def _field_aliases(field: str) -> 
list[str]: + """Accept snake_case OR camelCase, plus a few legacy aliases.""" + snake = field + camel = _snake_to_camel(field) + aliases = {snake, camel} + # Old protobuf fields (pre-LocalStats) used different names + legacy = { + "free_heap": ["free_heap", "freeHeap", "heap_free_bytes", "heapFreeBytes"], + "heap_free_bytes": [ + "heap_free_bytes", + "heapFreeBytes", + "free_heap", + "freeHeap", + ], + "total_heap": ["total_heap", "totalHeap", "heap_total_bytes", "heapTotalBytes"], + "heap_total_bytes": [ + "heap_total_bytes", + "heapTotalBytes", + "total_heap", + "totalHeap", + ], + } + if field in legacy: + aliases.update(legacy[field]) + return list(aliases) + + +def _snake_to_camel(name: str) -> str: + parts = name.split("_") + return parts[0] + "".join(p.title() for p in parts[1:]) + + +def _downsample( + points: list[tuple[float, float]], *, max_points: int +) -> list[tuple[float, float]]: + if len(points) <= max_points: + return points + # Even-bucket mean. Preserves shape better than nth-sample picking. + n = len(points) + bucket = n / max_points + out: list[tuple[float, float]] = [] + i = 0 + for k in range(max_points): + end = int((k + 1) * bucket) + end = min(end, n) + if end <= i: + continue + chunk = points[i:end] + ts = chunk[len(chunk) // 2][0] + val = statistics.fmean(v for _, v in chunk) + out.append((ts, val)) + i = end + return out + + +def _slope_per_min(points: list[tuple[float, float]]) -> float | None: + """Least-squares slope (units per minute). 
None if too few points.""" + if len(points) < 2: + return None + xs = [t for t, _ in points] + ys = [v for _, v in points] + n = len(xs) + mean_x = sum(xs) / n + mean_y = sum(ys) / n + num = sum((xs[i] - mean_x) * (ys[i] - mean_y) for i in range(n)) + den = sum((x - mean_x) ** 2 for x in xs) + if den == 0: + return None + slope_per_sec = num / den + return slope_per_sec * 60.0 diff --git a/mcp-server/src/meshtastic_mcp/pio.py b/mcp-server/src/meshtastic_mcp/pio.py index c0c23f9bb..c984d7a42 100644 --- a/mcp-server/src/meshtastic_mcp/pio.py +++ b/mcp-server/src/meshtastic_mcp/pio.py @@ -92,6 +92,7 @@ def _run_capturing( cwd: Path | None = None, timeout: float | None = None, tee_header: str | None = None, + extra_env: dict[str, str] | None = None, ) -> tuple[int, str, str, float]: """Run a subprocess, capture stdout+stderr, optionally tee to the flash log. @@ -99,6 +100,9 @@ def _run_capturing( `subprocess.TimeoutExpired` on timeout (callers map this to their own domain-specific error). + `extra_env` merges into the subprocess environment (parent env stays + intact). Used for `PLATFORMIO_BUILD_FLAGS=-DDEBUG_HEAP=1` and similar. + Fast path: `subprocess.run(capture_output=True)` when no flash log is configured (unchanged behavior). @@ -110,6 +114,9 @@ def _run_capturing( """ log_path = _flash_log_path() t0 = time.monotonic() + env = None + if extra_env: + env = {**os.environ, **extra_env} if log_path is None: # Fast path — unchanged. @@ -119,6 +126,7 @@ def _run_capturing( capture_output=True, text=True, timeout=timeout, + env=env, ) return ( proc.returncode, @@ -145,6 +153,7 @@ def _run_capturing( stderr=subprocess.PIPE, text=True, bufsize=1, # line-buffered + env=env, ) stdout_chunks: list[str] = [] stderr_chunks: list[str] = [] @@ -232,12 +241,17 @@ def run( cwd: Path | None = None, timeout: float | None = TIMEOUT_DEFAULT, check: bool = True, + extra_env: dict[str, str] | None = None, ) -> PioResult: """Invoke `pio ` and return captured output. 
`cwd` defaults to the firmware root. `check=True` raises `PioError` on non-zero exit; set `check=False` to inspect `returncode` manually. + `extra_env` merges into the subprocess environment — used for + `PLATFORMIO_BUILD_FLAGS=-DDEBUG_HEAP=1` and similar build-time + toggles that can't be expressed as command-line args. + If `MESHTASTIC_MCP_FLASH_LOG` is set, output is also tee'd to that file line-by-line as it arrives (for live flash progress in the TUI). """ @@ -250,6 +264,7 @@ def run( cwd=work_dir, timeout=timeout, tee_header=f"pio {' '.join(args)}", + extra_env=extra_env, ) except subprocess.TimeoutExpired as exc: raise PioTimeout(f"pio {' '.join(args)} timed out after {timeout}s") from exc diff --git a/mcp-server/src/meshtastic_mcp/recorder/__init__.py b/mcp-server/src/meshtastic_mcp/recorder/__init__.py new file mode 100644 index 000000000..874d59d0e --- /dev/null +++ b/mcp-server/src/meshtastic_mcp/recorder/__init__.py @@ -0,0 +1,19 @@ +"""Persistent device-log capture. + +Singleton `Recorder` subscribes once to the meshtastic pubsub fan-out +(`meshtastic.log.line`, `meshtastic.receive.*`, `meshtastic.connection.*`) +and appends to four JSONL files under `mcp-server/.mtlog/`. Pubsub is +process-global so a single subscription captures every active interface +(serial / TCP / BLE) without any per-connection bookkeeping. + +The recorder is opt-in-by-import: importing this package is a no-op; call +`get_recorder().start()` (which `server.py` does at FastMCP app init) to +begin writing. `pause()` / `resume()` exist for the rare case the user +wants a clean stretch of file (e.g. capturing a known-good baseline). 
+""" + +from __future__ import annotations + +from .recorder import Recorder, get_recorder + +__all__ = ["Recorder", "get_recorder"] diff --git a/mcp-server/src/meshtastic_mcp/recorder/parsers.py b/mcp-server/src/meshtastic_mcp/recorder/parsers.py new file mode 100644 index 000000000..1936ccbda --- /dev/null +++ b/mcp-server/src/meshtastic_mcp/recorder/parsers.py @@ -0,0 +1,309 @@ +"""Best-effort parsers for log lines and telemetry packets. + +Two flavors of log line cross our pubsub subscription: + 1. Text-mode path (debug_log_api disabled): the meshtastic Python lib + accumulates bytes between protobuf frames and emits the full + firmware-formatted line, e.g. + "INFO | 12:34:56 12345 [Main] Booting" + — level, HH:MM:SS, uptime seconds, thread bracket, then message. + 2. LogRecord protobuf path (debug_log_api enabled): the lib calls + `_handleLogLine(record.message)` with ONLY the message body. The + level/source/time fields on the LogRecord are dropped before + pubsub fan-out. We get e.g. just "Booting". + +Both arrive on `meshtastic.log.line`. The parser tries to recover a +level + thread when the prefix is present and falls back to level=None +otherwise. Consumers who want level filtering on protobuf-mode hosts +should grep the raw `line` field instead. + +Telemetry: `meshtastic.receive.telemetry` packets carry one of several +metric variants in `packet["decoded"]["telemetry"]`. We flatten the +chosen variant into a {field: value} dict so callers don't have to +know the protobuf shape. +""" + +from __future__ import annotations + +import re +from typing import Any + +# Match: LEVEL | HH:MM:SS UPTIME [Thread] message +# HH:MM:SS may be ??:??:?? when RTC isn't valid. The level alternation +# below is the canonical list — DebugConfiguration.h's MESHTASTIC_LOG_LEVEL_* +# macros must stay in sync with these strings. 
# Match: LEVEL | HH:MM:SS UPTIME [Thread] message
# HH:MM:SS may be ??:??:?? when RTC isn't valid. The level alternation
# below is the canonical list — DebugConfiguration.h's MESHTASTIC_LOG_LEVEL_*
# macros must stay in sync with these strings.
_LINE_RE = re.compile(
    r"""
    ^
    (?P<level>DEBUG|INFO\ |WARN\ |ERROR|CRIT\ |TRACE|HEAP\ )
    \s*\|\s*
    (?P<clock>(?:\d{2}:\d{2}:\d{2})|(?:\?{2}:\?{2}:\?{2}))
    \s+
    (?P<uptime>\d+)
    \s+
    (?:\[(?P<thread>[^\]]+)\]\s+)?
    (?P<msg>.*)
    $
    """,
    re.VERBOSE,
)

# DEBUG_HEAP build prepends `[heap N] ` to every message body, AFTER the
# thread bracket. See src/RedirectablePrint.cpp:175.
_HEAP_PREFIX_RE = re.compile(r"^\[heap\s+(?P<heap>\d+)\]\s+(?P<rest>.*)$")

# OSThread leak/free detection. See src/concurrency/OSThread.cpp:89-91.
# Format: "------ Thread NAME leaked heap A -> B (delta) ------"
#         "++++++ Thread NAME freed heap A -> B (delta) ++++++"
_THREAD_HEAP_RE = re.compile(
    r"""
    ^[\-+]+\s*
    Thread\s+(?P<thread>\S+)\s+
    (?P<kind>leaked|freed)\s+heap\s+
    (?P<before>-?\d+)\s*->\s*(?P<after>-?\d+)\s+
    \((?P<delta>-?\d+)\)
    """,
    re.VERBOSE,
)

# Power.cpp:908 periodic heap status (DEBUG_HEAP only).
# Format: "Heap status: FREE/TOTAL bytes free (DELTA), running R/N threads"
_HEAP_STATUS_RE = re.compile(
    r"""
    Heap\s+status:\s+
    (?P<free>\d+)\s*/\s*(?P<total>\d+)\s+bytes\s+free
    (?:\s+\((?P<delta>-?\d+)\))?
    """,
    re.VERBOSE,
)


# ANSI escape sequences (colors etc.), stripped before matching.
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
# Content of a `[heap N]` bracket sitting where the thread name would be.
_HEAP_BRACKET_RE = re.compile(r"^heap\s+(?P<heap>\d+)$")


def parse_log_line(line: str) -> dict[str, Any]:
    """Best-effort decompose a raw firmware log line.

    Returns a dict with at least `line` (the original, unmodified — ANSI
    codes preserved for fidelity). Adds `level`, `clock`, `uptime_s` and
    `msg` when the full prefix is present, and `tag` when a thread bracket
    is present.

    Handles two firmware quirks:
      - LogRecord.message can carry ANSI color escapes from RedirectablePrint
        (the BLE/StreamAPI path inherited the colored body in some builds).
        We strip ANSI before regex matching so the prefix survives.
      - DEBUG_HEAP injects `[heap N]` after the thread bracket. When NO
        thread name is set, the heap takes the thread bracket position —
        looks like `[heap 12345] msg`. We detect that shape and move it
        out of `tag` and into `heap_free`.

    DEBUG_HEAP-build extras: `heap_free` (bytes) from a `[heap N]` prefix;
    `heap_event` = {kind, thread, before, after, delta} when a
    `Thread X leaked|freed heap` line is recognized; `heap_free`,
    `heap_total` and (when present) `heap_delta` from a `Heap status:` line.

    Never raises.
    """
    out: dict[str, Any] = {"line": line}
    if not line:
        return out

    # Strip ANSI escapes BEFORE any regex matching. The original `line`
    # stays in `out["line"]` for fidelity / future grep.
    clean = _ANSI_RE.sub("", line)

    m = _LINE_RE.match(clean)
    msg: str | None = None
    if m:
        out["level"] = m.group("level").rstrip()
        out["clock"] = m.group("clock")
        try:
            out["uptime_s"] = int(m.group("uptime"))
        except (TypeError, ValueError):
            out["uptime_s"] = None
        thread = m.group("thread")
        if thread:
            # If "thread" is actually the heap prefix taking the bracket
            # position (DEBUG_HEAP build, no thread set), capture heap
            # and leave tag unset.
            hb = _HEAP_BRACKET_RE.match(thread.strip())
            if hb:
                try:
                    out["heap_free"] = int(hb.group("heap"))
                except (TypeError, ValueError):
                    pass
            else:
                out["tag"] = thread
        msg = m.group("msg")
        out["msg"] = msg
    else:
        # No prefix — bare LogRecord.message body. Inspect the whole
        # line for DEBUG_HEAP-style content; the heap-prefix and
        # thread-leak patterns can survive on either path.
        msg = clean

    if not msg:
        return out

    # DEBUG_HEAP per-line heap prefix: `[heap 92344] message`.
    # Sits AFTER the thread bracket and BEFORE the message body, but
    # for bare LogRecord lines it's at the start. Match it at the
    # head of `msg`.
    hp = _HEAP_PREFIX_RE.match(msg)
    if hp:
        try:
            out["heap_free"] = int(hp.group("heap"))
        except (TypeError, ValueError):
            pass
        else:
            # Strip the prefix from `msg` so a grep on the message
            # body doesn't have to know about it.
            out["msg"] = hp.group("rest")
            msg = hp.group("rest")

    # Thread-level leak/free detection.
    thr = _THREAD_HEAP_RE.search(msg)
    if thr:
        try:
            out["heap_event"] = {
                "kind": thr.group("kind"),
                "thread": thr.group("thread"),
                "before": int(thr.group("before")),
                "after": int(thr.group("after")),
                "delta": int(thr.group("delta")),
            }
        except (TypeError, ValueError):
            pass

    # Power.cpp periodic "Heap status: F/T bytes free (D), running ..."
    hs = _HEAP_STATUS_RE.search(msg)
    if hs:
        try:
            out["heap_free"] = int(hs.group("free"))
            out["heap_total"] = int(hs.group("total"))
            if hs.group("delta") is not None:
                out["heap_delta"] = int(hs.group("delta"))
        except (TypeError, ValueError):
            pass

    return out
+ for snake, label in _TELEMETRY_VARIANTS: + camel = _snake_to_camel(snake) + for key in (snake, camel): + value = telem.get(key) + if isinstance(value, dict): + return { + "variant": label, + "fields": {k: _scalarize(v) for k, v in value.items()}, + "time": telem.get("time"), + } + return None + + +def _snake_to_camel(name: str) -> str: + parts = name.split("_") + return parts[0] + "".join(p.title() for p in parts[1:]) + + +def _scalarize(value: Any) -> Any: + """Keep telemetry fields JSON-friendly. Lists/dicts pass through + untouched; bytes -> hex string; protobuf enums occasionally arrive + as ints (fine) or strings (also fine).""" + if isinstance(value, (bytes, bytearray, memoryview)): + return bytes(value).hex() + return value + + +# -- Generic packet summary --------------------------------------------- + + +def summarize_packet( + packet: dict[str, Any], *, payload_hex_len: int = 64 +) -> dict[str, Any]: + """Reduce a packet dict to a stable, queryable summary. Drops the + full payload bytes — the recorder records summaries, not pcaps. + """ + if not isinstance(packet, dict): + return {"raw_type": type(packet).__name__} + decoded = packet.get("decoded") if isinstance(packet.get("decoded"), dict) else {} + portnum = decoded.get("portnum") if isinstance(decoded, dict) else None + payload = decoded.get("payload") if isinstance(decoded, dict) else None + payload_hex = None + payload_size = None + if isinstance(payload, (bytes, bytearray, memoryview)): + b = bytes(payload) + payload_size = len(b) + payload_hex = b[:payload_hex_len].hex() if b else "" + elif isinstance(payload, str): + # Some decoded payloads (text messages) come as decoded strings. 
+ payload_size = len(payload) + payload_hex = None # not bytes + return { + "from_node": packet.get("fromId") or packet.get("from"), + "to_node": packet.get("toId") or packet.get("to"), + "portnum": portnum, + "hop_limit": packet.get("hopLimit"), + "want_ack": packet.get("wantAck"), + "rx_rssi": packet.get("rxRssi"), + "rx_snr": packet.get("rxSnr"), + "channel": packet.get("channel"), + "id": packet.get("id"), + "payload_size": payload_size, + "payload_hex_prefix": payload_hex, + } + + +# -- Interface identification ------------------------------------------ + + +def interface_label(interface: Any) -> dict[str, Any]: + """Stable identifier for the meshtastic interface that emitted an event. + + Used as the `port`/`role` tag on every recorded row. SerialInterface + has `devPath`; TCPInterface has `hostname`+`portNumber`; BLEInterface + has `address`. Falls back to the class name when none of those exist. + """ + if interface is None: + return {"port": None, "role": None} + dev_path = getattr(interface, "devPath", None) + if dev_path: + return {"port": str(dev_path), "role": "serial"} + hostname = getattr(interface, "hostname", None) + if hostname: + port_num = getattr(interface, "portNumber", None) + endpoint = f"tcp://{hostname}:{port_num}" if port_num else f"tcp://{hostname}" + return {"port": endpoint, "role": "tcp"} + address = getattr(interface, "address", None) + if address: + return {"port": str(address), "role": "ble"} + return {"port": type(interface).__name__, "role": None} diff --git a/mcp-server/src/meshtastic_mcp/recorder/recorder.py b/mcp-server/src/meshtastic_mcp/recorder/recorder.py new file mode 100644 index 000000000..2b8a5b481 --- /dev/null +++ b/mcp-server/src/meshtastic_mcp/recorder/recorder.py @@ -0,0 +1,467 @@ +"""Process-global recorder singleton. + +Subscribes once to the meshtastic pubsub fan-out and writes four append-only +JSONL streams under `mcp-server/.mtlog/`. 
The pubsub fan-out is +process-global — a single subscription captures every active interface +without per-connection bookkeeping. + +Files: + logs.jsonl — every `meshtastic.log.line` event (best-effort prefix + parsed for level/tag/uptime; raw `line` always preserved) + telemetry.jsonl — `meshtastic.receive.telemetry` packets, flattened by + variant (device / local / environment / power / etc.) + packets.jsonl — every other `meshtastic.receive.*` packet, summarized + (portnum, hops, RSSI/SNR, payload size + 64-byte hex) + events.jsonl — connection lifecycle, node-DB updates, and manual + `mark_event` rows. Lower volume; useful for aligning + timelines. + +Pause/resume: `pause()` flips a flag; subscriptions stay registered. The +write methods short-circuit when paused, so we don't lose ordering when +resumed (we just have a gap). No queueing. +""" + +from __future__ import annotations + +import logging +import os +import threading +import time +from pathlib import Path +from typing import Any + +from . import parsers +from .rotating import _RotatingJsonl + +_DEFAULT_DIR = Path(__file__).resolve().parents[3] / ".mtlog" +log = logging.getLogger(__name__) + + +class Recorder: + """Singleton write-side of the persistent log capture system.""" + + def __init__(self, base_dir: Path | None = None) -> None: + self.base_dir = Path(base_dir) if base_dir else _DEFAULT_DIR + self._lock = threading.RLock() + self._started = False + self._paused = False + self._pause_reason: str | None = None + self._started_at: float | None = None + self._handlers: list[tuple[str, Any]] = [] + self._files: dict[str, _RotatingJsonl] = {} + + # -- lifecycle ---------------------------------------------------- + + def start(self) -> None: + """Idempotent. 
Safe to call from FastMCP app startup.""" + with self._lock: + if self._started: + return + self.base_dir.mkdir(parents=True, exist_ok=True) + self._files = { + "logs": _RotatingJsonl(self.base_dir / "logs.jsonl"), + "telemetry": _RotatingJsonl(self.base_dir / "telemetry.jsonl"), + "packets": _RotatingJsonl(self.base_dir / "packets.jsonl"), + "events": _RotatingJsonl(self.base_dir / "events.jsonl"), + } + self._wire_pubsub() + self._started = True + self._started_at = time.time() + # Write the recorder_start marker after the initialization block. + # `_write_event()` re-checks recorder state via `_files_snapshot()`, + # so keeping this out of the setup block avoids nested lifecycle work. + self._write_event(kind="recorder_start", label="recorder_started") + + def stop(self) -> None: + with self._lock: + if not self._started: + return + self._unwire_pubsub() + for f in self._files.values(): + f.close() + self._files = {} + self._started = False + + def pause(self, reason: str | None = None) -> None: + # Write the pause marker BEFORE flipping the flag — `_write_event` + # short-circuits when paused, so the order matters for this event + # to actually land in events.jsonl. + self._write_event( + kind="recorder_pause", + label="paused", + note=reason, + ) + with self._lock: + self._paused = True + self._pause_reason = reason + + def resume(self) -> None: + # Mirror of `pause()`: clear the flag first, then write the marker + # so it isn't suppressed by the still-paused short-circuit. + with self._lock: + self._paused = False + self._pause_reason = None + self._write_event(kind="recorder_resume", label="resumed") + + # -- pubsub wiring ------------------------------------------------ + + def _wire_pubsub(self) -> None: + from pubsub import pub # type: ignore[import-untyped] + + # Subscribers — one per topic. Each pubsub publisher sends + # keyword args matching its handler's signature; pubsub + # introspects the function signature to route args. 
+ bindings = [ + ("meshtastic.log.line", self._on_log_line), + ("meshtastic.serial.line", self._on_serial_line), + ("meshtastic.receive", self._on_receive), + ("meshtastic.receive.telemetry", self._on_telemetry), + ("meshtastic.connection.established", self._on_connection_established), + ("meshtastic.connection.lost", self._on_connection_lost), + ("meshtastic.node.updated", self._on_node_updated), + ] + for topic, handler in bindings: + try: + pub.subscribe(handler, topic) + self._handlers.append((topic, handler)) + except Exception as exc: + # If pubsub refuses one binding (signature mismatch on + # an old lib version), log it and keep the rest. + log.warning("Recorder failed to subscribe to %s: %s", topic, exc) + + def _unwire_pubsub(self) -> None: + from pubsub import pub # type: ignore[import-untyped] + + for topic, handler in self._handlers: + try: + pub.unsubscribe(handler, topic) + except Exception: + pass + self._handlers.clear() + + # -- handlers ----------------------------------------------------- + # + # Pubsub callbacks must never raise. Every handler is wrapped in a + # try/except that swallows so a bug here can't take down the + # SerialInterface receive thread. + # + # Threading: handlers fire on whatever thread the meshtastic library + # dispatches from (varies by interface), while `stop()` clears + # `self._files` under `self._lock`. We snapshot `_files` under the + # lock at the top of each handler so a concurrent stop can't + # KeyError us mid-write. The actual file write goes through + # `_RotatingJsonl` which has its own lock. + + def _files_snapshot(self) -> dict[str, _RotatingJsonl] | None: + """Atomic-ish view of `self._files`. 
Returns None when the recorder + is paused or stopped, so handlers can early-exit cleanly without + racing `stop()`'s clear.""" + with self._lock: + if not self._started or self._paused: + return None + return dict(self._files) + + def _on_log_line(self, line: str, interface: Any = None) -> None: + files = self._files_snapshot() + if files is None: + return + try: + tags = parsers.interface_label(interface) + parsed = parsers.parse_log_line(str(line)) + ts = time.time() + record: dict[str, Any] = { + "ts": ts, + "port": tags["port"], + "role": tags["role"], + "level": parsed.get("level"), + "tag": parsed.get("tag"), + "uptime_s": parsed.get("uptime_s"), + "line": parsed["line"], + } + # DEBUG_HEAP enrichments (only present when the firmware + # was built with -DDEBUG_HEAP=1). Surface as first-class + # fields so logs_window can grep/filter on them and so + # heap_free synthesizes a telemetry point below. + if "heap_free" in parsed: + record["heap_free"] = parsed["heap_free"] + if "heap_total" in parsed: + record["heap_total"] = parsed["heap_total"] + if "heap_delta" in parsed: + record["heap_delta"] = parsed["heap_delta"] + heap_event = parsed.get("heap_event") + if heap_event: + record["heap_event"] = heap_event + files["logs"].write(record) + + # If the line carried a heap snapshot, also write it as a + # synthesized LocalStats-shaped row so telemetry_timeline + # picks it up at log cadence (much higher resolution than + # the ~60 s LocalStats packet). Tagged source=debug_heap so + # consumers can filter if mixing scales is unwanted. 
+ heap_free = parsed.get("heap_free") + if isinstance(heap_free, int): + fields: dict[str, Any] = {"heap_free_bytes": heap_free} + heap_total = parsed.get("heap_total") + if isinstance(heap_total, int): + fields["heap_total_bytes"] = heap_total + files["telemetry"].write( + { + "ts": ts, + "port": tags["port"], + "role": tags["role"], + "from_node": None, + "variant": "local", + "fields": fields, + "source": "debug_heap", + } + ) + except Exception: + pass + + def _on_serial_line(self, line: str, port: str | None = None) -> None: + """Text-mode passive tap. Fired from `serial_session._drain` when a + `pio device monitor` subprocess is running. + + Same parse + heap-synthesis path as `_on_log_line`, but receives + the raw text-formatted line (full level/clock/uptime/thread/`[heap N]`/ + body). On DEBUG_HEAP builds in text mode this gives us per-log-line + heap data — far higher cadence than LocalStats, and works without + protobuf API mode (no SerialInterface required). + """ + files = self._files_snapshot() + if files is None: + return + try: + parsed = parsers.parse_log_line(str(line)) + ts = time.time() + record: dict[str, Any] = { + "ts": ts, + "port": port, + "role": "serial_session", + "level": parsed.get("level"), + "tag": parsed.get("tag"), + "uptime_s": parsed.get("uptime_s"), + "line": parsed["line"], + } + if "heap_free" in parsed: + record["heap_free"] = parsed["heap_free"] + if "heap_total" in parsed: + record["heap_total"] = parsed["heap_total"] + if "heap_delta" in parsed: + record["heap_delta"] = parsed["heap_delta"] + heap_event = parsed.get("heap_event") + if heap_event: + record["heap_event"] = heap_event + files["logs"].write(record) + + # Synthesize a heap_free telemetry sample whenever the line + # carries one — same logic as _on_log_line, tagged source so + # consumers can distinguish text-mode tap from protobuf path. 
+ heap_free = parsed.get("heap_free") + if isinstance(heap_free, int): + fields: dict[str, Any] = {"heap_free_bytes": heap_free} + heap_total = parsed.get("heap_total") + if isinstance(heap_total, int): + fields["heap_total_bytes"] = heap_total + files["telemetry"].write( + { + "ts": ts, + "port": port, + "role": "serial_session", + "from_node": None, + "variant": "local", + "fields": fields, + "source": "debug_heap_serial", + } + ) + except Exception: + pass + + def _on_telemetry(self, packet: dict[str, Any], interface: Any = None) -> None: + files = self._files_snapshot() + if files is None: + return + try: + tags = parsers.interface_label(interface) + extracted = parsers.extract_telemetry(packet) + if extracted is None: + # Couldn't extract a known variant — fall through to the + # generic `_on_receive` path, which will still fire for + # this packet via the parent topic. + return + record = { + "ts": time.time(), + "port": tags["port"], + "role": tags["role"], + "from_node": packet.get("fromId") or packet.get("from"), + "variant": extracted["variant"], + "fields": extracted["fields"], + "device_time": extracted.get("time"), + } + files["telemetry"].write(record) + except Exception: + pass + + def _on_receive(self, packet: dict[str, Any], interface: Any = None) -> None: + # Generic-receive fires for EVERY packet. Telemetry packets get + # recorded twice (here and in _on_telemetry) — that's intentional: + # packets.jsonl is the universal record, telemetry.jsonl is the + # structured timeseries view. 
+ files = self._files_snapshot() + if files is None: + return + try: + tags = parsers.interface_label(interface) + summary = parsers.summarize_packet(packet) + record = { + "ts": time.time(), + "port": tags["port"], + "role": tags["role"], + **summary, + } + files["packets"].write(record) + except Exception: + pass + + def _on_connection_established(self, interface: Any = None) -> None: + self._write_event( + kind="connection_established", + interface=interface, + ) + + def _on_connection_lost(self, interface: Any = None) -> None: + self._write_event( + kind="connection_lost", + interface=interface, + ) + + def _on_node_updated( + self, node: dict[str, Any] | None = None, interface: Any = None + ) -> None: + # Lower-volume than packets but informative — node ID, hops away, + # last heard. Skip the user dict if absent. + try: + user = (node or {}).get("user") if isinstance(node, dict) else None + self._write_event( + kind="node_updated", + interface=interface, + data={ + "num": (node or {}).get("num"), + "id": (user or {}).get("id"), + "short": (user or {}).get("shortName"), + "long": (user or {}).get("longName"), + "hops_away": (node or {}).get("hopsAway"), + "snr": (node or {}).get("snr"), + "last_heard": (node or {}).get("lastHeard"), + }, + ) + except Exception: + pass + + # -- public write helpers ----------------------------------------- + + def mark_event( + self, + label: str, + note: str | None = None, + data: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """User-facing marker. Writes to events.jsonl AND emits a + synthetic logs.jsonl row tagged level=MARK so timelines align. + """ + ts = self._write_event(kind="mark", label=label, note=note, data=data) + # Mirror into logs so a single logs_window grep finds it. 
+ files = self._files_snapshot() + if files is not None: + try: + files["logs"].write( + { + "ts": ts, + "port": None, + "role": "marker", + "level": "MARK", + "tag": "mark_event", + "line": f"[mark] {label}" + (f" — {note}" if note else ""), + } + ) + except Exception: + pass + return {"ts": ts, "label": label} + + def _write_event( + self, + *, + kind: str, + label: str | None = None, + note: str | None = None, + interface: Any = None, + data: dict[str, Any] | None = None, + ) -> float: + ts = time.time() + # Lifecycle markers (recorder_start, recorder_pause, recorder_resume) + # arrive at choreographed moments — `pause()` writes BEFORE flipping + # the flag and `resume()` writes AFTER clearing it, so those calls + # see _paused=False here. Other event kinds short-circuit when + # paused via the snapshot guard below. + files = self._files_snapshot() + if files is None: + return ts + try: + tags = parsers.interface_label(interface) + files["events"].write( + { + "ts": ts, + "kind": kind, + "label": label, + "note": note, + "port": tags["port"], + "role": tags["role"], + "data": data, + } + ) + except Exception: + pass + return ts + + # -- introspection ------------------------------------------------ + + def status(self) -> dict[str, Any]: + with self._lock: + return { + "running": self._started, + "paused": self._paused, + "pause_reason": self._pause_reason, + "started_at": self._started_at, + "base_dir": str(self.base_dir), + "files": {name: f.status() for name, f in self._files.items()}, + } + + def force_rotate_all(self) -> dict[str, Any]: + """Test/admin hook: rotate every stream right now.""" + with self._lock: + files = list(self._files.values()) + for f in files: + f.force_rotate() + # `status()` re-acquires `self._lock`; release before calling it. 
+ return self.status() + + +# -- module-level singleton accessor ------------------------------------ + +_INSTANCE_LOCK = threading.Lock() +_INSTANCE: Recorder | None = None + + +def get_recorder() -> Recorder: + """Return the process-global Recorder. Created on first call. + + Honors `MESHTASTIC_MCP_LOG_DIR` env var for the base directory + (used by tests to redirect to a tmpdir). + """ + global _INSTANCE + with _INSTANCE_LOCK: + if _INSTANCE is None: + override = os.environ.get("MESHTASTIC_MCP_LOG_DIR") + base = Path(override) if override else None + _INSTANCE = Recorder(base_dir=base) + return _INSTANCE diff --git a/mcp-server/src/meshtastic_mcp/recorder/rotating.py b/mcp-server/src/meshtastic_mcp/recorder/rotating.py new file mode 100644 index 000000000..631317790 --- /dev/null +++ b/mcp-server/src/meshtastic_mcp/recorder/rotating.py @@ -0,0 +1,163 @@ +"""Append-only JSONL writer with size-capped rotation. + +A `_RotatingJsonl` owns one live `.jsonl` file. Writes are line-delimited +JSON objects (one row per call). When the live file exceeds `max_bytes`, +it is closed, gzipped to `<stem>.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`, +and the live file resets to empty. Old archives past `keep_archives` are +unlinked oldest-first. + +Size check is amortized — `os.fstat` runs every `check_every` writes, +not per-write, so the hot path stays at one `fh.write` + one `fh.flush`. + +Threading: every public method acquires `self._lock`. The recorder runs +several pubsub handlers on whatever thread the meshtastic library +dispatches from (varies by interface), and queries from MCP tool calls +arrive on the FastMCP request thread, so this lock is not optional. +""" + +from __future__ import annotations + +import gzip +import json +import os +import shutil +import threading +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +class _RotatingJsonl: + """Append-only JSONL with size rotation.
Thread-safe.""" + + def __init__( + self, + path: Path, + *, + max_bytes: int = 100 * 1024 * 1024, + keep_archives: int = 5, + check_every: int = 1000, + ) -> None: + self.path = path + self.max_bytes = max_bytes + self.keep_archives = keep_archives + self.check_every = check_every + self._lock = threading.Lock() + self._fh: Any = None + self._writes_since_check = 0 + self._rotations = 0 + self._lines_written = 0 + self._last_ts: float | None = None + self._open() + + # -- lifecycle ---------------------------------------------------- + + def _open(self) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + self._fh = self.path.open("a", encoding="utf-8") + + def close(self) -> None: + with self._lock: + if self._fh is not None: + try: + self._fh.close() + finally: + self._fh = None + + # -- write -------------------------------------------------------- + + def write(self, record: dict[str, Any]) -> None: + """Append one JSON object as a line. Triggers rotation if oversized.""" + line = json.dumps(record, separators=(",", ":"), default=str) + "\n" + with self._lock: + if self._fh is None: + return + try: + self._fh.write(line) + self._fh.flush() + except Exception: + # Best-effort: a failed write must not crash the pubsub + # handler. Caller has no way to react anyway. + return + self._lines_written += 1 + ts = record.get("ts") + if isinstance(ts, (int, float)): + self._last_ts = float(ts) + self._writes_since_check += 1 + if self._writes_since_check >= self.check_every: + self._writes_since_check = 0 + self._maybe_rotate() + + # -- rotation ----------------------------------------------------- + + def _maybe_rotate(self) -> None: + # Caller holds self._lock. + try: + size = os.fstat(self._fh.fileno()).st_size + except OSError: + return + if size < self.max_bytes: + return + self._rotate_locked() + + def _rotate_locked(self) -> None: + # Close, gzip-rename, reopen empty, prune oldest archives. 
+ try: + self._fh.close() + except Exception: + pass + self._fh = None + # Microsecond-resolution timestamp + per-instance counter so back- + # to-back rotations (small max_bytes, repeated `force_rotate()`, + # or chatty test loops) get unique archive filenames. The lex + # sort order of `YYYYMMDD-HHMMSS-uuuuuu-NNNNN` is chronological, + # which `_prune_archives()` and `log_query._iter_jsonl()` both + # rely on. + stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f") + archive = self.path.with_suffix(f".{stamp}-{self._rotations:05d}.jsonl.gz") + try: + with self.path.open("rb") as src, gzip.open(archive, "wb") as dst: + shutil.copyfileobj(src, dst, length=1024 * 1024) + self.path.unlink() + except Exception: + # Rotation is best-effort. If gzip fails, leave the file + # in place and re-open it; we'll try again next check. + pass + self._open() + self._rotations += 1 + self._prune_archives() + + def _prune_archives(self) -> None: + # Match siblings of self.path.name with `.jsonl.gz` suffix. + prefix = self.path.stem # "logs" for "logs.jsonl" + # Archive filenames are already lexicographically chronological. + # Prune by name, not mtime, so copied/restored files don't reorder. 
+ archives = sorted(self.path.parent.glob(f"{prefix}.*.jsonl.gz")) + excess = len(archives) - self.keep_archives + for old in archives[: max(0, excess)]: + try: + old.unlink() + except OSError: + pass + + def force_rotate(self) -> None: + """Test/admin hook: rotate immediately regardless of size.""" + with self._lock: + if self._fh is not None: + self._rotate_locked() + + # -- introspection ------------------------------------------------ + + def status(self) -> dict[str, Any]: + with self._lock: + try: + size = os.fstat(self._fh.fileno()).st_size if self._fh else 0 + except OSError: + size = 0 + return { + "path": str(self.path), + "size": size, + "lines": self._lines_written, + "last_ts": self._last_ts, + "rotations": self._rotations, + } diff --git a/mcp-server/src/meshtastic_mcp/serial_session.py b/mcp-server/src/meshtastic_mcp/serial_session.py index 43537323f..fe1a452d0 100644 --- a/mcp-server/src/meshtastic_mcp/serial_session.py +++ b/mcp-server/src/meshtastic_mcp/serial_session.py @@ -46,7 +46,23 @@ class SerialSession: def _drain(session: SerialSession) -> None: - """Reader thread: line-by-line pull stdout into buffer.""" + """Reader thread: line-by-line pull stdout into buffer. + + Each line is also published to the `meshtastic.serial.line` pubsub + topic so the persistent recorder can capture it without holding its + own port. This is the text-mode tap path: when no SerialInterface is + open, the firmware emits full formatted lines (level + clock + uptime + + thread + `[heap N]` prefix on DEBUG_HEAP builds + body), and we + fan them out to whoever is listening. Pubsub is best-effort — + publish failures must never block the reader. + """ + # Lazy import: pubsub isn't required just to import this module + # (e.g., during static analysis), and we want a clean test surface. 
+ try: + from pubsub import pub # type: ignore[import-untyped] + except Exception: # pragma: no cover - defensive + pub = None + assert session.proc.stdout is not None try: for line in session.proc.stdout: @@ -54,6 +70,16 @@ def _drain(session: SerialSession) -> None: with session.lock: session.buffer.append(line_stripped) session.total_lines += 1 + if pub is not None: + try: + pub.sendMessage( + "meshtastic.serial.line", + line=line_stripped, + port=session.port, + ) + except Exception: + # A subscriber raising must not break the reader. + pass except Exception: # pragma: no cover - defensive pass finally: diff --git a/mcp-server/src/meshtastic_mcp/server.py b/mcp-server/src/meshtastic_mcp/server.py index 83aa80c45..573765e26 100644 --- a/mcp-server/src/meshtastic_mcp/server.py +++ b/mcp-server/src/meshtastic_mcp/server.py @@ -6,6 +6,7 @@ etc.). Business logic does not live here. from __future__ import annotations +import logging from typing import Any from mcp.server.fastmcp import FastMCP @@ -17,14 +18,34 @@ from . import ( flash, hw_tools, info, + log_query, registry, serial_session, ) from . import userprefs as userprefs_mod +from .recorder import get_recorder + +log = logging.getLogger(__name__) app = FastMCP("meshtastic-mcp") +def _start_recorder() -> None: + # Persistent device-log capture. Starts on first import — pubsub fan-out + # is process-global, so subscribing here captures every active interface + # (whether opened by an MCP tool, a pytest fixture, or a serial_session). + # Files land in mcp-server/.mtlog/ (gitignored). See recorder/recorder.py + # for the full design. Recorder startup is best-effort: an unwritable + # log dir or pubsub mismatch should not take the MCP server down. 
+ try: + get_recorder().start() + except Exception as exc: + log.warning("Failed to start persistent recorder: %s", exc) + + +_start_recorder() + + # ---------- Discovery & metadata ------------------------------------------ @@ -75,6 +96,7 @@ def build( env: str, with_manifest: bool = True, userprefs: dict[str, Any] | None = None, + build_flags: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build firmware for one env via `pio run -e <env>`. @@ -86,8 +108,21 @@ def build( build via userPrefs.jsonc injection. The file is restored after the build completes. Use `userprefs_manifest` to discover available keys. Use `userprefs_set` for persistent changes. + + `build_flags` (optional): dict of `-D<NAME>=<VALUE>` macros for this build + only, injected via `PLATFORMIO_BUILD_FLAGS`. Common pattern: + `build_flags={"DEBUG_HEAP": 1}` enables per-thread leak detection + a + `[heap N]` prefix on every log line. The recorder picks the prefix up + automatically and synthesizes a high-resolution heap timeline that + `telemetry_timeline(field="free_heap")` can read alongside the normal + ~60 s LocalStats packets. Pair with `/leakhunt` for classification. """ - return flash.build(env, with_manifest=with_manifest, userprefs_overrides=userprefs) + return flash.build( + env, + with_manifest=with_manifest, + userprefs_overrides=userprefs, + build_flags=build_flags, + ) @app.tool() @@ -105,6 +140,7 @@ def pio_flash( port: str, confirm: bool = False, userprefs: dict[str, Any] | None = None, + build_flags: dict[str, Any] | None = None, ) -> dict[str, Any]: """Flash firmware via `pio run -e <env> -t upload --upload-port <port>`. @@ -114,8 +150,19 @@ def pio_flash( `userprefs` (optional): dict of `USERPREFS_<KEY>: value` baked into this build via userPrefs.jsonc injection; restored after upload. + + `build_flags` (optional): dict of `-D<NAME>=<VALUE>` macros for the + rebuild-before-upload, e.g. `{"DEBUG_HEAP": 1}`.
Required for the flags + to actually land in the uploaded firmware — without it, the implicit + rebuild relinks without the env var and silently drops them. """ - return flash.flash(env, port, confirm=confirm, userprefs_overrides=userprefs) + return flash.flash( + env, + port, + confirm=confirm, + userprefs_overrides=userprefs, + build_flags=build_flags, + ) @app.tool() @@ -734,3 +781,185 @@ def picotool_load(uf2_path: str, confirm: bool = False) -> dict[str, Any]: def picotool_raw(args: list[str], confirm: bool = False) -> dict[str, Any]: """Pass-through to `picotool`. load/reboot/save/erase require confirm=True.""" return hw_tools.picotool_raw(args, confirm=confirm) + + +# ---------- Persistent device-log capture (recorder) ---------------------- +# +# The recorder is autouse — it starts at server import and continuously +# writes every meshtastic pubsub event to JSONL files under .mtlog/. These +# tools are query-only over those files, plus a few lifecycle controls. + + +@app.tool() +def logs_window( + start: str = "-15m", + end: str = "now", + grep: str | None = None, + level: str | None = None, + tag: str | None = None, + port: str | None = None, + max_lines: int = 200, +) -> dict[str, Any]: + """Recent firmware log lines from the persistent recorder. + + Filters by time window, regex over the line, level (single or + pipe-separated set like "WARN|ERROR|CRIT"), thread-name tag, and + interface port. Returns up to max_lines most-recent matches. + + Time strings: "-15m", "-2h", "-3d", "now", or ISO 8601. + + Note: lines arriving via the LogRecord protobuf path (when + set_debug_log_api(True) is on) come without level prefix — the + meshtastic Python lib drops record.level before fan-out. For those, + `level` filter won't match; use `grep` instead. 
+ """ + return log_query.logs_window( + start=start, + end=end, + grep=grep, + level=level, + tag=tag, + port=port, + max_lines=max_lines, + ) + + +@app.tool() +def telemetry_timeline( + window: str = "1h", + variant: str = "local", + field: str = "free_heap", + port: str | None = None, + max_points: int = 200, +) -> dict[str, Any]: + """Time series of one telemetry field, downsampled to <= max_points. + + `variant` ∈ device, local, environment, power, airQuality, health, host. + `field` accepts snake_case or camelCase; common aliases (free_heap ↔ + heap_free_bytes) are normalized. + + Returns slope_per_min (linear-regression slope, units/minute) so a + leak detector can read one number — negative slope on free_heap over + a long window indicates a real leak. + + LocalStats variant ("local") cadence is ~60 s (whatever the device's + `device_update_interval` is set to), so a 1 h window gives ~60 raw + points. Bucket-mean downsampling preserves shape. + """ + return log_query.telemetry_timeline( + window=window, + variant=variant, + field=field, + port=port, + max_points=max_points, + ) + + +@app.tool() +def packets_window( + start: str = "-5m", + end: str = "now", + portnum: str | None = None, + from_node: str | None = None, + to_node: str | None = None, + max: int = 200, +) -> dict[str, Any]: + """Recent mesh packets recorded by the recorder. + + Each row is a summary (portnum, from/to, hop_limit, RSSI/SNR, payload + size + first 64 bytes hex) — full payload bytes are not stored. + `portnum` accepts a pipe-separated set like "TEXT_MESSAGE_APP|POSITION_APP". + """ + return log_query.packets_window( + start=start, + end=end, + portnum=portnum, + from_node=from_node, + to_node=to_node, + max=max, + ) + + +@app.tool() +def events_window( + start: str = "-1h", + end: str = "now", + kind: str | None = None, + max: int = 200, +) -> dict[str, Any]: + """Return recorder events: connection lifecycle, node updates, and `mark_event` markers. 
+ + `kind` ∈ recorder_start, recorder_pause, recorder_resume, + connection_established, connection_lost, node_updated, mark. + Pipe-separated sets ("connection_lost|connection_established") work. + """ + return log_query.events_window(start=start, end=end, kind=kind, max=max) + + +@app.tool() +def mark_event( + label: str, + note: str | None = None, + data: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Drop a named marker into events.jsonl AND logs.jsonl. + + Useful for aligning a timeline around a known stimulus: call before + and after a stress workload, then query telemetry_timeline / + logs_window with the markers' timestamps as bounds. + + The marker also lands in logs.jsonl with level=MARK so a single + grep over logs picks it up. + """ + return get_recorder().mark_event(label=label, note=note, data=data) + + +@app.tool() +def recorder_status() -> dict[str, Any]: + """Return recorder runtime info: running, paused, file sizes, last_ts per stream. + + Use this to sanity-check that capture is working before you trust a + `logs_window` / `telemetry_timeline` result. + """ + return get_recorder().status() + + +@app.tool() +def recorder_pause(reason: str | None = None) -> dict[str, Any]: + """Pause writes to all four streams. Pubsub subscriptions stay active — + we just drop events on the floor while paused. Resume with `recorder_resume`. + + Use when capturing a known-good baseline that you don't want to + pollute with pre-test noise. Default state is recording; this is + rarely needed. + """ + get_recorder().pause(reason=reason) + return {"ok": True, "paused": True, "reason": reason} + + +@app.tool() +def recorder_resume() -> dict[str, Any]: + """Resume writes after `recorder_pause`. 
No-op if already running.""" + get_recorder().resume() + return {"ok": True, "paused": False} + + +@app.tool() +def recorder_export( + start: str, + end: str, + dest_dir: str, + streams: list[str] | None = None, +) -> dict[str, Any]: + """Bundle a slice of the recorder's streams into `dest_dir`. + + Writes one uncompressed JSONL per requested stream (logs / telemetry / + packets / events). Useful for: attaching to a bug report, feeding a + notebook, or backfilling Datadog after the fact. + """ + return log_query.export( + start=start, + end=end, + dest_dir=dest_dir, + streams=streams, + ) diff --git a/mcp-server/tests/unit/test_build_flags.py b/mcp-server/tests/unit/test_build_flags.py new file mode 100644 index 000000000..bace81fae --- /dev/null +++ b/mcp-server/tests/unit/test_build_flags.py @@ -0,0 +1,88 @@ +"""Unit tests for the `build_flags` injection on `flash.build()`. + +We don't actually run pio here — too slow, requires hardware-aware envs. +We test the translation layer (`_build_flags_env`) and that the env vars +are threaded through pio.run correctly via mock. 
+"""
+
+from __future__ import annotations
+
+from unittest.mock import patch
+
+from meshtastic_mcp import flash, pio
+
+
+class TestBuildFlagsEnv:
+    """`flash._build_flags_env`: flag dict -> PLATFORMIO_BUILD_FLAGS env dict."""
+
+    def test_simple_value(self) -> None:
+        out = flash._build_flags_env({"DEBUG_HEAP": 1})
+        assert out == {"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP=1"}
+
+    def test_string_value(self) -> None:
+        out = flash._build_flags_env({"FOO": "bar"})
+        assert out == {"PLATFORMIO_BUILD_FLAGS": "-DFOO=bar"}
+
+    def test_bool_true_is_bare_flag(self) -> None:
+        out = flash._build_flags_env({"DEBUG_HEAP": True})
+        assert out == {"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP"}
+
+    def test_bool_false_dropped(self) -> None:
+        out = flash._build_flags_env({"DEBUG_HEAP": False, "OTHER": 1})
+        assert out == {"PLATFORMIO_BUILD_FLAGS": "-DOTHER=1"}
+
+    def test_none_dropped(self) -> None:
+        out = flash._build_flags_env({"DEBUG_HEAP": None})
+        assert out == {}
+
+    def test_multiple_combined(self) -> None:
+        out = flash._build_flags_env({"DEBUG_HEAP": 1, "FOO": "x", "BAR": True})
+        # Order isn't guaranteed in dict iteration, so check membership.
+        flags = out["PLATFORMIO_BUILD_FLAGS"].split()
+        assert set(flags) == {"-DDEBUG_HEAP=1", "-DFOO=x", "-DBAR"}
+
+
+class TestBuildPropagatesFlags:
+    """`flash.build` forwards `build_flags` to `pio.run` through `extra_env`."""
+
+    def test_extra_env_passed_to_pio_run(self) -> None:
+        # Mock pio.run so we don't actually invoke pio. Capture extra_env.
+        captured = {}
+
+        class _StubResult:
+            returncode = 0
+            stdout = ""
+            stderr = ""
+            duration_s = 0.1
+
+        def _stub(args, **kwargs):
+            captured["args"] = args
+            captured["kwargs"] = kwargs
+            return _StubResult()
+
+        with patch.object(pio, "run", side_effect=_stub):
+            with patch.object(flash, "_artifacts_for", return_value=[]):
+                out = flash.build(
+                    "fake-env",
+                    with_manifest=False,
+                    build_flags={"DEBUG_HEAP": 1},
+                )
+        assert captured["args"] == ["run", "-e", "fake-env"]
+        assert captured["kwargs"]["extra_env"] == {
+            "PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP=1"
+        }
+        assert out["build_flags"] == {"DEBUG_HEAP": 1}
+
+    def test_no_flags_means_no_extra_env(self) -> None:
+        captured = {}
+
+        class _StubResult:
+            returncode = 0
+            stdout = ""
+            stderr = ""
+            duration_s = 0.1
+
+        def _stub(args, **kwargs):
+            captured["kwargs"] = kwargs
+            return _StubResult()
+
+        with patch.object(pio, "run", side_effect=_stub):
+            with patch.object(flash, "_artifacts_for", return_value=[]):
+                flash.build("fake-env", with_manifest=False)
+        assert captured["kwargs"]["extra_env"] is None
diff --git a/mcp-server/tests/unit/test_recorder.py b/mcp-server/tests/unit/test_recorder.py
new file mode 100644
index 000000000..a7d93d78f
--- /dev/null
+++ b/mcp-server/tests/unit/test_recorder.py
@@ -0,0 +1,548 @@
+"""Unit tests for the persistent device-log recorder.
+
+Hardware-free: drives the Recorder through its `_on_*` handlers with
+synthetic packet/line dicts, then queries via log_query. Validates
+prefix parsing, telemetry variant dispatch, marker round-trip, time
+window filtering, downsampling, slope estimation, and gzip rotation
++ archive pruning.
+"""
+
+from __future__ import annotations
+
+import gzip
+import json
+import logging
+import os
+import time
+from collections.abc import Iterator
+from pathlib import Path
+
+import pubsub
+import pytest
+from meshtastic_mcp import log_query
+from meshtastic_mcp.recorder.parsers import (
+    extract_telemetry,
+    interface_label,
+    parse_log_line,
+    summarize_packet,
+)
+from meshtastic_mcp.recorder.recorder import Recorder
+from meshtastic_mcp.recorder.rotating import _RotatingJsonl
+
+# -- isolation: every test gets a fresh Recorder + tmp dir -----------
+
+
+@pytest.fixture
+def recorder(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Iterator[Recorder]:
+    """Yield a started Recorder rooted at `tmp_path`; stop it on teardown.
+
+    This is a generator fixture, hence the `Iterator[Recorder]` return type.
+    """
+    # Redirect both the Recorder and the module-level singleton lookup
+    # to the same tmp dir so log_query queries the same files we write.
+    monkeypatch.setenv("MESHTASTIC_MCP_LOG_DIR", str(tmp_path))
+    monkeypatch.setattr(
+        "meshtastic_mcp.recorder.recorder._INSTANCE", None, raising=False
+    )
+    r = Recorder(base_dir=tmp_path)
+    r.start()
+    monkeypatch.setattr("meshtastic_mcp.recorder.recorder._INSTANCE", r, raising=False)
+    yield r
+    r.stop()
+
+
+class _FakeIface:
+    """Minimal interface stand-in exposing only a serial `devPath`."""
+
+    devPath = "/dev/cu.fake"
+
+
+# -- parsers ---------------------------------------------------------
+
+
+class TestParseLogLine:
+    """`parse_log_line` prefix, heap-annotation and heap-event extraction."""
+
+    def test_full_prefix(self) -> None:
+        out = parse_log_line("INFO | 12:34:56 12345 [Main] Booting")
+        assert out["level"] == "INFO"
+        assert out["tag"] == "Main"
+        assert out["uptime_s"] == 12345
+        assert out["msg"] == "Booting"
+        assert out["clock"] == "12:34:56"
+
+    def test_invalid_clock(self) -> None:
+        out = parse_log_line("WARN | ??:??:?? 7 [SerialConsole] Boot")
+        assert out["level"] == "WARN"
+        assert out["clock"] == "??:??:??"
+        assert out["uptime_s"] == 7
+
+    def test_no_thread_bracket(self) -> None:
+        out = parse_log_line("DEBUG | 00:00:00 0 raw message body")
+        assert out["level"] == "DEBUG"
+        assert out.get("tag") is None
+        assert out["msg"] == "raw message body"
+
+    def test_bare_message(self) -> None:
+        # LogRecord.message path — no level prefix at all.
+        out = parse_log_line("just a bare message")
+        # A missing key and an explicit None are both acceptable here.
+        assert out.get("level") is None
+        assert out["line"] == "just a bare message"
+
+    def test_empty(self) -> None:
+        assert parse_log_line("") == {"line": ""}
+
+    def test_debug_heap_prefix_extracted(self) -> None:
+        out = parse_log_line("INFO | 12:34:56 12345 [Main] [heap 92344] Booting")
+        assert out["level"] == "INFO"
+        assert out["tag"] == "Main"
+        assert out["heap_free"] == 92344
+        assert out["msg"] == "Booting"
+
+    def test_debug_heap_prefix_on_bare_line(self) -> None:
+        # LogRecord.message path: no level prefix but still has [heap N].
+        out = parse_log_line("[heap 12345] some message")
+        assert out["heap_free"] == 12345
+        assert out["msg"] == "some message"
+
+    def test_thread_leak_event(self) -> None:
+        out = parse_log_line(
+            "HEAP | 00:00:01 100 [Power] [heap 90000] "
+            "------ Thread MeshPacket leaked heap 92344 -> 90000 (-2344) ------"
+        )
+        assert out["level"] == "HEAP"
+        assert out["heap_free"] == 90000
+        ev = out["heap_event"]
+        assert ev["kind"] == "leaked"
+        assert ev["thread"] == "MeshPacket"
+        assert ev["before"] == 92344
+        assert ev["after"] == 90000
+        assert ev["delta"] == -2344
+
+    def test_thread_freed_event(self) -> None:
+        out = parse_log_line(
+            "++++++ Thread Router freed heap 1000 -> 1500 (500) ++++++"
+        )
+        ev = out["heap_event"]
+        assert ev["kind"] == "freed"
+        assert ev["thread"] == "Router"
+        assert ev["delta"] == 500
+
+    def test_heap_status_periodic(self) -> None:
+        out = parse_log_line(
+            "HEAP | 00:00:30 30 [Power] "
+            "Heap status: 92344/200000 bytes free (-128), running 8/12 threads"
+        )
+        assert out["heap_free"] == 92344
+        assert out["heap_total"] == 200000
+        assert out["heap_delta"] == -128
+
+
+class TestRecorderDebugHeapSynthesis:
+    """Heap figures found in log lines are mirrored into telemetry.jsonl."""
+
+    def test_log_with_heap_writes_telemetry(self, recorder: Recorder) -> None:
+        # When a log line carries [heap N], the recorder should also
+        # emit a synthesized telemetry row tagged source=debug_heap.
+        recorder._on_log_line(
+            "INFO | 00:00:00 1 [Main] [heap 88888] hello",
+            _FakeIface(),
+        )
+        telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
+        synth = [json.loads(r) for r in telem if '"source":"debug_heap"' in r]
+        assert len(synth) == 1
+        assert synth[0]["fields"]["heap_free_bytes"] == 88888
+        assert synth[0]["variant"] == "local"
+
+    def test_heap_status_writes_total_too(self, recorder: Recorder) -> None:
+        recorder._on_log_line(
+            "HEAP | 00:00:30 30 [Power] "
+            "Heap status: 50000/200000 bytes free (-100), running 8/12 threads",
+            _FakeIface(),
+        )
+        telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
+        synth = [json.loads(r) for r in telem if '"source":"debug_heap"' in r]
+        assert synth[-1]["fields"]["heap_free_bytes"] == 50000
+        assert synth[-1]["fields"]["heap_total_bytes"] == 200000
+
+    def test_no_heap_no_synthesis(self, recorder: Recorder) -> None:
+        # Plain log line (no [heap N], no Heap status) — telemetry.jsonl
+        # should NOT gain a synth row.
+        before = (recorder.base_dir / "telemetry.jsonl").read_text().count("\n")
+        recorder._on_log_line("INFO | 00:00:00 1 [Main] just a message", _FakeIface())
+        after = (recorder.base_dir / "telemetry.jsonl").read_text().count("\n")
+        assert after == before
+
+    def test_thread_leak_event_persists_on_log_row(self, recorder: Recorder) -> None:
+        recorder._on_log_line(
+            "HEAP | 00:00:01 100 [Power] [heap 90000] "
+            "------ Thread MeshPacket leaked heap 92344 -> 90000 (-2344) ------",
+            _FakeIface(),
+        )
+        rows = [
+            json.loads(r)
+            for r in (recorder.base_dir / "logs.jsonl").read_text().splitlines()
+            if r
+        ]
+        evt_rows = [r for r in rows if r.get("heap_event")]
+        assert len(evt_rows) == 1
+        assert evt_rows[0]["heap_event"]["thread"] == "MeshPacket"
+        assert evt_rows[0]["heap_event"]["delta"] == -2344
+
+
+class TestSerialTap:
+    """`Recorder._on_serial_line`: lines tapped straight off a serial port."""
+
+    def test_serial_line_records_log_and_synthesizes_heap(
+        self, recorder: Recorder
+    ) -> None:
+        recorder._on_serial_line(
+            "INFO | 00:00:00 5 [Main] [heap 88888] tap-line",
+            port="/dev/cu.tap",
+        )
+        logs = (recorder.base_dir / "logs.jsonl").read_text().splitlines()
+        telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
+        log_rows = [json.loads(r) for r in logs if r]
+        # Find the row from this call (port=/dev/cu.tap, role=serial_session)
+        tap_rows = [r for r in log_rows if r.get("port") == "/dev/cu.tap"]
+        assert len(tap_rows) == 1
+        assert tap_rows[0]["role"] == "serial_session"
+        assert tap_rows[0]["level"] == "INFO"
+        assert tap_rows[0]["tag"] == "Main"
+        assert tap_rows[0]["heap_free"] == 88888
+        synth = [json.loads(r) for r in telem if '"source":"debug_heap_serial"' in r]
+        assert len(synth) == 1
+        assert synth[0]["fields"]["heap_free_bytes"] == 88888
+        assert synth[0]["role"] == "serial_session"
+
+    def test_serial_line_thread_leak_event(self, recorder: Recorder) -> None:
+        recorder._on_serial_line(
+            "HEAP | 00:00:30 30 [Power] [heap 53484] "
+            "------ Thread Router leaked heap 53612 -> 53484 (-128) ------",
+            port="/dev/cu.tap",
+        )
+        rows = [
+            json.loads(r)
+            for r in (recorder.base_dir / "logs.jsonl").read_text().splitlines()
+            if r
+        ]
+        evt = [r for r in rows if r.get("heap_event")]
+        assert len(evt) == 1
+        assert evt[0]["heap_event"]["thread"] == "Router"
+        assert evt[0]["heap_event"]["delta"] == -128
+        # Heap also synthesized.
+        telem = (recorder.base_dir / "telemetry.jsonl").read_text()
+        assert '"source":"debug_heap_serial"' in telem
+
+    def test_serial_line_pause(self, recorder: Recorder) -> None:
+        recorder.pause("baseline")
+        recorder._on_serial_line(
+            "INFO | 00:00:00 1 [t] [heap 1000] dropped",
+            port="/dev/cu.tap",
+        )
+        # Only the pause event row should exist; no tap row.
+        logs = (recorder.base_dir / "logs.jsonl").read_text()
+        assert "dropped" not in logs
+
+    def test_serial_line_handler_swallows_exceptions(
+        self, recorder: Recorder
+    ) -> None:
+        # Hostile input — should not raise.
+        recorder._on_serial_line(None, port="/dev/cu.tap")  # type: ignore[arg-type]
+        recorder._on_serial_line(b"\x00\x01\x02\x03", port="/dev/cu.tap")  # type: ignore[arg-type]
+        # Survived.
+
+
+class TestExtractTelemetry:
+    """`extract_telemetry`: variant detection on decoded telemetry packets."""
+
+    def test_local_stats_camel(self) -> None:
+        # camelCase variant key ("localStats").
+        local_stats = {"heap_total_bytes": 1000, "heap_free_bytes": 600}
+        packet = {"decoded": {"telemetry": {"localStats": local_stats}}}
+        parsed = extract_telemetry(packet)
+        assert parsed is not None
+        assert parsed["variant"] == "local"
+        assert parsed["fields"]["heap_free_bytes"] == 600
+
+    def test_device_metrics_snake(self) -> None:
+        # snake_case variant key ("device_metrics").
+        device_metrics = {"battery_level": 88, "voltage": 4.1}
+        packet = {"decoded": {"telemetry": {"device_metrics": device_metrics}}}
+        parsed = extract_telemetry(packet)
+        assert parsed is not None
+        assert parsed["variant"] == "device"
+        assert parsed["fields"]["battery_level"] == 88
+
+    def test_unknown_variant_returns_none(self) -> None:
+        unusable_packets = (
+            {"decoded": {"telemetry": {"weird": {}}}},
+            {},
+            {"decoded": "not-a-dict"},
+        )
+        for packet in unusable_packets:
+            assert extract_telemetry(packet) is None
+
+
+class TestSummarizePacket:
+    """`summarize_packet`: flat summary fields for a received packet."""
+
+    def test_text_with_payload(self) -> None:
+        decoded = {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"}
+        packet = {
+            "fromId": "!abc",
+            "toId": "!def",
+            "decoded": decoded,
+            "hopLimit": 3,
+        }
+        summary = summarize_packet(packet)
+        assert summary["from_node"] == "!abc"
+        assert summary["portnum"] == "TEXT_MESSAGE_APP"
+        assert summary["payload_size"] == 5
+        assert summary["payload_hex_prefix"] == "68656c6c6f"
+
+    def test_no_decoded(self) -> None:
+        summary = summarize_packet({"fromId": "!abc"})
+        assert summary["from_node"] == "!abc"
+        assert summary["portnum"] is None
+
+
+class TestInterfaceLabel:
+    """`interface_label`: port/role labels for serial, TCP and unknown objects."""
+
+    def test_serial(self) -> None:
+        expected = {"port": "/dev/cu.fake", "role": "serial"}
+        assert interface_label(_FakeIface()) == expected
+
+    def test_tcp(self) -> None:
+        class T:
+            hostname = "node.lan"
+            portNumber = 4403
+
+        label = interface_label(T())
+        assert label == {"port": "tcp://node.lan:4403", "role": "tcp"}
+
+    def test_unknown(self) -> None:
+        label = interface_label(object())
+        assert label == {"port": "object", "role": None}
+
+    def test_none(self) -> None:
+        label = interface_label(None)
+        assert label == {"port": None, "role": None}
+
+
+# -- recorder write side
---------------------------------------------
+
+
+class TestRecorderWrites:
+    """Write side: each `_on_*` handler lands rows in its own JSONL file."""
+
+    def test_log_line_is_recorded(self, recorder: Recorder) -> None:
+        recorder._on_log_line("INFO | 12:34:56 99 [T] hi", _FakeIface())
+        path = recorder.base_dir / "logs.jsonl"
+        rows = [json.loads(line) for line in path.read_text().splitlines() if line]
+        # The recorder_start marker goes to events.jsonl only, not logs.jsonl;
+        # search for our row rather than relying on its position.
+        assert any(r.get("level") == "INFO" and r.get("tag") == "T" for r in rows)
+
+    def test_telemetry_recorded_and_packet_double(self, recorder: Recorder) -> None:
+        # _on_telemetry alone — only telemetry.jsonl
+        recorder._on_telemetry(
+            {
+                "fromId": "!abc",
+                "decoded": {"telemetry": {"localStats": {"heap_free_bytes": 600}}},
+            },
+            _FakeIface(),
+        )
+        telem_rows = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
+        assert any('"variant":"local"' in r for r in telem_rows)
+
+    def test_packets_summary(self, recorder: Recorder) -> None:
+        recorder._on_receive(
+            {
+                "fromId": "!abc",
+                "toId": "!def",
+                "decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hi"},
+            },
+            _FakeIface(),
+        )
+        rows = (recorder.base_dir / "packets.jsonl").read_text().splitlines()
+        assert any('"portnum":"TEXT_MESSAGE_APP"' in r for r in rows)
+
+    def test_mark_event_round_trip(self, recorder: Recorder) -> None:
+        # A mark must show up in both events.jsonl and logs.jsonl.
+        out = recorder.mark_event("checkpoint", note="midpoint")
+        assert "ts" in out
+        events = (recorder.base_dir / "events.jsonl").read_text().splitlines()
+        logs = (recorder.base_dir / "logs.jsonl").read_text().splitlines()
+        assert any('"label":"checkpoint"' in r and '"kind":"mark"' in r for r in events)
+        assert any('"level":"MARK"' in r and "checkpoint" in r for r in logs)
+
+    def test_pause_drops_writes(self, recorder: Recorder) -> None:
+        before = len((recorder.base_dir / "logs.jsonl").read_text().splitlines())
+        recorder.pause(reason="baseline")
+        recorder._on_log_line("INFO | 00:00:00 1 [t] swallowed", _FakeIface())
+        after = len((recorder.base_dir / "logs.jsonl").read_text().splitlines())
+        assert after == before
+        recorder.resume()
+        recorder._on_log_line("INFO | 00:00:00 2 [t] kept", _FakeIface())
+        post_resume = (recorder.base_dir / "logs.jsonl").read_text()
+        assert "kept" in post_resume
+
+    def test_pubsub_handler_swallows_exceptions(self, recorder: Recorder) -> None:
+        # If the writer dies, the pubsub callback must NOT raise — that
+        # would crash the meshtastic receive thread.
+        bad_packet = object()  # not a dict
+        recorder._on_receive(bad_packet, _FakeIface())  # type: ignore[arg-type]
+        recorder._on_telemetry(bad_packet, _FakeIface())  # type: ignore[arg-type]
+        recorder._on_log_line(None, _FakeIface())  # type: ignore[arg-type]
+        # No assertion needed — survival is the test.
+
+
+# -- log_query read side ---------------------------------------------
+
+
+class TestLogQuery:
+    """Read side: `log_query` over the files the fixture recorder wrote."""
+
+    def test_logs_window_grep_and_level(self, recorder: Recorder) -> None:
+        recorder._on_log_line("INFO | 12:00:00 1 [A] alpha", _FakeIface())
+        recorder._on_log_line("WARN | 12:00:01 2 [B] bravo failed", _FakeIface())
+        recorder._on_log_line("ERROR | 12:00:02 3 [C] charlie failed", _FakeIface())
+
+        out = log_query.logs_window(start="-1m", level="WARN|ERROR", max_lines=10)
+        assert out["total_matched"] == 2
+        levels = {r["level"] for r in out["lines"]}
+        assert levels == {"WARN", "ERROR"}
+
+        out2 = log_query.logs_window(start="-1m", grep=r"failed$", max_lines=10)
+        assert out2["total_matched"] == 2
+
+    def test_logs_window_invalid_regex(self, recorder: Recorder) -> None:
+        recorder._on_log_line("INFO | 12:00:00 1 [A] alpha", _FakeIface())
+        with pytest.raises(ValueError, match="invalid grep regex"):
+            log_query.logs_window(start="-1m", grep="(")
+
+    def test_telemetry_timeline_slope_and_downsample(self, recorder: Recorder) -> None:
+        # Synthesize a downward leak: 100 points, free_heap drops 1 byte/sample.
+        base_ts = time.time() - 60
+        for i in range(100):
+            recorder._files["telemetry"].write(
+                {
+                    "ts": base_ts + i * 0.5,
+                    "port": "/dev/cu.fake",
+                    "role": "serial",
+                    "from_node": "!abc",
+                    "variant": "local",
+                    "fields": {"heap_free_bytes": 10000 - i},
+                }
+            )
+
+        out = log_query.telemetry_timeline(
+            window="2m", variant="local", field="free_heap", max_points=10
+        )
+        assert out["samples"] == 100
+        assert len(out["points"]) <= 10
+        # Negative slope (heap dropping). Magnitude: 1 byte every 0.5s = 120/min.
+        assert out["slope_per_min"] is not None
+        assert out["slope_per_min"] < -100
+
+    def test_export_bundles_slice(self, recorder: Recorder, tmp_path: Path) -> None:
+        recorder._on_log_line("INFO | 00:00:00 1 [t] one", _FakeIface())
+        recorder._on_log_line("INFO | 00:00:00 2 [t] two", _FakeIface())
+        dest = tmp_path / "bundle"
+        out = log_query.export(start="-1m", end="now", dest_dir=str(dest))
+        assert (dest / "logs.jsonl").exists()
+        assert "logs" in out["paths"]
+
+
+# -- time parser -----------------------------------------------------
+
+
+class TestParseTime:
+    """`log_query._parse_time`: relative offsets, "now", epoch floats, ISO-8601."""
+
+    def test_relative(self) -> None:
+        now = 1_000_000.0
+        assert log_query._parse_time("-15m", now=now) == now - 900
+        assert log_query._parse_time("-2h", now=now) == now - 7200
+        assert log_query._parse_time("-1d", now=now) == now - 86400
+
+    def test_now_and_epoch(self) -> None:
+        now = 1_000_000.0
+        assert log_query._parse_time("now", now=now) == now
+        assert log_query._parse_time(now) == now
+
+    def test_iso(self) -> None:
+        ts = log_query._parse_time("2026-01-01T00:00:00Z")
+        assert isinstance(ts, float) and ts > 1_700_000_000
+
+    def test_naive_iso_assumes_utc(self) -> None:
+        assert log_query._parse_time("2026-01-01T00:00:00") == log_query._parse_time(
+            "2026-01-01T00:00:00Z"
+        )
+
+    def test_invalid(self) -> None:
+        with pytest.raises(ValueError):
+            log_query._parse_time("not a time")
+
+
+# -- rotation --------------------------------------------------------
+
+
+class TestRotation:
+    """`_RotatingJsonl`: size-triggered rotation, gzip archives, pruning."""
+
+    def test_size_cap_rotates_and_gzips(self, tmp_path: Path) -> None:
+        path = tmp_path / "rot.jsonl"
+        r = _RotatingJsonl(path, max_bytes=512, keep_archives=5, check_every=1)
+        for i in range(100):
+            r.write({"ts": float(i), "i": i, "pad": "x" * 40})
+        r.close()
+        archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
+        assert archives, "expected at least one rotation"
+        # Archive content is valid gzip + valid JSONL. Decode explicitly as
+        # UTF-8 so the read does not depend on the locale's default encoding.
+        with gzip.open(archives[0], "rt", encoding="utf-8") as fh:
+            first = json.loads(fh.readline())
+        assert "ts" in first
+
+    def test_archive_pruning(self, tmp_path: Path) -> None:
+        path = tmp_path / "rot.jsonl"
+        r = _RotatingJsonl(path, max_bytes=200, keep_archives=2, check_every=1)
+        # Force several rotations.
+        for _ in range(8):
+            for i in range(20):
+                r.write({"ts": float(i), "pad": "x" * 30})
+            r.force_rotate()
+        r.close()
+        archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
+        assert len(archives) <= 2, f"expected ≤2 kept archives, got {len(archives)}"
+
+    def test_archive_pruning_uses_filename_order(self, tmp_path: Path) -> None:
+        path = tmp_path / "rot.jsonl"
+        r = _RotatingJsonl(path, keep_archives=2)
+        old = tmp_path / "rot.20260101-000000-000000-00000.jsonl.gz"
+        mid = tmp_path / "rot.20260101-000001-000000-00000.jsonl.gz"
+        new = tmp_path / "rot.20260101-000002-000000-00000.jsonl.gz"
+        for archive in (old, mid, new):
+            with gzip.open(archive, "wt", encoding="utf-8") as fh:
+                fh.write('{"ts":1}\n')
+        # Deliberately scramble mtimes so lexicographic filename order is
+        # the only stable chronological signal.
+        os.utime(old, (300, 300))
+        os.utime(mid, (100, 100))
+        os.utime(new, (200, 200))
+
+        r._prune_archives()
+        r.close()
+
+        archives = sorted(p.name for p in tmp_path.glob("rot.*.jsonl.gz"))
+        assert archives == [mid.name, new.name]
+
+    def test_force_rotate_when_below_threshold(self, tmp_path: Path) -> None:
+        path = tmp_path / "rot.jsonl"
+        r = _RotatingJsonl(path, max_bytes=10_000_000, check_every=999_999)
+        r.write({"ts": 1.0, "msg": "tiny"})
+        r.force_rotate()
+        r.write({"ts": 2.0, "msg": "after-rotate"})
+        r.close()
+        archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
+        assert len(archives) == 1
+        assert path.exists()
+        assert "after-rotate" in path.read_text()
+
+
+class TestRecorderLocks:
+    """Calls that must return instead of hanging or raising."""
+
+    def test_force_rotate_all_returns_status(self, recorder: Recorder) -> None:
+        out = recorder.force_rotate_all()
+        assert out["running"] is True
+        assert out["files"]
+
+    def test_wire_pubsub_logs_subscription_failure(
+        self,
+        tmp_path: Path,
+        monkeypatch: pytest.MonkeyPatch,
+        caplog: pytest.LogCaptureFixture,
+    ) -> None:
+        class FailingPubSubMock:
+            def subscribe(self, callback: object, topic: str) -> None:
+                raise RuntimeError("boom")
+
+        monkeypatch.setattr(pubsub, "pub", FailingPubSubMock())
+        recorder = Recorder(base_dir=tmp_path)
+        with caplog.at_level(logging.WARNING):
+            recorder._wire_pubsub()
+        assert (
+            "Recorder failed to subscribe to meshtastic.log.line: boom" in caplog.text
+        )