Files
firmware/mcp-server/scripts/mtlog_to_datadog.py
Ben Meadors f6a954b97e Implement rotating JSONL recorder for persistent logging (#10428)
* Implement rotating JSONL recorder for persistent logging

* Fixes

* Update documentation and clean up imports in command files

* Address remaining recorder review feedback

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/2541773c-869a-463f-9fae-8505272c06ff

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* recorder: fix lock re-entry deadlock on start() and force_rotate_all()

The previous "Fixes" commit added `_files_snapshot()` which acquires
`self._lock` so handlers don't race with `stop()` clearing `_files`.
But two callers were already holding `self._lock` when they invoked
methods that go through the snapshot:

  - `start()` writes the `recorder_start` event from inside its `with
    self._lock:` block. `_write_event` -> `_files_snapshot` re-acquires
    the same non-reentrant `threading.Lock`, freezing process startup.

  - `force_rotate_all()` calls `self.status()` (which also acquires
    `self._lock`) while still holding the lock from rotating each file.

Both fixes release the lock before the call. The recorder_start marker
still lands in events.jsonl because the started/started_at flags are
already set when we write it.

Verified end-to-end against the standalone /tmp/verify_pr_fixes.py
harness — all 9 PR review-comment fixes pass, including pause/resume
event ordering and concurrent start/stop without KeyError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix markdown linting issues in leakhunt.md and repro.md

* Handle recorder startup and query review fixes

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Tighten recorder follow-up tests

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Stabilize recorder startup tests

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Remove brittle recorder startup test

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Polish recorder follow-up errors

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Refine recorder startup and regex errors

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Clean up recorder follow-up nits

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Trunk

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 09:22:40 -05:00

390 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Forward selected recorder JSONL streams to Datadog.
Reads `.mtlog/logs.jsonl` and `.mtlog/telemetry.jsonl`, ships logs to the
Logs Intake API and telemetry numerics to the Metrics v2 series API.
Resumes from `.mtlog/.dd-cursor.json` so a daemon restart doesn't
duplicate rows already shipped from the current live files.
This forwarder does not currently backfill rotated `.jsonl.gz` archives.
If the recorder rotates before this process drains the live file, or the
forwarder is down across a rotation, those older rows are skipped.
Usage:
DD_API_KEY=... ./scripts/mtlog_to_datadog.py --tail
./scripts/mtlog_to_datadog.py --once # catch up + exit
./scripts/mtlog_to_datadog.py --since 3600 # backfill last hour from start
Default `DD_SITE` is `us5.datadoghq.com` — the team's Datadog instance.
Override via `DD_SITE=...` env var or `--site` flag for one-offs.
The forwarder is a separate process by design — a Datadog outage or
auth failure must not backpressure the recorder. We exit non-zero on
fatal config errors (missing API key) and keep retrying on transient
network/HTTP errors.
"""
from __future__ import annotations
import argparse
import json
import os
import socket
import sys
import time
from pathlib import Path
from typing import Any, Iterator
try:
import requests
except ImportError:
print(
"requests is required. Install it in the mcp-server venv: "
"uv pip install requests",
file=sys.stderr,
)
sys.exit(2)
_DEFAULT_LOG_DIR = Path(__file__).resolve().parents[1] / ".mtlog"
_LOG_INTAKE_TPL = "https://http-intake.logs.{site}/api/v2/logs"
_METRICS_TPL = "https://api.{site}/api/v2/series"
_LOG_BATCH = 50
_METRICS_BATCH = 100
_MAX_RETRIES = 5
_RETRY_BASE_S = 1.5
# --- streaming JSONL with byte-position cursor -------------------------
class _StreamReader:
"""Reads a single rotating JSONL with cursor-based resume.
This tails only the live `.jsonl` file. The recorder rotates files
(live `.jsonl` → `.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`), which means
the live file shrinks abruptly. We detect that via inode change OR live
size < cursor position, and reset the live-file cursor to 0.
"""
def __init__(self, path: Path, cursor: dict[str, Any]):
self.path = path
self.cursor = cursor
def _state(self) -> tuple[int, int]:
"""Return (inode, size) for the live file. (0, 0) if missing."""
try:
st = self.path.stat()
return (st.st_ino, st.st_size)
except FileNotFoundError:
return (0, 0)
def iter_new_records(self) -> Iterator[dict[str, Any]]:
ino, size = self._state()
last_ino = self.cursor.get("ino")
last_pos = int(self.cursor.get("pos") or 0)
if ino == 0:
return
if last_ino is not None and last_ino != ino:
# Rotation happened. Start over.
last_pos = 0
if last_pos > size:
# Live file truncated/shrunk under us — recorder rotated.
last_pos = 0
try:
with self.path.open("r", encoding="utf-8") as fh:
fh.seek(last_pos)
for line in fh:
line = line.rstrip("\n")
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
last_pos = fh.tell()
except FileNotFoundError:
return
self.cursor["ino"] = ino
self.cursor["pos"] = last_pos
def _load_cursor(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except (OSError, json.JSONDecodeError):
return {}
def _save_cursor(path: Path, data: dict[str, Any]) -> None:
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, separators=(",", ":")))
tmp.replace(path)
# --- Datadog clients ---------------------------------------------------
class _DDSession:
"""Pool one HTTPS session, share retry logic."""
def __init__(self, api_key: str, site: str, hostname: str) -> None:
self.api_key = api_key
self.site = site
self.hostname = hostname
self.session = requests.Session()
self.session.headers.update(
{
"DD-API-KEY": api_key,
"Content-Type": "application/json",
}
)
def _post(self, url: str, payload: Any) -> bool:
for attempt in range(_MAX_RETRIES):
try:
resp = self.session.post(url, json=payload, timeout=30)
except requests.RequestException as e:
_wait_retry(attempt, f"network error: {e}")
continue
if 200 <= resp.status_code < 300:
return True
if resp.status_code in (408, 429, 500, 502, 503, 504):
_wait_retry(
attempt,
f"HTTP {resp.status_code} retrying",
)
continue
print(
f"datadog refused: {resp.status_code} {resp.text[:200]}",
file=sys.stderr,
)
return False
return False
def send_logs(self, records: list[dict[str, Any]]) -> int:
if not records:
return 0
url = _LOG_INTAKE_TPL.format(site=self.site)
sent = 0
for i in range(0, len(records), _LOG_BATCH):
batch = records[i : i + _LOG_BATCH]
if self._post(url, batch):
sent += len(batch)
return sent
def send_metrics(self, series: list[dict[str, Any]]) -> int:
if not series:
return 0
url = _METRICS_TPL.format(site=self.site)
sent = 0
for i in range(0, len(series), _METRICS_BATCH):
batch = series[i : i + _METRICS_BATCH]
if self._post(url, {"series": batch}):
sent += len(batch)
return sent
def _wait_retry(attempt: int, reason: str) -> None:
wait = _RETRY_BASE_S * (2**attempt)
print(
f" retry {attempt + 1}/{_MAX_RETRIES} in {wait:.1f}s ({reason})",
file=sys.stderr,
)
time.sleep(wait)
# --- record → datadog payload ------------------------------------------
def _log_record_to_dd(rec: dict[str, Any], host: str) -> dict[str, Any]:
line = rec.get("line") or ""
tags = [
f"role:{rec.get('role')}",
f"port:{rec.get('port')}",
]
level = rec.get("level")
if level:
tags.append(f"level:{level}")
tag = rec.get("tag")
if tag:
tags.append(f"thread:{tag}")
return {
"ddsource": "meshtastic-firmware",
"service": "meshtastic-firmware",
"hostname": host,
"message": line,
"ddtags": ",".join(t for t in tags if t and "None" not in t),
"timestamp": int((rec.get("ts") or time.time()) * 1000),
"level": level,
}
def _telemetry_record_to_metrics(
rec: dict[str, Any], host: str
) -> list[dict[str, Any]]:
fields = rec.get("fields") or {}
if not isinstance(fields, dict):
return []
variant = rec.get("variant") or "unknown"
ts = int(rec.get("ts") or time.time())
out: list[dict[str, Any]] = []
tags = []
if rec.get("port"):
tags.append(f"port:{rec['port']}")
if rec.get("role"):
tags.append(f"role:{rec['role']}")
if rec.get("from_node"):
tags.append(f"from_node:{rec['from_node']}")
tags.append(f"variant:{variant}")
for field, value in fields.items():
if not isinstance(value, (int, float)) or isinstance(value, bool):
continue
metric = f"mesh.{variant}.{_metric_safe(field)}"
out.append(
{
"metric": metric,
"type": 3, # GAUGE
"points": [{"timestamp": ts, "value": float(value)}],
"tags": tags,
"resources": [{"type": "host", "name": host}],
}
)
return out
def _metric_safe(name: str) -> str:
# Lowercase, replace non-alnum with underscore for safe metric names.
return "".join(c.lower() if c.isalnum() else "_" for c in name)
# --- main loop ---------------------------------------------------------
def run(
log_dir: Path,
*,
once: bool,
since_seconds: float | None,
poll_interval: float,
dd: _DDSession,
) -> int:
cursor_path = log_dir / ".dd-cursor.json"
cursors = _load_cursor(cursor_path)
# `--since` overrides cursor: rewind to (now-since) timestamp.
# We can't seek by timestamp directly (cursor is byte position), so
# we just reset cursors to 0 and let the time filter in iter_new
# drop older records.
cutoff_ts: float | None = None
if since_seconds is not None:
cursors = {}
cutoff_ts = time.time() - since_seconds
sent_total = {"logs": 0, "telemetry": 0}
while True:
# logs.jsonl → DD logs
log_cursor = cursors.setdefault("logs", {})
log_batch: list[dict[str, Any]] = []
for rec in _StreamReader(log_dir / "logs.jsonl", log_cursor).iter_new_records():
if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
continue
log_batch.append(_log_record_to_dd(rec, dd.hostname))
if log_batch:
n = dd.send_logs(log_batch)
sent_total["logs"] += n
print(f"logs: sent {n}/{len(log_batch)}")
# telemetry.jsonl → DD metrics
telem_cursor = cursors.setdefault("telemetry", {})
metric_series: list[dict[str, Any]] = []
for rec in _StreamReader(
log_dir / "telemetry.jsonl", telem_cursor
).iter_new_records():
if cutoff_ts and (rec.get("ts") or 0) < cutoff_ts:
continue
metric_series.extend(_telemetry_record_to_metrics(rec, dd.hostname))
if metric_series:
n = dd.send_metrics(metric_series)
sent_total["telemetry"] += n
print(f"telemetry: sent {n}/{len(metric_series)} metric points")
_save_cursor(cursor_path, cursors)
if once:
print(f"done. logs={sent_total['logs']} metrics={sent_total['telemetry']}")
return 0
time.sleep(poll_interval)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--log-dir",
default=str(_DEFAULT_LOG_DIR),
help="Path to .mtlog/ (default: mcp-server/.mtlog)",
)
mode = parser.add_mutually_exclusive_group()
mode.add_argument("--once", action="store_true", help="Catch up then exit")
mode.add_argument(
"--tail",
action="store_true",
help="Daemon: poll forever (default)",
)
parser.add_argument(
"--since",
type=float,
default=None,
help="Backfill last N seconds. Resets cursor.",
)
parser.add_argument(
"--poll-interval",
type=float,
default=5.0,
help="Seconds between tail polls (default 5)",
)
parser.add_argument(
"--site",
default=os.environ.get("DD_SITE", "us5.datadoghq.com"),
help=(
"Datadog site. Default is the team's instance (us5.datadoghq.com). "
"Override via DD_SITE env var or this flag."
),
)
parser.add_argument(
"--host",
default=socket.gethostname(),
help="Hostname tag (default: socket.gethostname())",
)
args = parser.parse_args(argv)
api_key = os.environ.get("DD_API_KEY")
if not api_key:
print("DD_API_KEY env var required.", file=sys.stderr)
return 2
log_dir = Path(args.log_dir)
if not log_dir.exists():
print(
f"log dir {log_dir} does not exist — start the mcp-server first.",
file=sys.stderr,
)
return 2
dd = _DDSession(api_key=api_key, site=args.site, hostname=args.host)
once = args.once and not args.tail
return run(
log_dir,
once=once,
since_seconds=args.since,
poll_interval=args.poll_interval,
dd=dd,
)
if __name__ == "__main__":
sys.exit(main())