Implement rotating JSONL recorder for persistent logging (#10428)

* Implement rotating JSONL recorder for persistent logging

* Fixes

* Update documentation and clean up imports in command files

* Address remaining recorder review feedback

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/2541773c-869a-463f-9fae-8505272c06ff

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* recorder: fix lock re-entry deadlock on start() and force_rotate_all()

The previous "Fixes" commit added `_files_snapshot()` which acquires
`self._lock` so handlers don't race with `stop()` clearing `_files`.
But two callers were already holding `self._lock` when they invoked
methods that go through the snapshot:

  - `start()` writes the `recorder_start` event from inside its `with
    self._lock:` block. `_write_event` -> `_files_snapshot` re-acquires
    the same non-reentrant `threading.Lock`, freezing process startup.

  - `force_rotate_all()` calls `self.status()` (which also acquires
    `self._lock`) while still holding the lock from rotating each file.

Both fixes release the lock before the call. The recorder_start marker
still lands in events.jsonl because the started/started_at flags are
already set when we write it.

Verified end-to-end against the standalone /tmp/verify_pr_fixes.py
harness — all 9 PR review-comment fixes pass, including pause/resume
event ordering and concurrent start/stop without KeyError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix markdown linting issues in leakhunt.md and repro.md

* Handle recorder startup and query review fixes

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Tighten recorder follow-up tests

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Stabilize recorder startup tests

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Remove brittle recorder startup test

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Polish recorder follow-up errors

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Refine recorder startup and regex errors

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Clean up recorder follow-up nits

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Trunk

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ben Meadors
2026-05-10 09:22:40 -05:00
committed by GitHub
parent 10a7f1042b
commit f6a954b97e
17 changed files with 3047 additions and 5 deletions

View File

@@ -49,11 +49,17 @@ Call the meshtastic MCP tool bundle and format a structured health report for on
- Do the LoRa configs match? (region, channel_num, modem_preset should all agree; mismatch = no mesh)
- Do the primary channel NAMES match? Mismatch = different PSK = no decode.
7. **Suggest next actions only for specific, recognisable failure modes**:
7. **Recorder slice (cheap, always available).** The mcp-server runs an autouse log recorder that's been collecting from every connected device. Pull two short slices to surface anything weird that's already happened:
- `mcp__meshtastic__logs_window(start="-2m", level="WARN|ERROR|CRIT", max_lines=20)` — recent firmware errors. If empty, say "no recent errors"; don't manufacture concern.
- `mcp__meshtastic__telemetry_timeline(window="1h", field="free_heap", max_points=60)` — heap trend. If `slope_per_min < -50`, flag it and recommend `/leakhunt window=6h` for a deeper read; otherwise just note the current free heap.
- If `recorder_status` shows `running:false` or `files.telemetry.last_ts` is null, note "recorder has no telemetry yet — enable `set_debug_log_api(True)` to populate" and skip this step gracefully.
8. **Suggest next actions only for specific, recognisable failure modes**:
- Stale PKI pubkey one-way → "run `/test tests/mesh/test_direct_with_ack.py` — the retry + nodeinfo-ping heals this in the test path."
- Region mismatch → "re-bake one side via `./mcp-server/run-tests.sh --force-bake`."
- Device unreachable, reachable via DFU → `touch_1200bps(port=...)` + `pio_flash`. If not even DFU responds AND the device is on a PPPS hub, escalate to `uhubctl_cycle(role=..., confirm=True)`.
- CP2102-wedged-driver on macOS → see the note in `run-tests.sh`.
- Heap slope strongly negative → "run `/leakhunt window=6h` for a full timeline + classification."
## What NOT to do

View File

@@ -0,0 +1,103 @@
---
description: Hunt for memory leaks (and other slow degradations) by reading the persistent recorder's heap timeline + log slice over a window
argument-hint: [window=1h] [field=free_heap] [variant=local]
---
<!-- markdownlint-disable MD029 -->
# `/leakhunt` — read the recorder, classify a memory leak
Use the always-on recorder (`mcp-server/.mtlog/`) to read a heap timeline plus the matching log slice and produce a one-page verdict: **steady / slow leak / fragmentation / OOM-imminent**. No firmware changes, no special build flags — the LocalStats telemetry packet that the firmware already broadcasts every ~60 s carries `heap_free_bytes` and `heap_total_bytes`.
## Two signal paths — pick the right one
| Path | Build flag | Cadence | Per-thread attribution | Cost |
| --------------------- | ---------------- | -------------- | ---------------------- | ------------------------- |
| LocalStats packet | (default) | ~60 s | No | Free — always on |
| `[heap N]` log prefix | `-DDEBUG_HEAP=1` | every log line | Yes (Thread X leaked) | Bigger flash + log volume |
Both feed the same `telemetry_timeline(field="free_heap")` query — when DEBUG_HEAP is on, the recorder synthesizes telemetry rows from log prefixes (tagged `source: debug_heap`), so a single timeline call gets whichever signal is available. **For a slow leak diagnosis, the default path is plenty** (60 s cadence over 6 h = 360 points; linear regression over that nails sub-100-byte/min slopes). **DEBUG_HEAP is for attribution** — when the slope is real and you need to know which thread is leaking.
## What to do
1. **Parse `$ARGUMENTS`**: optional `window` (default `1h`, accepts `30m`/`6h`/`-3d`/etc.), optional `field` (default `free_heap`; alternates: `total_heap`, `battery_level`, anything in the LocalStats variant), optional `variant` (default `local`; alternates: `device`, `environment`, `power`, `airQuality`, `health`).
2. **Verify the recorder is alive** — call `mcp__meshtastic__recorder_status`. Check:
- `running == True`
- `files.telemetry.lines > 0` (at least one telemetry packet recorded — if zero, the device hasn't broadcast LocalStats yet OR `set_debug_log_api` has never been on; tell the operator to run `mcp__meshtastic__set_debug_log_api(enabled=True)` and wait one device-update interval)
- `files.telemetry.last_ts` within the last 5 minutes (if older, the device is silent — log that, not "leak detected")
3. **Detect whether DEBUG_HEAP is active**`mcp__meshtastic__logs_window(start="-2m", grep=r"\\[heap \\d+\\]", max_lines=3)`. If any line matches, the firmware has the prefix → DEBUG_HEAP is on, expect higher-cadence data and `heap_event` rows. If zero matches over the last 2 minutes, you're on the LocalStats-only path.
4. **Pull the timeline**`mcp__meshtastic__telemetry_timeline(window=$window, variant=$variant, field=$field, max_points=200)`. Read:
- `samples` — how many raw points contributed
- `min`, `max` — total swing
- `slope_per_min` — units per minute (linear regression over the whole window)
5. **Pull the log context for the same window**`mcp__meshtastic__logs_window(start="-${window}", grep="Heap status|leaked heap|freed heap|out of memory|Alloc an err|panic|abort", max_lines=200)`. These are the strings the firmware emits when something memory-related happens (`DEBUG_HEAP` builds emit `"Heap status:"` and `"leaked heap"` lines; production builds emit `"Alloc an err"` on failure and `"out of memory"` on OOM).
6. **Pull marker events** so we know if the operator labeled phases — `mcp__meshtastic__events_window(start="-${window}", kind="mark|connection_lost|connection_established")`. If a `connection_lost` overlaps a sharp drop, that's not a leak; that's a reboot.
6a. **(DEBUG_HEAP only) Per-thread attribution** — `mcp__meshtastic__logs_window(start="-${window}", grep="leaked heap", max_lines=200)`. Each row has a structured `heap_event` field with `{kind, thread, before, after, delta}`. Aggregate by thread: sum the `delta` over the window per thread name. The thread with the largest cumulative negative delta is your suspect. Note the count too — a thread with 50× small leaks is different from 1× big leak.
7. **Classify** based on what the data says, NOT on what you wish it said. Use these rules in order:
- **Insufficient data** (< 5 samples): say so. Suggest a longer window or longer wait. Stop.
- **Reboot mid-window**: if any `connection_lost` event is present AND `free_heap` jumped UP at that timestamp, the device rebooted. Note it; pre-reboot trend may be a leak but you only have part of the curve.
- **OOM-imminent**: any `Alloc an err=` or `out of memory` line in the log slice. This trumps everything; flag urgently.
- **Slow leak**: `slope_per_min < -50` AND `max - min > 1000` AND no reboot. The heap is monotonically (or near-monotonically) declining. Estimate time-to-zero: `min / -slope_per_min` minutes. Surface it.
- **Fragmentation suspect**: `slope_per_min` close to zero (|x| < 50) BUT min trends down across the window AND the log slice shows `Alloc an err` warnings WITHOUT total OOM. Means free total is OK but largest contiguous block is shrinking. Recommend a `DEBUG_HEAP` build to confirm.
- **Steady**: |slope_per_min| < 50, no error lines. Heap is fine.
- **Recovery curve**: slope is POSITIVE — heap recovered. Either a workload completed or GC fired. Note it; not a leak.
8. **Report**:
```text
/leakhunt window=6h field=free_heap variant=local
────────────────────────────────────────────────────
recorder : running, telem last_ts 8s ago
build : DEBUG_HEAP=ON (per-line prefix detected)
samples : 14,200 over 6h (cadence ~1.5s, log-line synth)
free_heap : min 92,344 / max 124,008 / range 31,664
slope : -82 bytes/min (negative — heap declining)
reboots : none in window
OOM events : none
error lines : 3× "Alloc an err=ESP_ERR_NO_MEM" at +4h12m, +5h08m, +5h44m
thread leaks : (DEBUG_HEAP) MeshPacket -3,124 B over 18 events
Router -1,408 B over 4 events
others -240 B
verdict : SLOW LEAK — primary suspect MeshPacket thread
est. time-to-OOM: ~1,127 min (~18.8 h) at current slope
evidence : (3 log line citations with uptimes)
```
Then: **what to do next.**
- SLOW LEAK, **DEBUG_HEAP off** → recommend rebuilding with the flag and re-running this skill. Concrete one-liner the operator can copy:
```text
mcp__meshtastic__build(env="<env>", build_flags={"DEBUG_HEAP": 1})
mcp__meshtastic__pio_flash(env="<env>", port="<port>", confirm=True)
```
After flash, set debug_log_api back on and wait one window; re-run `/leakhunt`.
- SLOW LEAK, **DEBUG_HEAP on** → cite the top-leaking thread name from step 6a. Point at the corresponding source file (`grep -rn "ThreadName(\"<name>\")" src/`); the operator decides what to fix.
- FRAGMENTATION SUSPECT → propose pre-allocating any per-packet buffers; or rebuilding with `CONFIG_HEAP_TASK_TRACKING=y` on ESP32 to see who's holding the largest blocks.
- OOM-IMMINENT → flag for immediate attention; don't wait for the next telemetry interval.
- STEADY → say so; stop. Don't invent problems.
## What NOT to do
- Don't assume a leak from a single dip. LocalStats fires every ~60 s and the firmware naturally allocates+frees on each broadcast cycle; one packet sees the trough. Look at the slope, not the deltas.
- Don't recommend code changes. This skill diagnoses; the operator decides what to fix.
- Don't enable `set_debug_log_api` automatically — if it's off, telemetry isn't reaching pubsub anyway, and the recorder will be empty. Tell the operator to flip it on and wait, then re-run.
- Don't run heavy workloads to "trigger the leak." The recorder is passive; we read what's there.
## Companion: `mark_event` for stress runs
If the operator wants to test under stimulus (e.g. blast 50 broadcasts and see what the heap does), they can frame the experiment with markers:
```text
mark_event("burst-start")
… run the workload …
mark_event("burst-end")
/leakhunt window=15m
```
The markers land in both `events.jsonl` and `logs.jsonl`, so the report can show "free_heap dipped 8 KB during the burst window, recovered to baseline within 2 LocalStats cycles" → not a leak.

View File

@@ -3,6 +3,8 @@ description: Re-run a specific test N times in isolation to triage flakes, diff
argument-hint: <test-node-id> [count=5]
---
<!-- markdownlint-disable MD029 -->
# `/repro` — flakiness triage for one test
Re-run a single pytest node ID N times in isolation, track pass rate, and surface what's _different_ in the firmware logs between the passing attempts and the failing ones. Turns "it's flaky, I guess" into "it fails when X, passes when Y."
@@ -40,6 +42,8 @@ Re-run a single pytest node ID N times in isolation, track pass rate, and surfac
Surface the top 3 differences as a "passes when / fails when" table. Don't dump full logs — pull specific lines with uptime timestamps.
5a. **Archive recorder slices per attempt** (no extra device interaction; the recorder runs autouse). Right after each attempt finishes, capture its `(start_ts, end_ts)` and call `mcp__meshtastic__recorder_export(start=<start>, end=<end>, dest_dir="mcp-server/tests/repro_artifacts/<safe-test-id>/attempt_<n>/")`. This drops a `logs.jsonl`, `telemetry.jsonl`, `packets.jsonl`, and `events.jsonl` snapshot scoped to the attempt window. Use these for cross-attempt diffs in step 5: `jq '.line' logs.jsonl` is faster than re-running the test, and the telemetry slice lets you compare heap behavior across attempts.
6. **Classify the flake** into one of:
- **LoRa airtime collision** → pass rate improves with fewer concurrent transmitters; propose a `time.sleep` gap or retry bump in the test body.
- **PKI key staleness** → fails on first attempt, passes after self-heal; existing retry loop in `test_direct_with_ack.py` handles this.

View File

@@ -7,6 +7,12 @@ __pycache__/
dist/
build/
# Persistent device-log capture (recorder + Datadog cursor).
# Cross-session JSONL streams written by the autouse Recorder singleton
# (see src/meshtastic_mcp/recorder/). Lives outside tests/ so the pytest
# fixture truncate doesn't touch it.
.mtlog/
# Test harness artifacts
tests/report.html
tests/junit.xml

View File

@@ -0,0 +1,217 @@
{
"title": "Meshtastic Firmware — Recorder Stream",
"description": "Live view of `.mtlog/` streams shipped by `mtlog_to_datadog.py`. Heap, packet volume, log levels, errors. One row per port.",
"widgets": [
{
"definition": {
"title": "Free heap (bytes)",
"type": "timeseries",
"show_legend": true,
"requests": [
{
"queries": [
{
"name": "free_heap",
"data_source": "metrics",
"query": "avg:mesh.local.heap_free_bytes{service:meshtastic-firmware} by {port}"
}
],
"response_format": "timeseries",
"display_type": "line"
}
],
"yaxis": { "label": "bytes" }
}
},
{
"definition": {
"title": "Heap slope (bytes/min) — last 1h",
"type": "query_value",
"precision": 0,
"requests": [
{
"queries": [
{
"name": "slope",
"data_source": "metrics",
"query": "derivative(avg:mesh.local.heap_free_bytes{service:meshtastic-firmware})",
"aggregator": "avg"
}
],
"response_format": "scalar"
}
],
"conditional_formats": [
{ "comparator": "<", "value": -100, "palette": "white_on_red" },
{ "comparator": "<", "value": 0, "palette": "white_on_yellow" },
{ "comparator": ">=", "value": 0, "palette": "white_on_green" }
]
}
},
{
"definition": {
"title": "Total heap (bytes)",
"type": "timeseries",
"requests": [
{
"queries": [
{
"name": "total_heap",
"data_source": "metrics",
"query": "avg:mesh.local.heap_total_bytes{service:meshtastic-firmware} by {port}"
}
],
"response_format": "timeseries",
"display_type": "line"
}
]
}
},
{
"definition": {
"title": "Battery level (%)",
"type": "timeseries",
"requests": [
{
"queries": [
{
"name": "battery",
"data_source": "metrics",
"query": "avg:mesh.device.battery_level{service:meshtastic-firmware} by {port}"
}
],
"response_format": "timeseries",
"display_type": "line"
}
],
"yaxis": { "min": "0", "max": "105" }
}
},
{
"definition": {
"title": "Air utilization (TX %)",
"type": "timeseries",
"requests": [
{
"queries": [
{
"name": "airutil",
"data_source": "metrics",
"query": "avg:mesh.device.air_util_tx{service:meshtastic-firmware} by {port}"
}
],
"response_format": "timeseries",
"display_type": "line"
}
]
}
},
{
"definition": {
"title": "Channel utilization (%)",
"type": "timeseries",
"requests": [
{
"queries": [
{
"name": "chutil",
"data_source": "metrics",
"query": "avg:mesh.device.channel_utilization{service:meshtastic-firmware} by {port}"
}
],
"response_format": "timeseries",
"display_type": "line"
}
]
}
},
{
"definition": {
"title": "Log volume by level",
"type": "timeseries",
"show_legend": true,
"requests": [
{
"response_format": "timeseries",
"display_type": "bars",
"queries": [
{
"name": "log_count",
"data_source": "logs",
"indexes": ["*"],
"compute": { "aggregation": "count" },
"search": { "query": "service:meshtastic-firmware" },
"group_by": [
{
"facet": "@level",
"limit": 10,
"sort": { "order": "desc", "aggregation": "count" }
}
]
}
]
}
]
}
},
{
"definition": {
"title": "Recent ERROR / CRIT firmware logs",
"type": "list_stream",
"requests": [
{
"response_format": "event_list",
"query": {
"data_source": "logs_stream",
"query_string": "service:meshtastic-firmware (status:error OR @level:ERROR OR @level:CRIT)",
"indexes": [],
"sort": { "column": "timestamp", "order": "desc" }
},
"columns": [
{ "field": "timestamp", "width": "auto" },
{ "field": "host", "width": "auto" },
{ "field": "@port", "width": "auto" },
{ "field": "@level", "width": "auto" },
{ "field": "@thread", "width": "auto" },
{ "field": "message", "width": "stretch" }
]
}
]
}
},
{
"definition": {
"title": "Recorder marker events",
"type": "list_stream",
"requests": [
{
"response_format": "event_list",
"query": {
"data_source": "logs_stream",
"query_string": "service:meshtastic-firmware @level:MARK",
"indexes": [],
"sort": { "column": "timestamp", "order": "desc" }
},
"columns": [
{ "field": "timestamp", "width": "auto" },
{ "field": "host", "width": "auto" },
{ "field": "message", "width": "stretch" }
]
}
]
}
}
],
"template_variables": [
{
"name": "port",
"prefix": "port",
"available_values": [],
"default": "*"
},
{ "name": "host", "prefix": "host", "available_values": [], "default": "*" }
],
"layout_type": "ordered",
"notify_list": [],
"reflow_type": "auto"
}

View File

@@ -0,0 +1,389 @@
#!/usr/bin/env python3
"""Forward selected recorder JSONL streams to Datadog.
Reads `.mtlog/logs.jsonl` and `.mtlog/telemetry.jsonl`, ships logs to the
Logs Intake API and telemetry numerics to the Metrics v2 series API.
Resumes from `.mtlog/.dd-cursor.json` so a daemon restart doesn't
duplicate rows already shipped from the current live files.
This forwarder does not currently backfill rotated `.jsonl.gz` archives.
If the recorder rotates before this process drains the live file, or the
forwarder is down across a rotation, those older rows are skipped.
Usage:
DD_API_KEY=... ./scripts/mtlog_to_datadog.py --tail
./scripts/mtlog_to_datadog.py --once # catch up + exit
./scripts/mtlog_to_datadog.py --since 3600 # backfill last hour from start
Default `DD_SITE` is `us5.datadoghq.com` — the team's Datadog instance.
Override via `DD_SITE=...` env var or `--site` flag for one-offs.
The forwarder is a separate process by design — a Datadog outage or
auth failure must not backpressure the recorder. We exit non-zero on
fatal config errors (missing API key) and keep retrying on transient
network/HTTP errors.
"""
from __future__ import annotations
import argparse
import json
import os
import socket
import sys
import time
from pathlib import Path
from typing import Any, Iterator
try:
import requests
except ImportError:
print(
"requests is required. Install it in the mcp-server venv: "
"uv pip install requests",
file=sys.stderr,
)
sys.exit(2)
_DEFAULT_LOG_DIR = Path(__file__).resolve().parents[1] / ".mtlog"
_LOG_INTAKE_TPL = "https://http-intake.logs.{site}/api/v2/logs"
_METRICS_TPL = "https://api.{site}/api/v2/series"
_LOG_BATCH = 50
_METRICS_BATCH = 100
_MAX_RETRIES = 5
_RETRY_BASE_S = 1.5
# --- streaming JSONL with byte-position cursor -------------------------
class _StreamReader:
"""Reads a single rotating JSONL with cursor-based resume.
This tails only the live `.jsonl` file. The recorder rotates files
(live `.jsonl` → `.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`), which means
the live file shrinks abruptly. We detect that via inode change OR live
size < cursor position, and reset the live-file cursor to 0.
"""
def __init__(self, path: Path, cursor: dict[str, Any]):
self.path = path
self.cursor = cursor
def _state(self) -> tuple[int, int]:
"""Return (inode, size) for the live file. (0, 0) if missing."""
try:
st = self.path.stat()
return (st.st_ino, st.st_size)
except FileNotFoundError:
return (0, 0)
def iter_new_records(self) -> Iterator[dict[str, Any]]:
ino, size = self._state()
last_ino = self.cursor.get("ino")
last_pos = int(self.cursor.get("pos") or 0)
if ino == 0:
return
if last_ino is not None and last_ino != ino:
# Rotation happened. Start over.
last_pos = 0
if last_pos > size:
# Live file truncated/shrunk under us — recorder rotated.
last_pos = 0
try:
with self.path.open("r", encoding="utf-8") as fh:
fh.seek(last_pos)
for line in fh:
line = line.rstrip("\n")
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
last_pos = fh.tell()
except FileNotFoundError:
return
self.cursor["ino"] = ino
self.cursor["pos"] = last_pos
def _load_cursor(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except (OSError, json.JSONDecodeError):
return {}
def _save_cursor(path: Path, data: dict[str, Any]) -> None:
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, separators=(",", ":")))
tmp.replace(path)
# --- Datadog clients ---------------------------------------------------
class _DDSession:
"""Pool one HTTPS session, share retry logic."""
def __init__(self, api_key: str, site: str, hostname: str) -> None:
self.api_key = api_key
self.site = site
self.hostname = hostname
self.session = requests.Session()
self.session.headers.update(
{
"DD-API-KEY": api_key,
"Content-Type": "application/json",
}
)
def _post(self, url: str, payload: Any) -> bool:
for attempt in range(_MAX_RETRIES):
try:
resp = self.session.post(url, json=payload, timeout=30)
except requests.RequestException as e:
_wait_retry(attempt, f"network error: {e}")
continue
if 200 <= resp.status_code < 300:
return True
if resp.status_code in (408, 429, 500, 502, 503, 504):
_wait_retry(
attempt,
f"HTTP {resp.status_code} retrying",
)
continue
print(
f"datadog refused: {resp.status_code} {resp.text[:200]}",
file=sys.stderr,
)
return False
return False
def send_logs(self, records: list[dict[str, Any]]) -> int:
if not records:
return 0
url = _LOG_INTAKE_TPL.format(site=self.site)
sent = 0
for i in range(0, len(records), _LOG_BATCH):
batch = records[i : i + _LOG_BATCH]
if self._post(url, batch):
sent += len(batch)
return sent
def send_metrics(self, series: list[dict[str, Any]]) -> int:
if not series:
return 0
url = _METRICS_TPL.format(site=self.site)
sent = 0
for i in range(0, len(series), _METRICS_BATCH):
batch = series[i : i + _METRICS_BATCH]
if self._post(url, {"series": batch}):
sent += len(batch)
return sent
def _wait_retry(attempt: int, reason: str) -> None:
wait = _RETRY_BASE_S * (2**attempt)
print(
f" retry {attempt + 1}/{_MAX_RETRIES} in {wait:.1f}s ({reason})",
file=sys.stderr,
)
time.sleep(wait)
# --- record → datadog payload ------------------------------------------
def _log_record_to_dd(rec: dict[str, Any], host: str) -> dict[str, Any]:
line = rec.get("line") or ""
tags = [
f"role:{rec.get('role')}",
f"port:{rec.get('port')}",
]
level = rec.get("level")
if level:
tags.append(f"level:{level}")
tag = rec.get("tag")
if tag:
tags.append(f"thread:{tag}")
return {
"ddsource": "meshtastic-firmware",
"service": "meshtastic-firmware",
"hostname": host,
"message": line,
"ddtags": ",".join(t for t in tags if t and "None" not in t),
"timestamp": int((rec.get("ts") or time.time()) * 1000),
"level": level,
}
def _telemetry_record_to_metrics(
rec: dict[str, Any], host: str
) -> list[dict[str, Any]]:
fields = rec.get("fields") or {}
if not isinstance(fields, dict):
return []
variant = rec.get("variant") or "unknown"
ts = int(rec.get("ts") or time.time())
out: list[dict[str, Any]] = []
tags = []
if rec.get("port"):
tags.append(f"port:{rec['port']}")
if rec.get("role"):
tags.append(f"role:{rec['role']}")
if rec.get("from_node"):
tags.append(f"from_node:{rec['from_node']}")
tags.append(f"variant:{variant}")
for field, value in fields.items():
if not isinstance(value, (int, float)) or isinstance(value, bool):
continue
metric = f"mesh.{variant}.{_metric_safe(field)}"
out.append(
{
"metric": metric,
"type": 3, # GAUGE
"points": [{"timestamp": ts, "value": float(value)}],
"tags": tags,
"resources": [{"type": "host", "name": host}],
}
)
return out
def _metric_safe(name: str) -> str:
# Lowercase, replace non-alnum with underscore for safe metric names.
return "".join(c.lower() if c.isalnum() else "_" for c in name)
# --- main loop ---------------------------------------------------------
def run(
log_dir: Path,
*,
once: bool,
since_seconds: float | None,
poll_interval: float,
dd: _DDSession,
) -> int:
cursor_path = log_dir / ".dd-cursor.json"
cursors = _load_cursor(cursor_path)
# `--since` overrides cursor: rewind to (now-since) timestamp.
# We can't seek by timestamp directly (cursor is byte position), so
# we just reset cursors to 0 and let the time filter in iter_new
# drop older records.
cutoff_ts: float | None = None
if since_seconds is not None:
cursors = {}
cutoff_ts = time.time() - since_seconds
sent_total = {"logs": 0, "telemetry": 0}
while True:
# logs.jsonl → DD logs
log_cursor = cursors.setdefault("logs", {})
log_batch: list[dict[str, Any]] = []
for rec in _StreamReader(log_dir / "logs.jsonl", log_cursor).iter_new_records():
if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
continue
log_batch.append(_log_record_to_dd(rec, dd.hostname))
if log_batch:
n = dd.send_logs(log_batch)
sent_total["logs"] += n
print(f"logs: sent {n}/{len(log_batch)}")
# telemetry.jsonl → DD metrics
telem_cursor = cursors.setdefault("telemetry", {})
metric_series: list[dict[str, Any]] = []
for rec in _StreamReader(
log_dir / "telemetry.jsonl", telem_cursor
).iter_new_records():
if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
continue
metric_series.extend(_telemetry_record_to_metrics(rec, dd.hostname))
if metric_series:
n = dd.send_metrics(metric_series)
sent_total["telemetry"] += n
print(f"telemetry: sent {n}/{len(metric_series)} metric points")
_save_cursor(cursor_path, cursors)
if once:
print(f"done. logs={sent_total['logs']} metrics={sent_total['telemetry']}")
return 0
time.sleep(poll_interval)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--log-dir",
default=str(_DEFAULT_LOG_DIR),
help="Path to .mtlog/ (default: mcp-server/.mtlog)",
)
mode = parser.add_mutually_exclusive_group()
mode.add_argument("--once", action="store_true", help="Catch up then exit")
mode.add_argument(
"--tail",
action="store_true",
help="Daemon: poll forever (default)",
)
parser.add_argument(
"--since",
type=float,
default=None,
help="Backfill last N seconds. Resets cursor.",
)
parser.add_argument(
"--poll-interval",
type=float,
default=5.0,
help="Seconds between tail polls (default 5)",
)
parser.add_argument(
"--site",
default=os.environ.get("DD_SITE", "us5.datadoghq.com"),
help=(
"Datadog site. Default is the team's instance (us5.datadoghq.com). "
"Override via DD_SITE env var or this flag."
),
)
parser.add_argument(
"--host",
default=socket.gethostname(),
help="Hostname tag (default: socket.gethostname())",
)
args = parser.parse_args(argv)
api_key = os.environ.get("DD_API_KEY")
if not api_key:
print("DD_API_KEY env var required.", file=sys.stderr)
return 2
log_dir = Path(args.log_dir)
if not log_dir.exists():
print(
f"log dir {log_dir} does not exist — start the mcp-server first.",
file=sys.stderr,
)
return 2
dd = _DDSession(api_key=api_key, site=args.site, hostname=args.host)
once = args.once and not args.tail
return run(
log_dir,
once=once,
since_seconds=args.since,
poll_interval=args.poll_interval,
dd=dd,
)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -108,18 +108,33 @@ def build(
env: str,
with_manifest: bool = True,
userprefs_overrides: dict[str, Any] | None = None,
build_flags: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Run `pio run -e <env>` and return artifact paths.
`userprefs_overrides` (optional): dict of `USERPREFS_<KEY>: value` to inject
into userPrefs.jsonc for this build only. File is restored byte-for-byte
on exit. Use `userprefs_set()` for persistent changes.
`build_flags` (optional): dict of `-D<NAME>=<VALUE>` macros to set for
this build only via `PLATFORMIO_BUILD_FLAGS`. Common useful flag:
`{"DEBUG_HEAP": 1}` enables per-thread leak detection + `[heap N]`
prefix on every log line. Combines with the recorder so heap shows
up at log cadence (much higher resolution than the ~60 s LocalStats
packet) — see `recorder/parsers.py:_HEAP_PREFIX_RE`. Bool values
expand to bare `-D<NAME>` (presence-only flags).
"""
args = ["run", "-e", env]
if with_manifest:
args.extend(["-t", "mtjson"])
extra_env = _build_flags_env(build_flags) if build_flags else None
with userprefs.temporary_overrides(userprefs_overrides) as effective:
result = pio.run(args, timeout=pio.TIMEOUT_BUILD, check=False)
result = pio.run(
args,
timeout=pio.TIMEOUT_BUILD,
check=False,
extra_env=extra_env,
)
return {
"exit_code": result.returncode,
"artifacts": [str(p) for p in _artifacts_for(env)],
@@ -127,9 +142,27 @@ def build(
"stderr_tail": pio.tail_lines(result.stderr, 200),
"duration_s": round(result.duration_s, 2),
"userprefs": _userprefs_summary(effective),
"build_flags": dict(build_flags) if build_flags else None,
}
def _build_flags_env(build_flags: dict[str, Any]) -> dict[str, str]:
"""Translate `{"DEBUG_HEAP": 1, "FOO": "bar"}` → `{"PLATFORMIO_BUILD_FLAGS":
"-DDEBUG_HEAP=1 -DFOO=bar"}`. Bool True → bare `-D<NAME>`; False/None drop
the flag entirely. Other types stringify."""
parts: list[str] = []
for key, value in build_flags.items():
if value is False or value is None:
continue
if value is True:
parts.append(f"-D{key}")
else:
parts.append(f"-D{key}={value}")
if not parts:
return {}
return {"PLATFORMIO_BUILD_FLAGS": " ".join(parts)}
def clean(env: str) -> dict[str, Any]:
"""Run `pio run -e <env> -t clean`."""
result = pio.run(["run", "-e", env, "-t", "clean"], timeout=120, check=False)
@@ -146,20 +179,29 @@ def flash(
port: str,
confirm: bool = False,
userprefs_overrides: dict[str, Any] | None = None,
build_flags: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""`pio run -e <env> -t upload --upload-port <port>`. All architectures.
`userprefs_overrides` (optional): see `build()` — the rebuild-before-upload
that pio performs will pick up the injected values.
`build_flags` (optional): same shape as `build()` — `PLATFORMIO_BUILD_FLAGS`
is exported for the rebuild-before-upload, so the uploaded firmware
actually carries the flags. Without this propagation, `pio run -t upload`
would relink without the env var and silently drop them. Common use:
`build_flags={"DEBUG_HEAP": 1}` for the leak-hunt path.
"""
_require_confirm(confirm, "flash")
_reject_native_env(env, "flash")
connection.reject_if_tcp(port, "flash")
extra_env = _build_flags_env(build_flags) if build_flags else None
with userprefs.temporary_overrides(userprefs_overrides) as effective:
result = pio.run(
["run", "-e", env, "-t", "upload", "--upload-port", port],
timeout=pio.TIMEOUT_UPLOAD,
check=False,
extra_env=extra_env,
)
return {
"exit_code": result.returncode,
@@ -167,6 +209,7 @@ def flash(
"stderr_tail": pio.tail_lines(result.stderr, 200),
"duration_s": round(result.duration_s, 2),
"userprefs": _userprefs_summary(effective),
"build_flags": dict(build_flags) if build_flags else None,
}

View File

@@ -0,0 +1,410 @@
"""Read-side queries over the recorder's JSONL streams.
Pure functions over `mcp-server/.mtlog/`. Streaming JSONL reader: never
loads a whole file. Time-bound queries short-circuit as soon as `ts`
exceeds the requested end. The recorder writes monotonically, so a
forward scan is cheap; we don't need an index.
All time arguments accept:
- epoch seconds (int/float)
- relative strings: "-15m", "-2h", "-3d", "now"
- ISO-ish absolute strings: "2026-05-07T14:30:00" (naive timestamps are
treated as UTC)
Tools that return data ALWAYS cap their output (max_lines / max_points
/ max), and report whether more matched than was returned.
"""
from __future__ import annotations
import gzip
import json
import re
import statistics
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator
from .recorder.recorder import get_recorder
_REL_RE = re.compile(r"^\s*-\s*(\d+(?:\.\d+)?)\s*([smhd])\s*$")
_REGEX_PREVIEW_MAX = 100
_REGEX_PREVIEW_TRUNCATE = 97
def _parse_time(value: Any, *, now: float | None = None) -> float:
"""Coerce to epoch seconds. Defaults `now` to `time.time()`."""
if value is None:
return time.time()
if isinstance(value, (int, float)):
return float(value)
if not isinstance(value, str):
raise ValueError(f"invalid time: {value!r}")
s = value.strip().lower()
if s in ("", "now"):
return time.time() if now is None else now
m = _REL_RE.match(s)
if m:
n = float(m.group(1))
unit = m.group(2)
secs = n * {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit]
base = time.time() if now is None else now
return base - secs
# Try ISO 8601. Accept naive (assume UTC) and Z-suffixed.
try:
if s.endswith("z"):
s = s[:-1] + "+00:00"
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.timestamp()
except ValueError as e:
raise ValueError(f"unparseable time: {value!r}") from e
def _iter_jsonl(path: Path, *, since: float, until: float) -> Iterator[dict[str, Any]]:
"""Stream records in chronological order: rotated archives first
(oldest → newest by lex sort, which is chronological for our
`YYYYMMDD-HHMMSS-uuuuuu-NNNNN` archive naming), then the live file
last. The "keep last N" pop-front logic in the window queries
relies on records arriving in time order across files.
"""
files: list[Path] = []
# Gzipped archives are named "<stem>.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz".
for archive in sorted(path.parent.glob(f"{path.stem}.*.jsonl.gz")):
files.append(archive)
if path.exists():
files.append(path)
for f in files:
opener = gzip.open if f.suffix == ".gz" else open
try:
with opener(f, "rt", encoding="utf-8") as fh: # type: ignore[arg-type]
for line in fh:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
except json.JSONDecodeError:
continue
ts = rec.get("ts")
if not isinstance(ts, (int, float)):
continue
if ts < since:
continue
if ts > until:
# Records are append-monotonic within a file, so
# the rest of this file is also past `until`.
# Archives can still overlap each other, so only
# short-circuit this file, not the whole scan.
break
yield rec
except (FileNotFoundError, OSError):
continue
# -- queries ------------------------------------------------------------
def logs_window(
start: Any = "-15m",
end: Any = "now",
*,
grep: str | None = None,
level: str | None = None,
tag: str | None = None,
port: str | None = None,
max_lines: int = 200,
) -> dict[str, Any]:
"""Recent firmware log lines, filtered.
`level` accepts a single level name or pipe-separated set
("WARN|ERROR|CRIT"). `grep` is a regex (Python re) over the raw
`line` field. Returns the last `max_lines` matches.
"""
s = _parse_time(start)
e = _parse_time(end)
levels = _split_set(level)
if grep:
try:
grep_re = re.compile(grep)
except re.error as exc:
preview = (
grep
if len(grep) <= _REGEX_PREVIEW_MAX
else f"{grep[:_REGEX_PREVIEW_TRUNCATE]}..."
)
raise ValueError(f"invalid grep regex {preview!r}: {exc}") from exc
else:
grep_re = None
base = get_recorder().base_dir
matched = 0
out: list[dict[str, Any]] = []
for rec in _iter_jsonl(base / "logs.jsonl", since=s, until=e):
if levels and rec.get("level") not in levels:
continue
if tag and rec.get("tag") != tag:
continue
if port and rec.get("port") != port:
continue
if grep_re and not grep_re.search(rec.get("line") or ""):
continue
matched += 1
out.append(rec)
if len(out) > max_lines:
out.pop(0) # keep the most recent N
return {
"lines": out,
"total_matched": matched,
"dropped": max(0, matched - max_lines),
"window": {"start": s, "end": e},
}
def telemetry_timeline(
window: Any = "1h",
*,
variant: str = "local",
field: str = "free_heap",
port: str | None = None,
max_points: int = 200,
) -> dict[str, Any]:
"""Timeseries of one telemetry field, downsampled.
`field` matches both the protobuf snake_case name (`free_heap`,
`heap_free_bytes`, `battery_level`) and camelCase (`freeHeap`).
Server-side bucket-mean downsamples to ≤ `max_points`. Returns
`slope_per_min` (linear regression slope, units/min) so a leak
detector can read one number.
"""
end = time.time()
if isinstance(window, (int, float)):
# Numeric `window` is a duration in seconds — "last N seconds".
# Without this branch, `_parse_time(-N)` would treat -N as an
# absolute epoch timestamp (i.e., Jan 1 1970 minus N seconds),
# producing a wildly negative `start` and matching nothing.
start = end - float(window)
elif isinstance(window, str) and not window.startswith("-"):
# Bare string like "1h" is sugar for "-1h".
start = _parse_time(f"-{window}", now=end)
else:
start = _parse_time(window, now=end)
base = get_recorder().base_dir
raw: list[tuple[float, float]] = []
field_aliases = _field_aliases(field)
for rec in _iter_jsonl(base / "telemetry.jsonl", since=start, until=end):
if rec.get("variant") != variant:
continue
if port and rec.get("port") != port:
continue
fields = rec.get("fields") or {}
value: Any = None
for alias in field_aliases:
if alias in fields:
value = fields[alias]
break
if not isinstance(value, (int, float)):
continue
raw.append((float(rec["ts"]), float(value)))
if not raw:
return {
"points": [],
"samples": 0,
"min": None,
"max": None,
"slope_per_min": None,
"window": {"start": start, "end": end, "variant": variant, "field": field},
}
points = _downsample(raw, max_points=max_points)
values = [v for _, v in raw]
return {
"points": [{"ts": ts, "value": v} for ts, v in points],
"samples": len(raw),
"min": min(values),
"max": max(values),
"slope_per_min": _slope_per_min(raw),
"window": {"start": start, "end": end, "variant": variant, "field": field},
}
def packets_window(
start: Any = "-5m",
end: Any = "now",
*,
portnum: str | None = None,
from_node: str | None = None,
to_node: str | None = None,
max: int = 200,
) -> dict[str, Any]:
s = _parse_time(start)
e = _parse_time(end)
portnums = _split_set(portnum)
base = get_recorder().base_dir
matched = 0
out: list[dict[str, Any]] = []
for rec in _iter_jsonl(base / "packets.jsonl", since=s, until=e):
if portnums and rec.get("portnum") not in portnums:
continue
if from_node and str(rec.get("from_node")) != str(from_node):
continue
if to_node and str(rec.get("to_node")) != str(to_node):
continue
matched += 1
out.append(rec)
if len(out) > max:
out.pop(0)
return {
"packets": out,
"total_matched": matched,
"dropped": matched - max if matched > max else 0,
"window": {"start": s, "end": e},
}
def events_window(
start: Any = "-1h",
end: Any = "now",
*,
kind: str | None = None,
max: int = 200,
) -> dict[str, Any]:
s = _parse_time(start)
e = _parse_time(end)
kinds = _split_set(kind)
base = get_recorder().base_dir
matched = 0
out: list[dict[str, Any]] = []
for rec in _iter_jsonl(base / "events.jsonl", since=s, until=e):
if kinds and rec.get("kind") not in kinds:
continue
matched += 1
out.append(rec)
if len(out) > max:
out.pop(0)
return {
"events": out,
"total_matched": matched,
"dropped": matched - max if matched > max else 0,
"window": {"start": s, "end": e},
}
def export(
start: Any,
end: Any,
dest_dir: str,
*,
streams: list[str] | None = None,
) -> dict[str, Any]:
"""Bundle a slice of each requested stream into `dest_dir`.
For a notebook, a bug report, or a Datadog backfill. Output files
are uncompressed JSONL (callers gzip themselves if they want to).
"""
s = _parse_time(start)
e = _parse_time(end)
selected = streams or ["logs", "telemetry", "packets", "events"]
dest = Path(dest_dir)
dest.mkdir(parents=True, exist_ok=True)
base = get_recorder().base_dir
paths: dict[str, str] = {}
for stream in selected:
src = base / f"{stream}.jsonl"
if not src.exists() and not list(base.glob(f"{stream}.*.jsonl.gz")):
continue
out_path = dest / f"{stream}.jsonl"
n = 0
with out_path.open("w", encoding="utf-8") as fh:
for rec in _iter_jsonl(src, since=s, until=e):
fh.write(json.dumps(rec, separators=(",", ":")) + "\n")
n += 1
paths[stream] = str(out_path)
paths[f"{stream}_count"] = str(n)
return {"dest_dir": str(dest), "paths": paths, "window": {"start": s, "end": e}}
# -- helpers ------------------------------------------------------------
def _split_set(value: str | None) -> set[str] | None:
if not value:
return None
return {v.strip() for v in value.split("|") if v.strip()}
def _field_aliases(field: str) -> list[str]:
"""Accept snake_case OR camelCase, plus a few legacy aliases."""
snake = field
camel = _snake_to_camel(field)
aliases = {snake, camel}
# Old protobuf fields (pre-LocalStats) used different names
legacy = {
"free_heap": ["free_heap", "freeHeap", "heap_free_bytes", "heapFreeBytes"],
"heap_free_bytes": [
"heap_free_bytes",
"heapFreeBytes",
"free_heap",
"freeHeap",
],
"total_heap": ["total_heap", "totalHeap", "heap_total_bytes", "heapTotalBytes"],
"heap_total_bytes": [
"heap_total_bytes",
"heapTotalBytes",
"total_heap",
"totalHeap",
],
}
if field in legacy:
aliases.update(legacy[field])
return list(aliases)
def _snake_to_camel(name: str) -> str:
parts = name.split("_")
return parts[0] + "".join(p.title() for p in parts[1:])
def _downsample(
points: list[tuple[float, float]], *, max_points: int
) -> list[tuple[float, float]]:
if len(points) <= max_points:
return points
# Even-bucket mean. Preserves shape better than nth-sample picking.
n = len(points)
bucket = n / max_points
out: list[tuple[float, float]] = []
i = 0
for k in range(max_points):
end = int((k + 1) * bucket)
end = min(end, n)
if end <= i:
continue
chunk = points[i:end]
ts = chunk[len(chunk) // 2][0]
val = statistics.fmean(v for _, v in chunk)
out.append((ts, val))
i = end
return out
def _slope_per_min(points: list[tuple[float, float]]) -> float | None:
"""Least-squares slope (units per minute). None if too few points."""
if len(points) < 2:
return None
xs = [t for t, _ in points]
ys = [v for _, v in points]
n = len(xs)
mean_x = sum(xs) / n
mean_y = sum(ys) / n
num = sum((xs[i] - mean_x) * (ys[i] - mean_y) for i in range(n))
den = sum((x - mean_x) ** 2 for x in xs)
if den == 0:
return None
slope_per_sec = num / den
return slope_per_sec * 60.0

View File

@@ -92,6 +92,7 @@ def _run_capturing(
cwd: Path | None = None,
timeout: float | None = None,
tee_header: str | None = None,
extra_env: dict[str, str] | None = None,
) -> tuple[int, str, str, float]:
"""Run a subprocess, capture stdout+stderr, optionally tee to the flash log.
@@ -99,6 +100,9 @@ def _run_capturing(
`subprocess.TimeoutExpired` on timeout (callers map this to their own
domain-specific error).
`extra_env` merges into the subprocess environment (parent env stays
intact). Used for `PLATFORMIO_BUILD_FLAGS=-DDEBUG_HEAP=1` and similar.
Fast path: `subprocess.run(capture_output=True)` when no flash log is
configured (unchanged behavior).
@@ -110,6 +114,9 @@ def _run_capturing(
"""
log_path = _flash_log_path()
t0 = time.monotonic()
env = None
if extra_env:
env = {**os.environ, **extra_env}
if log_path is None:
# Fast path — unchanged.
@@ -119,6 +126,7 @@ def _run_capturing(
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return (
proc.returncode,
@@ -145,6 +153,7 @@ def _run_capturing(
stderr=subprocess.PIPE,
text=True,
bufsize=1, # line-buffered
env=env,
)
stdout_chunks: list[str] = []
stderr_chunks: list[str] = []
@@ -232,12 +241,17 @@ def run(
cwd: Path | None = None,
timeout: float | None = TIMEOUT_DEFAULT,
check: bool = True,
extra_env: dict[str, str] | None = None,
) -> PioResult:
"""Invoke `pio <args>` and return captured output.
`cwd` defaults to the firmware root. `check=True` raises `PioError` on
non-zero exit; set `check=False` to inspect `returncode` manually.
`extra_env` merges into the subprocess environment — used for
`PLATFORMIO_BUILD_FLAGS=-DDEBUG_HEAP=1` and similar build-time
toggles that can't be expressed as command-line args.
If `MESHTASTIC_MCP_FLASH_LOG` is set, output is also tee'd to that file
line-by-line as it arrives (for live flash progress in the TUI).
"""
@@ -250,6 +264,7 @@ def run(
cwd=work_dir,
timeout=timeout,
tee_header=f"pio {' '.join(args)}",
extra_env=extra_env,
)
except subprocess.TimeoutExpired as exc:
raise PioTimeout(f"pio {' '.join(args)} timed out after {timeout}s") from exc

View File

@@ -0,0 +1,19 @@
"""Persistent device-log capture.
Singleton `Recorder` subscribes once to the meshtastic pubsub fan-out
(`meshtastic.log.line`, `meshtastic.receive.*`, `meshtastic.connection.*`)
and appends to four JSONL files under `mcp-server/.mtlog/`. Pubsub is
process-global so a single subscription captures every active interface
(serial / TCP / BLE) without any per-connection bookkeeping.
The recorder is opt-in-by-import: importing this package is a no-op; call
`get_recorder().start()` (which `server.py` does at FastMCP app init) to
begin writing. `pause()` / `resume()` exist for the rare case the user
wants a clean stretch of file (e.g. capturing a known-good baseline).
"""
from __future__ import annotations
from .recorder import Recorder, get_recorder
__all__ = ["Recorder", "get_recorder"]

View File

@@ -0,0 +1,309 @@
"""Best-effort parsers for log lines and telemetry packets.
Two flavors of log line cross our pubsub subscription:
1. Text-mode path (debug_log_api disabled): the meshtastic Python lib
accumulates bytes between protobuf frames and emits the full
firmware-formatted line, e.g.
"INFO | 12:34:56 12345 [Main] Booting"
— level, HH:MM:SS, uptime seconds, thread bracket, then message.
2. LogRecord protobuf path (debug_log_api enabled): the lib calls
`_handleLogLine(record.message)` with ONLY the message body. The
level/source/time fields on the LogRecord are dropped before
pubsub fan-out. We get e.g. just "Booting".
Both arrive on `meshtastic.log.line`. The parser tries to recover a
level + thread when the prefix is present and falls back to level=None
otherwise. Consumers who want level filtering on protobuf-mode hosts
should grep the raw `line` field instead.
Telemetry: `meshtastic.receive.telemetry` packets carry one of several
metric variants in `packet["decoded"]["telemetry"]`. We flatten the
chosen variant into a {field: value} dict so callers don't have to
know the protobuf shape.
"""
from __future__ import annotations
import re
from typing import Any
# Match: LEVEL | HH:MM:SS UPTIME [Thread] message
# HH:MM:SS may be ??:??:?? when RTC isn't valid. The level alternation
# below is the canonical list — DebugConfiguration.h's MESHTASTIC_LOG_LEVEL_*
# macros must stay in sync with these strings.
_LINE_RE = re.compile(
r"""
^
(?P<level>DEBUG|INFO\ |WARN\ |ERROR|CRIT\ |TRACE|HEAP\ )
\s*\|\s*
(?P<clock>(?:\d{2}:\d{2}:\d{2})|(?:\?{2}:\?{2}:\?{2}))
\s+
(?P<uptime>\d+)
\s+
(?:\[(?P<thread>[^\]]+)\]\s+)?
(?P<msg>.*)
$
""",
re.VERBOSE,
)
# DEBUG_HEAP build prepends `[heap N] ` to every message body, AFTER the
# thread bracket. See src/RedirectablePrint.cpp:175.
_HEAP_PREFIX_RE = re.compile(r"^\[heap\s+(?P<heap>\d+)\]\s+(?P<rest>.*)$")
# OSThread leak/free detection. See src/concurrency/OSThread.cpp:89-91.
# Format: "------ Thread NAME leaked heap A -> B (delta) ------"
# "++++++ Thread NAME freed heap A -> B (delta) ++++++"
_THREAD_HEAP_RE = re.compile(
r"""
^[\-+]+\s*
Thread\s+(?P<thread>\S+)\s+
(?P<kind>leaked|freed)\s+heap\s+
(?P<before>-?\d+)\s*->\s*(?P<after>-?\d+)\s+
\((?P<delta>-?\d+)\)
""",
re.VERBOSE,
)
# Power.cpp:908 periodic heap status (DEBUG_HEAP only).
# Format: "Heap status: FREE/TOTAL bytes free (DELTA), running R/N threads"
_HEAP_STATUS_RE = re.compile(
r"""
Heap\s+status:\s+
(?P<free>\d+)\s*/\s*(?P<total>\d+)\s+bytes\s+free
(?:\s+\((?P<delta>-?\d+)\))?
""",
re.VERBOSE,
)
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
_HEAP_BRACKET_RE = re.compile(r"^heap\s+(?P<heap>\d+)$")
def parse_log_line(line: str) -> dict[str, Any]:
"""Best-effort decompose a raw firmware log line.
Returns a dict with at least `line` (the original, unmodified — ANSI
codes preserved for fidelity). Adds `level`, `tag`, `clock`,
`uptime_s`, and `msg` when the full prefix is present.
Handles two firmware quirks:
- LogRecord.message can carry ANSI color escapes from RedirectablePrint
(the BLE/StreamAPI path inherited the colored body in some builds).
We strip ANSI before regex matching so the prefix survives.
- DEBUG_HEAP injects `[heap N]` after the thread bracket. When NO
thread name is set, the heap takes the thread bracket position —
looks like `[heap 12345] msg`. We detect that shape and move it
out of `tag` and into `heap_free`.
DEBUG_HEAP-build extras (when `[heap N]` is injected): `heap_free`
(bytes), and when a `Thread X leaked|freed heap` line is recognized,
`heap_event` = {kind, thread, before, after, delta}.
Never raises.
"""
out: dict[str, Any] = {"line": line}
if not line:
return out
# Strip ANSI escapes BEFORE any regex matching. The original `line`
# stays in `out["line"]` for fidelity / future grep.
clean = _ANSI_RE.sub("", line)
m = _LINE_RE.match(clean)
msg: str | None = None
if m:
level = m.group("level").rstrip()
out["level"] = level
out["clock"] = m.group("clock")
try:
out["uptime_s"] = int(m.group("uptime"))
except (TypeError, ValueError):
out["uptime_s"] = None
thread = m.group("thread")
if thread:
# If "thread" is actually the heap prefix taking the bracket
# position (DEBUG_HEAP build, no thread set), capture heap
# and leave tag unset.
hb = _HEAP_BRACKET_RE.match(thread.strip())
if hb:
try:
out["heap_free"] = int(hb.group("heap"))
except (TypeError, ValueError):
pass
else:
out["tag"] = thread
msg = m.group("msg")
out["msg"] = msg
else:
# No prefix — bare LogRecord.message body. Inspect the whole
# line for DEBUG_HEAP-style content; the heap-prefix and
# thread-leak patterns can survive on either path.
msg = clean
# DEBUG_HEAP per-line heap prefix: `[heap 92344] message`.
# Sits AFTER the thread bracket and BEFORE the message body, but
# for bare LogRecord lines it's at the start. Match it at the
# head of `msg`.
if msg:
hp = _HEAP_PREFIX_RE.match(msg)
if hp:
try:
out["heap_free"] = int(hp.group("heap"))
except (TypeError, ValueError):
pass
else:
# Strip the prefix from `msg` so a grep on the message
# body doesn't have to know about it.
out["msg"] = hp.group("rest")
msg = hp.group("rest")
# Thread-level leak/free detection.
thr = _THREAD_HEAP_RE.search(msg)
if thr:
try:
out["heap_event"] = {
"kind": thr.group("kind"),
"thread": thr.group("thread"),
"before": int(thr.group("before")),
"after": int(thr.group("after")),
"delta": int(thr.group("delta")),
}
except (TypeError, ValueError):
pass
# Power.cpp periodic "Heap status: F/T bytes free (D), running ..."
hs = _HEAP_STATUS_RE.search(msg)
if hs:
try:
out["heap_free"] = int(hs.group("free"))
out["heap_total"] = int(hs.group("total"))
if hs.group("delta") is not None:
out["heap_delta"] = int(hs.group("delta"))
except (TypeError, ValueError):
pass
return out
# -- Telemetry ----------------------------------------------------------
# Order matters: meshtastic-python decoded packets use the protobuf
# `oneof variant` field name (snake_case) as the dict key.
_TELEMETRY_VARIANTS = (
("device_metrics", "device"),
("local_stats", "local"),
("environment_metrics", "environment"),
("power_metrics", "power"),
("air_quality_metrics", "airQuality"),
("health_metrics", "health"),
("host_metrics", "host"),
)
def extract_telemetry(packet: dict[str, Any]) -> dict[str, Any] | None:
"""Pull the telemetry variant + flat fields out of a `meshtastic.receive.telemetry`
packet. Returns None when the shape isn't what we expect — so the
caller can fall back to a generic packets.jsonl row.
"""
if not isinstance(packet, dict):
return None
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return None
telem = decoded.get("telemetry")
if not isinstance(telem, dict):
return None
# The Python lib produces dict-of-camelCase keys via MessageToDict.
# Try both camelCase and snake_case to be robust to lib version drift.
for snake, label in _TELEMETRY_VARIANTS:
camel = _snake_to_camel(snake)
for key in (snake, camel):
value = telem.get(key)
if isinstance(value, dict):
return {
"variant": label,
"fields": {k: _scalarize(v) for k, v in value.items()},
"time": telem.get("time"),
}
return None
def _snake_to_camel(name: str) -> str:
parts = name.split("_")
return parts[0] + "".join(p.title() for p in parts[1:])
def _scalarize(value: Any) -> Any:
"""Keep telemetry fields JSON-friendly. Lists/dicts pass through
untouched; bytes -> hex string; protobuf enums occasionally arrive
as ints (fine) or strings (also fine)."""
if isinstance(value, (bytes, bytearray, memoryview)):
return bytes(value).hex()
return value
# -- Generic packet summary ---------------------------------------------
def summarize_packet(
packet: dict[str, Any], *, payload_hex_len: int = 64
) -> dict[str, Any]:
"""Reduce a packet dict to a stable, queryable summary. Drops the
full payload bytes — the recorder records summaries, not pcaps.
"""
if not isinstance(packet, dict):
return {"raw_type": type(packet).__name__}
decoded = packet.get("decoded") if isinstance(packet.get("decoded"), dict) else {}
portnum = decoded.get("portnum") if isinstance(decoded, dict) else None
payload = decoded.get("payload") if isinstance(decoded, dict) else None
payload_hex = None
payload_size = None
if isinstance(payload, (bytes, bytearray, memoryview)):
b = bytes(payload)
payload_size = len(b)
payload_hex = b[:payload_hex_len].hex() if b else ""
elif isinstance(payload, str):
# Some decoded payloads (text messages) come as decoded strings.
payload_size = len(payload)
payload_hex = None # not bytes
return {
"from_node": packet.get("fromId") or packet.get("from"),
"to_node": packet.get("toId") or packet.get("to"),
"portnum": portnum,
"hop_limit": packet.get("hopLimit"),
"want_ack": packet.get("wantAck"),
"rx_rssi": packet.get("rxRssi"),
"rx_snr": packet.get("rxSnr"),
"channel": packet.get("channel"),
"id": packet.get("id"),
"payload_size": payload_size,
"payload_hex_prefix": payload_hex,
}
# -- Interface identification ------------------------------------------
def interface_label(interface: Any) -> dict[str, Any]:
"""Stable identifier for the meshtastic interface that emitted an event.
Used as the `port`/`role` tag on every recorded row. SerialInterface
has `devPath`; TCPInterface has `hostname`+`portNumber`; BLEInterface
has `address`. Falls back to the class name when none of those exist.
"""
if interface is None:
return {"port": None, "role": None}
dev_path = getattr(interface, "devPath", None)
if dev_path:
return {"port": str(dev_path), "role": "serial"}
hostname = getattr(interface, "hostname", None)
if hostname:
port_num = getattr(interface, "portNumber", None)
endpoint = f"tcp://{hostname}:{port_num}" if port_num else f"tcp://{hostname}"
return {"port": endpoint, "role": "tcp"}
address = getattr(interface, "address", None)
if address:
return {"port": str(address), "role": "ble"}
return {"port": type(interface).__name__, "role": None}

View File

@@ -0,0 +1,467 @@
"""Process-global recorder singleton.
Subscribes once to the meshtastic pubsub fan-out and writes four append-only
JSONL streams under `mcp-server/.mtlog/`. The pubsub fan-out is
process-global — a single subscription captures every active interface
without per-connection bookkeeping.
Files:
logs.jsonl — every `meshtastic.log.line` event (best-effort prefix
parsed for level/tag/uptime; raw `line` always preserved)
telemetry.jsonl — `meshtastic.receive.telemetry` packets, flattened by
variant (device / local / environment / power / etc.)
packets.jsonl — every other `meshtastic.receive.*` packet, summarized
(portnum, hops, RSSI/SNR, payload size + 64-byte hex)
events.jsonl — connection lifecycle, node-DB updates, and manual
`mark_event` rows. Lower volume; useful for aligning
timelines.
Pause/resume: `pause()` flips a flag; subscriptions stay registered. The
write methods short-circuit when paused, so we don't lose ordering when
resumed (we just have a gap). No queueing.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any
from . import parsers
from .rotating import _RotatingJsonl
_DEFAULT_DIR = Path(__file__).resolve().parents[3] / ".mtlog"
log = logging.getLogger(__name__)
class Recorder:
"""Singleton write-side of the persistent log capture system."""
def __init__(self, base_dir: Path | None = None) -> None:
self.base_dir = Path(base_dir) if base_dir else _DEFAULT_DIR
self._lock = threading.RLock()
self._started = False
self._paused = False
self._pause_reason: str | None = None
self._started_at: float | None = None
self._handlers: list[tuple[str, Any]] = []
self._files: dict[str, _RotatingJsonl] = {}
# -- lifecycle ----------------------------------------------------
def start(self) -> None:
"""Idempotent. Safe to call from FastMCP app startup."""
with self._lock:
if self._started:
return
self.base_dir.mkdir(parents=True, exist_ok=True)
self._files = {
"logs": _RotatingJsonl(self.base_dir / "logs.jsonl"),
"telemetry": _RotatingJsonl(self.base_dir / "telemetry.jsonl"),
"packets": _RotatingJsonl(self.base_dir / "packets.jsonl"),
"events": _RotatingJsonl(self.base_dir / "events.jsonl"),
}
self._wire_pubsub()
self._started = True
self._started_at = time.time()
# Write the recorder_start marker after the initialization block.
# `_write_event()` re-checks recorder state via `_files_snapshot()`,
# so keeping this out of the setup block avoids nested lifecycle work.
self._write_event(kind="recorder_start", label="recorder_started")
def stop(self) -> None:
with self._lock:
if not self._started:
return
self._unwire_pubsub()
for f in self._files.values():
f.close()
self._files = {}
self._started = False
def pause(self, reason: str | None = None) -> None:
# Write the pause marker BEFORE flipping the flag — `_write_event`
# short-circuits when paused, so the order matters for this event
# to actually land in events.jsonl.
self._write_event(
kind="recorder_pause",
label="paused",
note=reason,
)
with self._lock:
self._paused = True
self._pause_reason = reason
def resume(self) -> None:
# Mirror of `pause()`: clear the flag first, then write the marker
# so it isn't suppressed by the still-paused short-circuit.
with self._lock:
self._paused = False
self._pause_reason = None
self._write_event(kind="recorder_resume", label="resumed")
# -- pubsub wiring ------------------------------------------------
def _wire_pubsub(self) -> None:
from pubsub import pub # type: ignore[import-untyped]
# Subscribers — one per topic. Each pubsub publisher sends
# keyword args matching its handler's signature; pubsub
# introspects the function signature to route args.
bindings = [
("meshtastic.log.line", self._on_log_line),
("meshtastic.serial.line", self._on_serial_line),
("meshtastic.receive", self._on_receive),
("meshtastic.receive.telemetry", self._on_telemetry),
("meshtastic.connection.established", self._on_connection_established),
("meshtastic.connection.lost", self._on_connection_lost),
("meshtastic.node.updated", self._on_node_updated),
]
for topic, handler in bindings:
try:
pub.subscribe(handler, topic)
self._handlers.append((topic, handler))
except Exception as exc:
# If pubsub refuses one binding (signature mismatch on
# an old lib version), log it and keep the rest.
log.warning("Recorder failed to subscribe to %s: %s", topic, exc)
def _unwire_pubsub(self) -> None:
from pubsub import pub # type: ignore[import-untyped]
for topic, handler in self._handlers:
try:
pub.unsubscribe(handler, topic)
except Exception:
pass
self._handlers.clear()
# -- handlers -----------------------------------------------------
#
# Pubsub callbacks must never raise. Every handler is wrapped in a
# try/except that swallows so a bug here can't take down the
# SerialInterface receive thread.
#
# Threading: handlers fire on whatever thread the meshtastic library
# dispatches from (varies by interface), while `stop()` clears
# `self._files` under `self._lock`. We snapshot `_files` under the
# lock at the top of each handler so a concurrent stop can't
# KeyError us mid-write. The actual file write goes through
# `_RotatingJsonl` which has its own lock.
def _files_snapshot(self) -> dict[str, _RotatingJsonl] | None:
"""Atomic-ish view of `self._files`. Returns None when the recorder
is paused or stopped, so handlers can early-exit cleanly without
racing `stop()`'s clear."""
with self._lock:
if not self._started or self._paused:
return None
return dict(self._files)
def _on_log_line(self, line: str, interface: Any = None) -> None:
files = self._files_snapshot()
if files is None:
return
try:
tags = parsers.interface_label(interface)
parsed = parsers.parse_log_line(str(line))
ts = time.time()
record: dict[str, Any] = {
"ts": ts,
"port": tags["port"],
"role": tags["role"],
"level": parsed.get("level"),
"tag": parsed.get("tag"),
"uptime_s": parsed.get("uptime_s"),
"line": parsed["line"],
}
# DEBUG_HEAP enrichments (only present when the firmware
# was built with -DDEBUG_HEAP=1). Surface as first-class
# fields so logs_window can grep/filter on them and so
# heap_free synthesizes a telemetry point below.
if "heap_free" in parsed:
record["heap_free"] = parsed["heap_free"]
if "heap_total" in parsed:
record["heap_total"] = parsed["heap_total"]
if "heap_delta" in parsed:
record["heap_delta"] = parsed["heap_delta"]
heap_event = parsed.get("heap_event")
if heap_event:
record["heap_event"] = heap_event
files["logs"].write(record)
# If the line carried a heap snapshot, also write it as a
# synthesized LocalStats-shaped row so telemetry_timeline
# picks it up at log cadence (much higher resolution than
# the ~60 s LocalStats packet). Tagged source=debug_heap so
# consumers can filter if mixing scales is unwanted.
heap_free = parsed.get("heap_free")
if isinstance(heap_free, int):
fields: dict[str, Any] = {"heap_free_bytes": heap_free}
heap_total = parsed.get("heap_total")
if isinstance(heap_total, int):
fields["heap_total_bytes"] = heap_total
files["telemetry"].write(
{
"ts": ts,
"port": tags["port"],
"role": tags["role"],
"from_node": None,
"variant": "local",
"fields": fields,
"source": "debug_heap",
}
)
except Exception:
pass
def _on_serial_line(self, line: str, port: str | None = None) -> None:
"""Text-mode passive tap. Fired from `serial_session._drain` when a
`pio device monitor` subprocess is running.
Same parse + heap-synthesis path as `_on_log_line`, but receives
the raw text-formatted line (full level/clock/uptime/thread/`[heap N]`/
body). On DEBUG_HEAP builds in text mode this gives us per-log-line
heap data — far higher cadence than LocalStats, and works without
protobuf API mode (no SerialInterface required).
"""
files = self._files_snapshot()
if files is None:
return
try:
parsed = parsers.parse_log_line(str(line))
ts = time.time()
record: dict[str, Any] = {
"ts": ts,
"port": port,
"role": "serial_session",
"level": parsed.get("level"),
"tag": parsed.get("tag"),
"uptime_s": parsed.get("uptime_s"),
"line": parsed["line"],
}
if "heap_free" in parsed:
record["heap_free"] = parsed["heap_free"]
if "heap_total" in parsed:
record["heap_total"] = parsed["heap_total"]
if "heap_delta" in parsed:
record["heap_delta"] = parsed["heap_delta"]
heap_event = parsed.get("heap_event")
if heap_event:
record["heap_event"] = heap_event
files["logs"].write(record)
# Synthesize a heap_free telemetry sample whenever the line
# carries one — same logic as _on_log_line, tagged source so
# consumers can distinguish text-mode tap from protobuf path.
heap_free = parsed.get("heap_free")
if isinstance(heap_free, int):
fields: dict[str, Any] = {"heap_free_bytes": heap_free}
heap_total = parsed.get("heap_total")
if isinstance(heap_total, int):
fields["heap_total_bytes"] = heap_total
files["telemetry"].write(
{
"ts": ts,
"port": port,
"role": "serial_session",
"from_node": None,
"variant": "local",
"fields": fields,
"source": "debug_heap_serial",
}
)
except Exception:
pass
def _on_telemetry(self, packet: dict[str, Any], interface: Any = None) -> None:
files = self._files_snapshot()
if files is None:
return
try:
tags = parsers.interface_label(interface)
extracted = parsers.extract_telemetry(packet)
if extracted is None:
# Couldn't extract a known variant — fall through to the
# generic `_on_receive` path, which will still fire for
# this packet via the parent topic.
return
record = {
"ts": time.time(),
"port": tags["port"],
"role": tags["role"],
"from_node": packet.get("fromId") or packet.get("from"),
"variant": extracted["variant"],
"fields": extracted["fields"],
"device_time": extracted.get("time"),
}
files["telemetry"].write(record)
except Exception:
pass
def _on_receive(self, packet: dict[str, Any], interface: Any = None) -> None:
# Generic-receive fires for EVERY packet. Telemetry packets get
# recorded twice (here and in _on_telemetry) — that's intentional:
# packets.jsonl is the universal record, telemetry.jsonl is the
# structured timeseries view.
files = self._files_snapshot()
if files is None:
return
try:
tags = parsers.interface_label(interface)
summary = parsers.summarize_packet(packet)
record = {
"ts": time.time(),
"port": tags["port"],
"role": tags["role"],
**summary,
}
files["packets"].write(record)
except Exception:
pass
def _on_connection_established(self, interface: Any = None) -> None:
self._write_event(
kind="connection_established",
interface=interface,
)
def _on_connection_lost(self, interface: Any = None) -> None:
self._write_event(
kind="connection_lost",
interface=interface,
)
def _on_node_updated(
self, node: dict[str, Any] | None = None, interface: Any = None
) -> None:
# Lower-volume than packets but informative — node ID, hops away,
# last heard. Skip the user dict if absent.
try:
user = (node or {}).get("user") if isinstance(node, dict) else None
self._write_event(
kind="node_updated",
interface=interface,
data={
"num": (node or {}).get("num"),
"id": (user or {}).get("id"),
"short": (user or {}).get("shortName"),
"long": (user or {}).get("longName"),
"hops_away": (node or {}).get("hopsAway"),
"snr": (node or {}).get("snr"),
"last_heard": (node or {}).get("lastHeard"),
},
)
except Exception:
pass
# -- public write helpers -----------------------------------------
def mark_event(
self,
label: str,
note: str | None = None,
data: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""User-facing marker. Writes to events.jsonl AND emits a
synthetic logs.jsonl row tagged level=MARK so timelines align.
"""
ts = self._write_event(kind="mark", label=label, note=note, data=data)
# Mirror into logs so a single logs_window grep finds it.
files = self._files_snapshot()
if files is not None:
try:
files["logs"].write(
{
"ts": ts,
"port": None,
"role": "marker",
"level": "MARK",
"tag": "mark_event",
"line": f"[mark] {label}" + (f"{note}" if note else ""),
}
)
except Exception:
pass
return {"ts": ts, "label": label}
def _write_event(
self,
*,
kind: str,
label: str | None = None,
note: str | None = None,
interface: Any = None,
data: dict[str, Any] | None = None,
) -> float:
ts = time.time()
# Lifecycle markers (recorder_start, recorder_pause, recorder_resume)
# arrive at choreographed moments — `pause()` writes BEFORE flipping
# the flag and `resume()` writes AFTER clearing it, so those calls
# see _paused=False here. Other event kinds short-circuit when
# paused via the snapshot guard below.
files = self._files_snapshot()
if files is None:
return ts
try:
tags = parsers.interface_label(interface)
files["events"].write(
{
"ts": ts,
"kind": kind,
"label": label,
"note": note,
"port": tags["port"],
"role": tags["role"],
"data": data,
}
)
except Exception:
pass
return ts
# -- introspection ------------------------------------------------
def status(self) -> dict[str, Any]:
with self._lock:
return {
"running": self._started,
"paused": self._paused,
"pause_reason": self._pause_reason,
"started_at": self._started_at,
"base_dir": str(self.base_dir),
"files": {name: f.status() for name, f in self._files.items()},
}
def force_rotate_all(self) -> dict[str, Any]:
"""Test/admin hook: rotate every stream right now."""
with self._lock:
files = list(self._files.values())
for f in files:
f.force_rotate()
# `status()` re-acquires `self._lock`; release before calling it.
return self.status()
# -- module-level singleton accessor ------------------------------------
_INSTANCE_LOCK = threading.Lock()
_INSTANCE: Recorder | None = None
def get_recorder() -> Recorder:
"""Return the process-global Recorder. Created on first call.
Honors `MESHTASTIC_MCP_LOG_DIR` env var for the base directory
(used by tests to redirect to a tmpdir).
"""
global _INSTANCE
with _INSTANCE_LOCK:
if _INSTANCE is None:
override = os.environ.get("MESHTASTIC_MCP_LOG_DIR")
base = Path(override) if override else None
_INSTANCE = Recorder(base_dir=base)
return _INSTANCE

View File

@@ -0,0 +1,163 @@
"""Append-only JSONL writer with size-capped rotation.
A `_RotatingJsonl` owns one live `.jsonl` file. Writes are line-delimited
JSON objects (one row per call). When the live file exceeds `max_bytes`,
it is closed, gzipped to `<name>.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`,
and the live file resets to empty. Old archives past `keep_archives` are
unlinked oldest-first.
Size check is amortized — `os.fstat` runs every `check_every` writes,
not per-write, so the hot path stays at one `fh.write` + one `fh.flush`.
Threading: every public method acquires `self._lock`. The recorder runs
several pubsub handlers on whatever thread the meshtastic library
dispatches from (varies by interface), and queries from MCP tool calls
arrive on the FastMCP request thread, so this lock is not optional.
"""
from __future__ import annotations
import gzip
import json
import os
import shutil
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
class _RotatingJsonl:
"""Append-only JSONL with size rotation. Thread-safe."""
def __init__(
self,
path: Path,
*,
max_bytes: int = 100 * 1024 * 1024,
keep_archives: int = 5,
check_every: int = 1000,
) -> None:
self.path = path
self.max_bytes = max_bytes
self.keep_archives = keep_archives
self.check_every = check_every
self._lock = threading.Lock()
self._fh: Any = None
self._writes_since_check = 0
self._rotations = 0
self._lines_written = 0
self._last_ts: float | None = None
self._open()
# -- lifecycle ----------------------------------------------------
def _open(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
self._fh = self.path.open("a", encoding="utf-8")
def close(self) -> None:
with self._lock:
if self._fh is not None:
try:
self._fh.close()
finally:
self._fh = None
# -- write --------------------------------------------------------
def write(self, record: dict[str, Any]) -> None:
"""Append one JSON object as a line. Triggers rotation if oversized."""
line = json.dumps(record, separators=(",", ":"), default=str) + "\n"
with self._lock:
if self._fh is None:
return
try:
self._fh.write(line)
self._fh.flush()
except Exception:
# Best-effort: a failed write must not crash the pubsub
# handler. Caller has no way to react anyway.
return
self._lines_written += 1
ts = record.get("ts")
if isinstance(ts, (int, float)):
self._last_ts = float(ts)
self._writes_since_check += 1
if self._writes_since_check >= self.check_every:
self._writes_since_check = 0
self._maybe_rotate()
# -- rotation -----------------------------------------------------
def _maybe_rotate(self) -> None:
# Caller holds self._lock.
try:
size = os.fstat(self._fh.fileno()).st_size
except OSError:
return
if size < self.max_bytes:
return
self._rotate_locked()
def _rotate_locked(self) -> None:
# Close, gzip-rename, reopen empty, prune oldest archives.
try:
self._fh.close()
except Exception:
pass
self._fh = None
# Microsecond-resolution timestamp + per-instance counter so back-
# to-back rotations (small max_bytes, repeated `force_rotate()`,
# or chatty test loops) get unique archive filenames. The lex
# sort order of `YYYYMMDD-HHMMSS-uuuuuu-NNNNN` is chronological,
# which `_prune_archives()` and `log_query._iter_jsonl()` both
# rely on.
stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f")
archive = self.path.with_suffix(f".{stamp}-{self._rotations:05d}.jsonl.gz")
try:
with self.path.open("rb") as src, gzip.open(archive, "wb") as dst:
shutil.copyfileobj(src, dst, length=1024 * 1024)
self.path.unlink()
except Exception:
# Rotation is best-effort. If gzip fails, leave the file
# in place and re-open it; we'll try again next check.
pass
self._open()
self._rotations += 1
self._prune_archives()
def _prune_archives(self) -> None:
# Match siblings of self.path.name with `.jsonl.gz` suffix.
prefix = self.path.stem # "logs" for "logs.jsonl"
# Archive filenames are already lexicographically chronological.
# Prune by name, not mtime, so copied/restored files don't reorder.
archives = sorted(self.path.parent.glob(f"{prefix}.*.jsonl.gz"))
excess = len(archives) - self.keep_archives
for old in archives[: max(0, excess)]:
try:
old.unlink()
except OSError:
pass
def force_rotate(self) -> None:
"""Test/admin hook: rotate immediately regardless of size."""
with self._lock:
if self._fh is not None:
self._rotate_locked()
# -- introspection ------------------------------------------------
def status(self) -> dict[str, Any]:
with self._lock:
try:
size = os.fstat(self._fh.fileno()).st_size if self._fh else 0
except OSError:
size = 0
return {
"path": str(self.path),
"size": size,
"lines": self._lines_written,
"last_ts": self._last_ts,
"rotations": self._rotations,
}

View File

@@ -46,7 +46,23 @@ class SerialSession:
def _drain(session: SerialSession) -> None:
"""Reader thread: line-by-line pull stdout into buffer."""
"""Reader thread: line-by-line pull stdout into buffer.
Each line is also published to the `meshtastic.serial.line` pubsub
topic so the persistent recorder can capture it without holding its
own port. This is the text-mode tap path: when no SerialInterface is
open, the firmware emits full formatted lines (level + clock + uptime
+ thread + `[heap N]` prefix on DEBUG_HEAP builds + body), and we
fan them out to whoever is listening. Pubsub is best-effort —
publish failures must never block the reader.
"""
# Lazy import: pubsub isn't required just to import this module
# (e.g., during static analysis), and we want a clean test surface.
try:
from pubsub import pub # type: ignore[import-untyped]
except Exception: # pragma: no cover - defensive
pub = None
assert session.proc.stdout is not None
try:
for line in session.proc.stdout:
@@ -54,6 +70,16 @@ def _drain(session: SerialSession) -> None:
with session.lock:
session.buffer.append(line_stripped)
session.total_lines += 1
if pub is not None:
try:
pub.sendMessage(
"meshtastic.serial.line",
line=line_stripped,
port=session.port,
)
except Exception:
# A subscriber raising must not break the reader.
pass
except Exception: # pragma: no cover - defensive
pass
finally:

View File

@@ -6,6 +6,7 @@ etc.). Business logic does not live here.
from __future__ import annotations
import logging
from typing import Any
from mcp.server.fastmcp import FastMCP
@@ -17,14 +18,34 @@ from . import (
flash,
hw_tools,
info,
log_query,
registry,
serial_session,
)
from . import userprefs as userprefs_mod
from .recorder import get_recorder
log = logging.getLogger(__name__)
app = FastMCP("meshtastic-mcp")
def _start_recorder() -> None:
# Persistent device-log capture. Starts on first import — pubsub fan-out
# is process-global, so subscribing here captures every active interface
# (whether opened by an MCP tool, a pytest fixture, or a serial_session).
# Files land in mcp-server/.mtlog/ (gitignored). See recorder/recorder.py
# for the full design. Recorder startup is best-effort: an unwritable
# log dir or pubsub mismatch should not take the MCP server down.
try:
get_recorder().start()
except Exception as exc:
log.warning("Failed to start persistent recorder: %s", exc)
_start_recorder()
# ---------- Discovery & metadata ------------------------------------------
@@ -75,6 +96,7 @@ def build(
env: str,
with_manifest: bool = True,
userprefs: dict[str, Any] | None = None,
build_flags: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build firmware for one env via `pio run -e <env>`.
@@ -86,8 +108,21 @@ def build(
build via userPrefs.jsonc injection. The file is restored after the build
completes. Use `userprefs_manifest` to discover available keys. Use
`userprefs_set` for persistent changes.
`build_flags` (optional): dict of `-D<NAME>=<VALUE>` macros for this build
only, injected via `PLATFORMIO_BUILD_FLAGS`. Common pattern:
`build_flags={"DEBUG_HEAP": 1}` enables per-thread leak detection + a
`[heap N]` prefix on every log line. The recorder picks the prefix up
automatically and synthesizes a high-resolution heap timeline that
`telemetry_timeline(field="free_heap")` can read alongside the normal
~60 s LocalStats packets. Pair with `/leakhunt` for classification.
"""
return flash.build(env, with_manifest=with_manifest, userprefs_overrides=userprefs)
return flash.build(
env,
with_manifest=with_manifest,
userprefs_overrides=userprefs,
build_flags=build_flags,
)
@app.tool()
@@ -105,6 +140,7 @@ def pio_flash(
port: str,
confirm: bool = False,
userprefs: dict[str, Any] | None = None,
build_flags: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Flash firmware via `pio run -e <env> -t upload --upload-port <port>`.
@@ -114,8 +150,19 @@ def pio_flash(
`userprefs` (optional): dict of `USERPREFS_<KEY>: value` baked into this
build via userPrefs.jsonc injection; restored after upload.
`build_flags` (optional): dict of `-D<NAME>=<VALUE>` macros for the
rebuild-before-upload, e.g. `{"DEBUG_HEAP": 1}`. Required for the flags
to actually land in the uploaded firmware — without it, the implicit
rebuild relinks without the env var and silently drops them.
"""
return flash.flash(env, port, confirm=confirm, userprefs_overrides=userprefs)
return flash.flash(
env,
port,
confirm=confirm,
userprefs_overrides=userprefs,
build_flags=build_flags,
)
@app.tool()
@@ -734,3 +781,185 @@ def picotool_load(uf2_path: str, confirm: bool = False) -> dict[str, Any]:
def picotool_raw(args: list[str], confirm: bool = False) -> dict[str, Any]:
"""Pass-through to `picotool`. load/reboot/save/erase require confirm=True."""
return hw_tools.picotool_raw(args, confirm=confirm)
# ---------- Persistent device-log capture (recorder) ----------------------
#
# The recorder is autouse — it starts at server import and continuously
# writes every meshtastic pubsub event to JSONL files under .mtlog/. These
# tools are query-only over those files, plus a few lifecycle controls.
@app.tool()
def logs_window(
start: str = "-15m",
end: str = "now",
grep: str | None = None,
level: str | None = None,
tag: str | None = None,
port: str | None = None,
max_lines: int = 200,
) -> dict[str, Any]:
"""Recent firmware log lines from the persistent recorder.
Filters by time window, regex over the line, level (single or
pipe-separated set like "WARN|ERROR|CRIT"), thread-name tag, and
interface port. Returns up to max_lines most-recent matches.
Time strings: "-15m", "-2h", "-3d", "now", or ISO 8601.
Note: lines arriving via the LogRecord protobuf path (when
set_debug_log_api(True) is on) come without level prefix — the
meshtastic Python lib drops record.level before fan-out. For those,
`level` filter won't match; use `grep` instead.
"""
return log_query.logs_window(
start=start,
end=end,
grep=grep,
level=level,
tag=tag,
port=port,
max_lines=max_lines,
)
@app.tool()
def telemetry_timeline(
window: str = "1h",
variant: str = "local",
field: str = "free_heap",
port: str | None = None,
max_points: int = 200,
) -> dict[str, Any]:
"""Time series of one telemetry field, downsampled to <= max_points.
`variant` ∈ device, local, environment, power, airQuality, health, host.
`field` accepts snake_case or camelCase; common aliases (free_heap ↔
heap_free_bytes) are normalized.
Returns slope_per_min (linear-regression slope, units/minute) so a
leak detector can read one number — negative slope on free_heap over
a long window indicates a real leak.
LocalStats variant ("local") cadence is ~60 s (whatever the device's
`device_update_interval` is set to), so a 1 h window gives ~60 raw
points. Bucket-mean downsampling preserves shape.
"""
return log_query.telemetry_timeline(
window=window,
variant=variant,
field=field,
port=port,
max_points=max_points,
)
@app.tool()
def packets_window(
start: str = "-5m",
end: str = "now",
portnum: str | None = None,
from_node: str | None = None,
to_node: str | None = None,
max: int = 200,
) -> dict[str, Any]:
"""Recent mesh packets recorded by the recorder.
Each row is a summary (portnum, from/to, hop_limit, RSSI/SNR, payload
size + first 64 bytes hex) — full payload bytes are not stored.
`portnum` accepts a pipe-separated set like "TEXT_MESSAGE_APP|POSITION_APP".
"""
return log_query.packets_window(
start=start,
end=end,
portnum=portnum,
from_node=from_node,
to_node=to_node,
max=max,
)
@app.tool()
def events_window(
start: str = "-1h",
end: str = "now",
kind: str | None = None,
max: int = 200,
) -> dict[str, Any]:
"""Return recorder events: connection lifecycle, node updates, and `mark_event` markers.
`kind` ∈ recorder_start, recorder_pause, recorder_resume,
connection_established, connection_lost, node_updated, mark.
Pipe-separated sets ("connection_lost|connection_established") work.
"""
return log_query.events_window(start=start, end=end, kind=kind, max=max)
@app.tool()
def mark_event(
label: str,
note: str | None = None,
data: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Drop a named marker into events.jsonl AND logs.jsonl.
Useful for aligning a timeline around a known stimulus: call before
and after a stress workload, then query telemetry_timeline /
logs_window with the markers' timestamps as bounds.
The marker also lands in logs.jsonl with level=MARK so a single
grep over logs picks it up.
"""
return get_recorder().mark_event(label=label, note=note, data=data)
@app.tool()
def recorder_status() -> dict[str, Any]:
"""Return recorder runtime info: running, paused, file sizes, last_ts per stream.
Use this to sanity-check that capture is working before you trust a
`logs_window` / `telemetry_timeline` result.
"""
return get_recorder().status()
@app.tool()
def recorder_pause(reason: str | None = None) -> dict[str, Any]:
"""Pause writes to all four streams. Pubsub subscriptions stay active —
we just drop events on the floor while paused. Resume with `recorder_resume`.
Use when capturing a known-good baseline that you don't want to
pollute with pre-test noise. Default state is recording; this is
rarely needed.
"""
get_recorder().pause(reason=reason)
return {"ok": True, "paused": True, "reason": reason}
@app.tool()
def recorder_resume() -> dict[str, Any]:
"""Resume writes after `recorder_pause`. No-op if already running."""
get_recorder().resume()
return {"ok": True, "paused": False}
@app.tool()
def recorder_export(
start: str,
end: str,
dest_dir: str,
streams: list[str] | None = None,
) -> dict[str, Any]:
"""Bundle a slice of the recorder's streams into `dest_dir`.
Writes one uncompressed JSONL per requested stream (logs / telemetry /
packets / events). Useful for: attaching to a bug report, feeding a
notebook, or backfilling Datadog after the fact.
"""
return log_query.export(
start=start,
end=end,
dest_dir=dest_dir,
streams=streams,
)

View File

@@ -0,0 +1,88 @@
"""Unit tests for the `build_flags` injection on `flash.build()`.
We don't actually run pio here — too slow, requires hardware-aware envs.
We test the translation layer (`_build_flags_env`) and that the env vars
are threaded through pio.run correctly via mock.
"""
from __future__ import annotations
from unittest.mock import patch
from meshtastic_mcp import flash, pio
class TestBuildFlagsEnv:
def test_simple_value(self) -> None:
out = flash._build_flags_env({"DEBUG_HEAP": 1})
assert out == {"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP=1"}
def test_string_value(self) -> None:
out = flash._build_flags_env({"FOO": "bar"})
assert out == {"PLATFORMIO_BUILD_FLAGS": "-DFOO=bar"}
def test_bool_true_is_bare_flag(self) -> None:
out = flash._build_flags_env({"DEBUG_HEAP": True})
assert out == {"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP"}
def test_bool_false_dropped(self) -> None:
out = flash._build_flags_env({"DEBUG_HEAP": False, "OTHER": 1})
assert out == {"PLATFORMIO_BUILD_FLAGS": "-DOTHER=1"}
def test_none_dropped(self) -> None:
out = flash._build_flags_env({"DEBUG_HEAP": None})
assert out == {}
def test_multiple_combined(self) -> None:
out = flash._build_flags_env({"DEBUG_HEAP": 1, "FOO": "x", "BAR": True})
# Order isn't guaranteed in dict iteration, so check membership.
flags = out["PLATFORMIO_BUILD_FLAGS"].split()
assert set(flags) == {"-DDEBUG_HEAP=1", "-DFOO=x", "-DBAR"}
class TestBuildPropagatesFlags:
def test_extra_env_passed_to_pio_run(self) -> None:
# Mock pio.run so we don't actually invoke pio. Capture extra_env.
captured = {}
class _StubResult:
returncode = 0
stdout = ""
stderr = ""
duration_s = 0.1
def _stub(args, **kwargs):
captured["args"] = args
captured["kwargs"] = kwargs
return _StubResult()
with patch.object(pio, "run", side_effect=_stub):
with patch.object(flash, "_artifacts_for", return_value=[]):
out = flash.build(
"fake-env",
with_manifest=False,
build_flags={"DEBUG_HEAP": 1},
)
assert captured["args"] == ["run", "-e", "fake-env"]
assert captured["kwargs"]["extra_env"] == {
"PLATFORMIO_BUILD_FLAGS": "-DDEBUG_HEAP=1"
}
assert out["build_flags"] == {"DEBUG_HEAP": 1}
def test_no_flags_means_no_extra_env(self) -> None:
captured = {}
class _StubResult:
returncode = 0
stdout = ""
stderr = ""
duration_s = 0.1
def _stub(args, **kwargs):
captured["kwargs"] = kwargs
return _StubResult()
with patch.object(pio, "run", side_effect=_stub):
with patch.object(flash, "_artifacts_for", return_value=[]):
flash.build("fake-env", with_manifest=False)
assert captured["kwargs"]["extra_env"] is None

View File

@@ -0,0 +1,548 @@
"""Unit tests for the persistent device-log recorder.
Hardware-free: drives the Recorder through its `_on_*` handlers with
synthetic packet/line dicts, then queries via log_query. Validates
prefix parsing, telemetry variant dispatch, marker round-trip, time
window filtering, downsampling, slope estimation, and gzip rotation
+ archive pruning.
"""
from __future__ import annotations
import gzip
import json
import logging
import os
import time
from pathlib import Path
import pubsub
import pytest
from meshtastic_mcp import log_query
from meshtastic_mcp.recorder.parsers import (
extract_telemetry,
interface_label,
parse_log_line,
summarize_packet,
)
from meshtastic_mcp.recorder.recorder import Recorder
from meshtastic_mcp.recorder.rotating import _RotatingJsonl
# -- isolation: every test gets a fresh Recorder + tmp dir -----------
@pytest.fixture
def recorder(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Recorder:
# Redirect both the Recorder and the module-level singleton lookup
# to the same tmp dir so log_query queries the same files we write.
monkeypatch.setenv("MESHTASTIC_MCP_LOG_DIR", str(tmp_path))
monkeypatch.setattr(
"meshtastic_mcp.recorder.recorder._INSTANCE", None, raising=False
)
r = Recorder(base_dir=tmp_path)
r.start()
monkeypatch.setattr("meshtastic_mcp.recorder.recorder._INSTANCE", r, raising=False)
yield r
r.stop()
class _FakeIface:
devPath = "/dev/cu.fake"
# -- parsers ---------------------------------------------------------
class TestParseLogLine:
def test_full_prefix(self) -> None:
out = parse_log_line("INFO | 12:34:56 12345 [Main] Booting")
assert out["level"] == "INFO"
assert out["tag"] == "Main"
assert out["uptime_s"] == 12345
assert out["msg"] == "Booting"
assert out["clock"] == "12:34:56"
def test_invalid_clock(self) -> None:
out = parse_log_line("WARN | ??:??:?? 7 [SerialConsole] Boot")
assert out["level"] == "WARN"
assert out["clock"] == "??:??:??"
assert out["uptime_s"] == 7
def test_no_thread_bracket(self) -> None:
out = parse_log_line("DEBUG | 00:00:00 0 raw message body")
assert out["level"] == "DEBUG"
assert out.get("tag") is None
assert out["msg"] == "raw message body"
def test_bare_message(self) -> None:
# LogRecord.message path — no level prefix at all.
out = parse_log_line("just a bare message")
assert "level" not in out or out.get("level") is None
assert out["line"] == "just a bare message"
def test_empty(self) -> None:
assert parse_log_line("") == {"line": ""}
def test_debug_heap_prefix_extracted(self) -> None:
out = parse_log_line("INFO | 12:34:56 12345 [Main] [heap 92344] Booting")
assert out["level"] == "INFO"
assert out["tag"] == "Main"
assert out["heap_free"] == 92344
assert out["msg"] == "Booting"
def test_debug_heap_prefix_on_bare_line(self) -> None:
# LogRecord.message path: no level prefix but still has [heap N].
out = parse_log_line("[heap 12345] some message")
assert out["heap_free"] == 12345
assert out["msg"] == "some message"
def test_thread_leak_event(self) -> None:
out = parse_log_line(
"HEAP | 00:00:01 100 [Power] [heap 90000] "
"------ Thread MeshPacket leaked heap 92344 -> 90000 (-2344) ------"
)
assert out["level"] == "HEAP"
assert out["heap_free"] == 90000
ev = out["heap_event"]
assert ev["kind"] == "leaked"
assert ev["thread"] == "MeshPacket"
assert ev["before"] == 92344
assert ev["after"] == 90000
assert ev["delta"] == -2344
def test_thread_freed_event(self) -> None:
out = parse_log_line(
"++++++ Thread Router freed heap 1000 -> 1500 (500) ++++++"
)
ev = out["heap_event"]
assert ev["kind"] == "freed"
assert ev["thread"] == "Router"
assert ev["delta"] == 500
def test_heap_status_periodic(self) -> None:
out = parse_log_line(
"HEAP | 00:00:30 30 [Power] "
"Heap status: 92344/200000 bytes free (-128), running 8/12 threads"
)
assert out["heap_free"] == 92344
assert out["heap_total"] == 200000
assert out["heap_delta"] == -128
class TestRecorderDebugHeapSynthesis:
def test_log_with_heap_writes_telemetry(self, recorder: "Recorder") -> None:
# When a log line carries [heap N], the recorder should also
# emit a synthesized telemetry row tagged source=debug_heap.
recorder._on_log_line(
"INFO | 00:00:00 1 [Main] [heap 88888] hello",
_FakeIface(),
)
telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
synth = [json.loads(r) for r in telem if '"source":"debug_heap"' in r]
assert len(synth) == 1
assert synth[0]["fields"]["heap_free_bytes"] == 88888
assert synth[0]["variant"] == "local"
def test_heap_status_writes_total_too(self, recorder: "Recorder") -> None:
recorder._on_log_line(
"HEAP | 00:00:30 30 [Power] "
"Heap status: 50000/200000 bytes free (-100), running 8/12 threads",
_FakeIface(),
)
telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
synth = [json.loads(r) for r in telem if '"source":"debug_heap"' in r]
assert synth[-1]["fields"]["heap_free_bytes"] == 50000
assert synth[-1]["fields"]["heap_total_bytes"] == 200000
def test_no_heap_no_synthesis(self, recorder: "Recorder") -> None:
# Plain log line (no [heap N], no Heap status) — telemetry.jsonl
# should NOT gain a synth row.
before = (recorder.base_dir / "telemetry.jsonl").read_text().count("\n")
recorder._on_log_line("INFO | 00:00:00 1 [Main] just a message", _FakeIface())
after = (recorder.base_dir / "telemetry.jsonl").read_text().count("\n")
assert after == before
def test_thread_leak_event_persists_on_log_row(self, recorder: "Recorder") -> None:
recorder._on_log_line(
"HEAP | 00:00:01 100 [Power] [heap 90000] "
"------ Thread MeshPacket leaked heap 92344 -> 90000 (-2344) ------",
_FakeIface(),
)
rows = [
json.loads(r)
for r in (recorder.base_dir / "logs.jsonl").read_text().splitlines()
if r
]
evt_rows = [r for r in rows if r.get("heap_event")]
assert len(evt_rows) == 1
assert evt_rows[0]["heap_event"]["thread"] == "MeshPacket"
assert evt_rows[0]["heap_event"]["delta"] == -2344
class TestSerialTap:
def test_serial_line_records_log_and_synthesizes_heap(
self, recorder: "Recorder"
) -> None:
recorder._on_serial_line(
"INFO | 00:00:00 5 [Main] [heap 88888] tap-line",
port="/dev/cu.tap",
)
logs = (recorder.base_dir / "logs.jsonl").read_text().splitlines()
telem = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
log_rows = [json.loads(r) for r in logs if r]
# Find the row from this call (port=/dev/cu.tap, role=serial_session)
tap_rows = [r for r in log_rows if r.get("port") == "/dev/cu.tap"]
assert len(tap_rows) == 1
assert tap_rows[0]["role"] == "serial_session"
assert tap_rows[0]["level"] == "INFO"
assert tap_rows[0]["tag"] == "Main"
assert tap_rows[0]["heap_free"] == 88888
synth = [json.loads(r) for r in telem if '"source":"debug_heap_serial"' in r]
assert len(synth) == 1
assert synth[0]["fields"]["heap_free_bytes"] == 88888
assert synth[0]["role"] == "serial_session"
def test_serial_line_thread_leak_event(self, recorder: "Recorder") -> None:
recorder._on_serial_line(
"HEAP | 00:00:30 30 [Power] [heap 53484] "
"------ Thread Router leaked heap 53612 -> 53484 (-128) ------",
port="/dev/cu.tap",
)
rows = [
json.loads(r)
for r in (recorder.base_dir / "logs.jsonl").read_text().splitlines()
if r
]
evt = [r for r in rows if r.get("heap_event")]
assert len(evt) == 1
assert evt[0]["heap_event"]["thread"] == "Router"
assert evt[0]["heap_event"]["delta"] == -128
# Heap also synthesized.
telem = (recorder.base_dir / "telemetry.jsonl").read_text()
assert '"source":"debug_heap_serial"' in telem
def test_serial_line_pause(self, recorder: "Recorder") -> None:
recorder.pause("baseline")
recorder._on_serial_line(
"INFO | 00:00:00 1 [t] [heap 1000] dropped",
port="/dev/cu.tap",
)
# Only the pause event row should exist; no tap row.
logs = (recorder.base_dir / "logs.jsonl").read_text()
assert "dropped" not in logs
def test_serial_line_handler_swallows_exceptions(
self, recorder: "Recorder"
) -> None:
# Hostile input — should not raise.
recorder._on_serial_line(None, port="/dev/cu.tap") # type: ignore[arg-type]
recorder._on_serial_line(b"\x00\x01\x02\x03", port="/dev/cu.tap") # type: ignore[arg-type]
# Survived.
class TestExtractTelemetry:
def test_local_stats_camel(self) -> None:
pkt = {
"decoded": {
"telemetry": {
"localStats": {"heap_total_bytes": 1000, "heap_free_bytes": 600}
}
}
}
out = extract_telemetry(pkt)
assert out is not None
assert out["variant"] == "local"
assert out["fields"]["heap_free_bytes"] == 600
def test_device_metrics_snake(self) -> None:
pkt = {
"decoded": {
"telemetry": {"device_metrics": {"battery_level": 88, "voltage": 4.1}}
}
}
out = extract_telemetry(pkt)
assert out is not None
assert out["variant"] == "device"
assert out["fields"]["battery_level"] == 88
def test_unknown_variant_returns_none(self) -> None:
assert extract_telemetry({"decoded": {"telemetry": {"weird": {}}}}) is None
assert extract_telemetry({}) is None
assert extract_telemetry({"decoded": "not-a-dict"}) is None
class TestSummarizePacket:
def test_text_with_payload(self) -> None:
pkt = {
"fromId": "!abc",
"toId": "!def",
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"},
"hopLimit": 3,
}
out = summarize_packet(pkt)
assert out["from_node"] == "!abc"
assert out["portnum"] == "TEXT_MESSAGE_APP"
assert out["payload_size"] == 5
assert out["payload_hex_prefix"] == "68656c6c6f"
def test_no_decoded(self) -> None:
out = summarize_packet({"fromId": "!abc"})
assert out["from_node"] == "!abc"
assert out["portnum"] is None
class TestInterfaceLabel:
def test_serial(self) -> None:
assert interface_label(_FakeIface()) == {
"port": "/dev/cu.fake",
"role": "serial",
}
def test_tcp(self) -> None:
class T:
hostname = "node.lan"
portNumber = 4403
assert interface_label(T()) == {"port": "tcp://node.lan:4403", "role": "tcp"}
def test_unknown(self) -> None:
assert interface_label(object()) == {"port": "object", "role": None}
def test_none(self) -> None:
assert interface_label(None) == {"port": None, "role": None}
# -- recorder write side ---------------------------------------------
class TestRecorderWrites:
def test_log_line_is_recorded(self, recorder: Recorder) -> None:
recorder._on_log_line("INFO | 12:34:56 99 [T] hi", _FakeIface())
path = recorder.base_dir / "logs.jsonl"
rows = [json.loads(line) for line in path.read_text().splitlines() if line]
# First row is recorder_start_event mirror? No — that's events.jsonl only.
assert any(r.get("level") == "INFO" and r.get("tag") == "T" for r in rows)
def test_telemetry_recorded_and_packet_double(self, recorder: Recorder) -> None:
# _on_telemetry alone — only telemetry.jsonl
recorder._on_telemetry(
{
"fromId": "!abc",
"decoded": {"telemetry": {"localStats": {"heap_free_bytes": 600}}},
},
_FakeIface(),
)
telem_rows = (recorder.base_dir / "telemetry.jsonl").read_text().splitlines()
assert any('"variant":"local"' in r for r in telem_rows)
def test_packets_summary(self, recorder: Recorder) -> None:
recorder._on_receive(
{
"fromId": "!abc",
"toId": "!def",
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hi"},
},
_FakeIface(),
)
rows = (recorder.base_dir / "packets.jsonl").read_text().splitlines()
assert any('"portnum":"TEXT_MESSAGE_APP"' in r for r in rows)
def test_mark_event_round_trip(self, recorder: Recorder) -> None:
out = recorder.mark_event("checkpoint", note="midpoint")
assert "ts" in out
events = (recorder.base_dir / "events.jsonl").read_text().splitlines()
logs = (recorder.base_dir / "logs.jsonl").read_text().splitlines()
assert any('"label":"checkpoint"' in r and '"kind":"mark"' in r for r in events)
assert any('"level":"MARK"' in r and "checkpoint" in r for r in logs)
def test_pause_drops_writes(self, recorder: Recorder) -> None:
before = len((recorder.base_dir / "logs.jsonl").read_text().splitlines())
recorder.pause(reason="baseline")
recorder._on_log_line("INFO | 00:00:00 1 [t] swallowed", _FakeIface())
after = len((recorder.base_dir / "logs.jsonl").read_text().splitlines())
assert after == before
recorder.resume()
recorder._on_log_line("INFO | 00:00:00 2 [t] kept", _FakeIface())
post_resume = (recorder.base_dir / "logs.jsonl").read_text()
assert "kept" in post_resume
def test_pubsub_handler_swallows_exceptions(self, recorder: Recorder) -> None:
# If the writer dies, the pubsub callback must NOT raise — that
# would crash the meshtastic receive thread.
bad_packet = object() # not a dict
recorder._on_receive(bad_packet, _FakeIface()) # type: ignore[arg-type]
recorder._on_telemetry(bad_packet, _FakeIface()) # type: ignore[arg-type]
recorder._on_log_line(None, _FakeIface()) # type: ignore[arg-type]
# No assertion needed — survival is the test.
# -- log_query read side ---------------------------------------------
class TestLogQuery:
def test_logs_window_grep_and_level(self, recorder: Recorder) -> None:
recorder._on_log_line("INFO | 12:00:00 1 [A] alpha", _FakeIface())
recorder._on_log_line("WARN | 12:00:01 2 [B] bravo failed", _FakeIface())
recorder._on_log_line("ERROR | 12:00:02 3 [C] charlie failed", _FakeIface())
out = log_query.logs_window(start="-1m", level="WARN|ERROR", max_lines=10)
assert out["total_matched"] == 2
levels = {r["level"] for r in out["lines"]}
assert levels == {"WARN", "ERROR"}
out2 = log_query.logs_window(start="-1m", grep=r"failed$", max_lines=10)
assert out2["total_matched"] == 2
def test_logs_window_invalid_regex(self, recorder: Recorder) -> None:
recorder._on_log_line("INFO | 12:00:00 1 [A] alpha", _FakeIface())
with pytest.raises(ValueError, match="invalid grep regex"):
log_query.logs_window(start="-1m", grep="(")
def test_telemetry_timeline_slope_and_downsample(self, recorder: Recorder) -> None:
# Synthesize a downward leak: 100 points, free_heap drops 1 byte/sample.
base_ts = time.time() - 60
for i in range(100):
recorder._files["telemetry"].write(
{
"ts": base_ts + i * 0.5,
"port": "/dev/cu.fake",
"role": "serial",
"from_node": "!abc",
"variant": "local",
"fields": {"heap_free_bytes": 10000 - i},
}
)
out = log_query.telemetry_timeline(
window="2m", variant="local", field="free_heap", max_points=10
)
assert out["samples"] == 100
assert len(out["points"]) <= 10
# Negative slope (heap dropping). Magnitude: 1 byte every 0.5s = 120/min.
assert out["slope_per_min"] is not None
assert out["slope_per_min"] < -100
def test_export_bundles_slice(self, recorder: Recorder, tmp_path: Path) -> None:
recorder._on_log_line("INFO | 00:00:00 1 [t] one", _FakeIface())
recorder._on_log_line("INFO | 00:00:00 2 [t] two", _FakeIface())
dest = tmp_path / "bundle"
out = log_query.export(start="-1m", end="now", dest_dir=str(dest))
assert (dest / "logs.jsonl").exists()
assert "logs" in out["paths"]
# -- time parser -----------------------------------------------------
class TestParseTime:
def test_relative(self) -> None:
now = 1_000_000.0
assert log_query._parse_time("-15m", now=now) == now - 900
assert log_query._parse_time("-2h", now=now) == now - 7200
assert log_query._parse_time("-1d", now=now) == now - 86400
def test_now_and_epoch(self) -> None:
now = 1_000_000.0
assert log_query._parse_time("now", now=now) == now
assert log_query._parse_time(now) == now
def test_iso(self) -> None:
ts = log_query._parse_time("2026-01-01T00:00:00Z")
assert isinstance(ts, float) and ts > 1_700_000_000
def test_naive_iso_assumes_utc(self) -> None:
assert log_query._parse_time("2026-01-01T00:00:00") == log_query._parse_time(
"2026-01-01T00:00:00Z"
)
def test_invalid(self) -> None:
with pytest.raises(ValueError):
log_query._parse_time("not a time")
# -- rotation --------------------------------------------------------
class TestRotation:
def test_size_cap_rotates_and_gzips(self, tmp_path: Path) -> None:
path = tmp_path / "rot.jsonl"
r = _RotatingJsonl(path, max_bytes=512, keep_archives=5, check_every=1)
for i in range(100):
r.write({"ts": float(i), "i": i, "pad": "x" * 40})
r.close()
archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
assert archives, "expected at least one rotation"
# Archive content is valid gzip + valid JSONL
with gzip.open(archives[0], "rt") as fh:
first = json.loads(fh.readline())
assert "ts" in first
def test_archive_pruning(self, tmp_path: Path) -> None:
path = tmp_path / "rot.jsonl"
r = _RotatingJsonl(path, max_bytes=200, keep_archives=2, check_every=1)
# Force several rotations.
for _ in range(8):
for i in range(20):
r.write({"ts": float(i), "pad": "x" * 30})
r.force_rotate()
r.close()
archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
assert len(archives) <= 2, f"expected ≤2 kept archives, got {len(archives)}"
def test_archive_pruning_uses_filename_order(self, tmp_path: Path) -> None:
path = tmp_path / "rot.jsonl"
r = _RotatingJsonl(path, keep_archives=2)
old = tmp_path / "rot.20260101-000000-000000-00000.jsonl.gz"
mid = tmp_path / "rot.20260101-000001-000000-00000.jsonl.gz"
new = tmp_path / "rot.20260101-000002-000000-00000.jsonl.gz"
for archive in (old, mid, new):
with gzip.open(archive, "wt", encoding="utf-8") as fh:
fh.write('{"ts":1}\n')
# Deliberately scramble mtimes so lexicographic filename order is
# the only stable chronological signal.
os.utime(old, (300, 300))
os.utime(mid, (100, 100))
os.utime(new, (200, 200))
r._prune_archives()
r.close()
archives = sorted(p.name for p in tmp_path.glob("rot.*.jsonl.gz"))
assert archives == [mid.name, new.name]
def test_force_rotate_when_below_threshold(self, tmp_path: Path) -> None:
path = tmp_path / "rot.jsonl"
r = _RotatingJsonl(path, max_bytes=10_000_000, check_every=999_999)
r.write({"ts": 1.0, "msg": "tiny"})
r.force_rotate()
r.write({"ts": 2.0, "msg": "after-rotate"})
r.close()
archives = sorted(tmp_path.glob("rot.*.jsonl.gz"))
assert len(archives) == 1
assert path.exists()
assert "after-rotate" in path.read_text()
class TestRecorderLocks:
def test_force_rotate_all_returns_status(self, recorder: Recorder) -> None:
out = recorder.force_rotate_all()
assert out["running"] is True
assert out["files"]
def test_wire_pubsub_logs_subscription_failure(
self,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
class FailingPubSubMock:
def subscribe(self, callback: object, topic: str) -> None:
raise RuntimeError("boom")
monkeypatch.setattr(pubsub, "pub", FailingPubSubMock())
recorder = Recorder(base_dir=tmp_path)
with caplog.at_level(logging.WARNING):
recorder._wire_pubsub()
assert (
"Recorder failed to subscribe to meshtastic.log.line: boom" in caplog.text
)