mirror of
https://github.com/meshtastic/firmware.git
synced 2026-05-19 14:25:28 -04:00
* Implement rotating JSONL recorder for persistent logging * Fixes * Update documentation and clean up imports in command files * Address remaining recorder review feedback Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/2541773c-869a-463f-9fae-8505272c06ff Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * recorder: fix lock re-entry deadlock on start() and force_rotate_all() The previous "Fixes" commit added `_files_snapshot()` which acquires `self._lock` so handlers don't race with `stop()` clearing `_files`. But two callers were already holding `self._lock` when they invoked methods that go through the snapshot: - `start()` writes the `recorder_start` event from inside its `with self._lock:` block. `_write_event` -> `_files_snapshot` re-acquires the same non-reentrant `threading.Lock`, freezing process startup. - `force_rotate_all()` calls `self.status()` (which also acquires `self._lock`) while still holding the lock from rotating each file. Both fixes release the lock before the call. The recorder_start marker still lands in events.jsonl because the started/started_at flags are already set when we write it. Verified end-to-end against the standalone /tmp/verify_pr_fixes.py harness — all 9 PR review-comment fixes pass, including pause/resume event ordering and concurrent start/stop without KeyError. 
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Fix markdown linting issues in leakhunt.md and repro.md * Handle recorder startup and query review fixes Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Tighten recorder follow-up tests Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Stabilize recorder startup tests Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Remove brittle recorder startup test Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Polish recorder follow-up errors Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Refine recorder startup and regex errors Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Clean up recorder follow-up nits Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/78540a9f-fe62-4350-b252-0ae5621f0b8a Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com> * Trunk --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
390 lines
12 KiB
Python
Executable File
390 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Forward selected recorder JSONL streams to Datadog.
|
|
|
|
Reads `.mtlog/logs.jsonl` and `.mtlog/telemetry.jsonl`, ships logs to the
|
|
Logs Intake API and telemetry numerics to the Metrics v2 series API.
|
|
Resumes from `.mtlog/.dd-cursor.json` so a daemon restart doesn't
|
|
duplicate rows already shipped from the current live files.
|
|
|
|
This forwarder does not currently backfill rotated `.jsonl.gz` archives.
|
|
If the recorder rotates before this process drains the live file, or the
|
|
forwarder is down across a rotation, those older rows are skipped.
|
|
|
|
Usage:
|
|
DD_API_KEY=... ./scripts/mtlog_to_datadog.py --tail
|
|
./scripts/mtlog_to_datadog.py --once # catch up + exit
|
|
./scripts/mtlog_to_datadog.py --since 3600 # re-read the live files, shipping only the last hour
|
|
|
|
Default `DD_SITE` is `us5.datadoghq.com` — the team's Datadog instance.
|
|
Override via `DD_SITE=...` env var or `--site` flag for one-offs.
|
|
|
|
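The forwarder is a separate process by design — a Datadog outage or
auth failure must not backpressure the recorder. We exit non-zero on
fatal config errors (missing API key or log dir). Transient network/HTTP
errors (408/429/5xx) are retried with exponential backoff, but only a
bounded number of times per batch: a batch that still fails is reported
on stderr and dropped, and the cursor still advances past it.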
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Iterator
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
print(
|
|
"requests is required. Install it in the mcp-server venv: "
|
|
"uv pip install requests",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(2)
|
|
|
|
|
|
_DEFAULT_LOG_DIR = Path(__file__).resolve().parents[1] / ".mtlog"
|
|
_LOG_INTAKE_TPL = "https://http-intake.logs.{site}/api/v2/logs"
|
|
_METRICS_TPL = "https://api.{site}/api/v2/series"
|
|
_LOG_BATCH = 50
|
|
_METRICS_BATCH = 100
|
|
_MAX_RETRIES = 5
|
|
_RETRY_BASE_S = 1.5
|
|
|
|
|
|
# --- streaming JSONL with byte-position cursor -------------------------
|
|
|
|
|
|
class _StreamReader:
|
|
"""Reads a single rotating JSONL with cursor-based resume.
|
|
|
|
This tails only the live `.jsonl` file. The recorder rotates files
|
|
(live `.jsonl` → `.YYYYMMDD-HHMMSS-uuuuuu-NNNNN.jsonl.gz`), which means
|
|
the live file shrinks abruptly. We detect that via inode change OR live
|
|
size < cursor position, and reset the live-file cursor to 0.
|
|
"""
|
|
|
|
def __init__(self, path: Path, cursor: dict[str, Any]):
|
|
self.path = path
|
|
self.cursor = cursor
|
|
|
|
def _state(self) -> tuple[int, int]:
|
|
"""Return (inode, size) for the live file. (0, 0) if missing."""
|
|
try:
|
|
st = self.path.stat()
|
|
return (st.st_ino, st.st_size)
|
|
except FileNotFoundError:
|
|
return (0, 0)
|
|
|
|
def iter_new_records(self) -> Iterator[dict[str, Any]]:
|
|
ino, size = self._state()
|
|
last_ino = self.cursor.get("ino")
|
|
last_pos = int(self.cursor.get("pos") or 0)
|
|
if ino == 0:
|
|
return
|
|
if last_ino is not None and last_ino != ino:
|
|
# Rotation happened. Start over.
|
|
last_pos = 0
|
|
if last_pos > size:
|
|
# Live file truncated/shrunk under us — recorder rotated.
|
|
last_pos = 0
|
|
try:
|
|
with self.path.open("r", encoding="utf-8") as fh:
|
|
fh.seek(last_pos)
|
|
for line in fh:
|
|
line = line.rstrip("\n")
|
|
if not line:
|
|
continue
|
|
try:
|
|
yield json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
last_pos = fh.tell()
|
|
except FileNotFoundError:
|
|
return
|
|
self.cursor["ino"] = ino
|
|
self.cursor["pos"] = last_pos
|
|
|
|
|
|
def _load_cursor(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
return json.loads(path.read_text())
|
|
except (OSError, json.JSONDecodeError):
|
|
return {}
|
|
|
|
|
|
def _save_cursor(path: Path, data: dict[str, Any]) -> None:
|
|
tmp = path.with_suffix(".json.tmp")
|
|
tmp.write_text(json.dumps(data, separators=(",", ":")))
|
|
tmp.replace(path)
|
|
|
|
|
|
# --- Datadog clients ---------------------------------------------------
|
|
|
|
|
|
class _DDSession:
    """Pool one HTTPS session to Datadog and share retry logic.

    `send_logs` and `send_metrics` slice their input into fixed-size batches
    and POST each one through `_post`. A batch that ultimately fails is
    dropped (reported on stderr) and is not counted in the returned total,
    so callers can compare the result with the input length.
    """

    def __init__(self, api_key: str, site: str, hostname: str) -> None:
        self.api_key = api_key
        # Datadog site domain, substituted into the endpoint URL templates.
        self.site = site
        # Not used by this class; exposed for callers that build payloads.
        self.hostname = hostname
        self.session = requests.Session()
        # Every request carries the API key and a JSON body.
        self.session.headers.update(
            {
                "DD-API-KEY": api_key,
                "Content-Type": "application/json",
            }
        )

    def _post(self, url: str, payload: Any) -> bool:
        """POST `payload` as JSON to `url`; return True on any 2xx response.

        Network errors and retryable statuses (408, 429, 500, 502, 503, 504)
        are retried with exponential backoff via `_wait_retry`, for at most
        `_MAX_RETRIES` attempts in total. Any other non-2xx status is
        reported on stderr and fails immediately. No backoff sleep follows
        the final attempt, since nothing would come after it.
        """
        for attempt in range(_MAX_RETRIES):
            try:
                resp = self.session.post(url, json=payload, timeout=30)
            except requests.RequestException as e:
                failure = f"network error: {e}"
                retry_reason = failure
            else:
                if 200 <= resp.status_code < 300:
                    return True
                if resp.status_code not in (408, 429, 500, 502, 503, 504):
                    print(
                        f"datadog refused: {resp.status_code} {resp.text[:200]}",
                        file=sys.stderr,
                    )
                    return False
                failure = f"HTTP {resp.status_code}"
                retry_reason = f"HTTP {resp.status_code} retrying"
            if attempt + 1 >= _MAX_RETRIES:
                # Out of attempts: report and give up without a pointless sleep.
                print(
                    f"datadog: giving up after {_MAX_RETRIES} attempts ({failure})",
                    file=sys.stderr,
                )
                break
            _wait_retry(attempt, retry_reason)
        return False

    def send_logs(self, records: list[dict[str, Any]]) -> int:
        """Ship log entries in batches of `_LOG_BATCH`; return how many were accepted."""
        if not records:
            return 0
        url = _LOG_INTAKE_TPL.format(site=self.site)
        sent = 0
        for i in range(0, len(records), _LOG_BATCH):
            batch = records[i : i + _LOG_BATCH]
            # The Logs intake takes a bare JSON array of entries.
            if self._post(url, batch):
                sent += len(batch)
        return sent

    def send_metrics(self, series: list[dict[str, Any]]) -> int:
        """Ship metric series in batches of `_METRICS_BATCH`; return how many were accepted."""
        if not series:
            return 0
        url = _METRICS_TPL.format(site=self.site)
        sent = 0
        for i in range(0, len(series), _METRICS_BATCH):
            batch = series[i : i + _METRICS_BATCH]
            # The series endpoint expects the list wrapped as {"series": [...]}.
            if self._post(url, {"series": batch}):
                sent += len(batch)
        return sent
|
|
|
|
|
|
def _wait_retry(attempt: int, reason: str) -> None:
    """Announce a retry on stderr, then back off exponentially.

    The delay is `_RETRY_BASE_S * 2**attempt` seconds, where `attempt` is
    the zero-based index of the attempt that just failed.
    """
    delay = _RETRY_BASE_S * 2**attempt
    notice = f" retry {attempt + 1}/{_MAX_RETRIES} in {delay:.1f}s ({reason})"
    print(notice, file=sys.stderr)
    time.sleep(delay)
|
|
|
|
|
|
# --- record → datadog payload ------------------------------------------
|
|
|
|
|
|
def _log_record_to_dd(rec: dict[str, Any], host: str) -> dict[str, Any]:
|
|
line = rec.get("line") or ""
|
|
tags = [
|
|
f"role:{rec.get('role')}",
|
|
f"port:{rec.get('port')}",
|
|
]
|
|
level = rec.get("level")
|
|
if level:
|
|
tags.append(f"level:{level}")
|
|
tag = rec.get("tag")
|
|
if tag:
|
|
tags.append(f"thread:{tag}")
|
|
return {
|
|
"ddsource": "meshtastic-firmware",
|
|
"service": "meshtastic-firmware",
|
|
"hostname": host,
|
|
"message": line,
|
|
"ddtags": ",".join(t for t in tags if t and "None" not in t),
|
|
"timestamp": int((rec.get("ts") or time.time()) * 1000),
|
|
"level": level,
|
|
}
|
|
|
|
|
|
def _telemetry_record_to_metrics(
|
|
rec: dict[str, Any], host: str
|
|
) -> list[dict[str, Any]]:
|
|
fields = rec.get("fields") or {}
|
|
if not isinstance(fields, dict):
|
|
return []
|
|
variant = rec.get("variant") or "unknown"
|
|
ts = int(rec.get("ts") or time.time())
|
|
out: list[dict[str, Any]] = []
|
|
tags = []
|
|
if rec.get("port"):
|
|
tags.append(f"port:{rec['port']}")
|
|
if rec.get("role"):
|
|
tags.append(f"role:{rec['role']}")
|
|
if rec.get("from_node"):
|
|
tags.append(f"from_node:{rec['from_node']}")
|
|
tags.append(f"variant:{variant}")
|
|
for field, value in fields.items():
|
|
if not isinstance(value, (int, float)) or isinstance(value, bool):
|
|
continue
|
|
metric = f"mesh.{variant}.{_metric_safe(field)}"
|
|
out.append(
|
|
{
|
|
"metric": metric,
|
|
"type": 3, # GAUGE
|
|
"points": [{"timestamp": ts, "value": float(value)}],
|
|
"tags": tags,
|
|
"resources": [{"type": "host", "name": host}],
|
|
}
|
|
)
|
|
return out
|
|
|
|
|
|
def _metric_safe(name: str) -> str:
|
|
# Lowercase, replace non-alnum with underscore for safe metric names.
|
|
return "".join(c.lower() if c.isalnum() else "_" for c in name)
|
|
|
|
|
|
# --- main loop ---------------------------------------------------------
|
|
|
|
|
|
def _iter_new_since(
    path: Path, cursor: dict[str, Any], cutoff_ts: float | None
) -> Iterator[dict[str, Any]]:
    """Yield new records from `path`, dropping ones older than `cutoff_ts`.

    `cursor` is advanced by `_StreamReader` once iteration is exhausted.
    When `cutoff_ts` is set, a record is skipped if its `ts` is non-numeric
    or earlier than the cutoff (a missing `ts` counts as 0).
    """
    for rec in _StreamReader(path, cursor).iter_new_records():
        if cutoff_ts is not None:
            ts = rec.get("ts") or 0
            if not isinstance(ts, (int, float)) or ts < cutoff_ts:
                continue
        yield rec


def run(
    log_dir: Path,
    *,
    once: bool,
    since_seconds: float | None,
    poll_interval: float,
    dd: _DDSession,
) -> int:
    """Forward new recorder output to Datadog.

    Each pass does three things:
    1. Ships new `logs.jsonl` records to the Logs API.
    2. Ships numeric fields of new `telemetry.jsonl` records to the Metrics API.
    3. Persists both byte cursors to `<log_dir>/.dd-cursor.json`.

    With `once`, returns 0 after a single pass. Otherwise it sleeps
    `poll_interval` seconds between passes and never returns.

    `since_seconds` discards the saved cursors and re-reads the live files
    from the start. Only records whose `ts` is within the last
    `since_seconds` seconds (measured from when this call began) are
    forwarded, and that cutoff stays in effect for later passes.
    """
    cursor_path = log_dir / ".dd-cursor.json"
    cursors = _load_cursor(cursor_path)

    # `--since` overrides the saved cursor. Cursors are byte offsets, so we
    # can't seek by timestamp. Instead, reset them to 0 (re-read the live
    # files) and let _iter_new_since drop records older than the cutoff.
    cutoff_ts: float | None = None
    if since_seconds is not None:
        cursors = {}
        cutoff_ts = time.time() - since_seconds

    sent_total = {"logs": 0, "telemetry": 0}

    while True:
        # logs.jsonl → DD logs
        log_batch = [
            _log_record_to_dd(rec, dd.hostname)
            for rec in _iter_new_since(
                log_dir / "logs.jsonl", cursors.setdefault("logs", {}), cutoff_ts
            )
        ]
        if log_batch:
            n = dd.send_logs(log_batch)
            sent_total["logs"] += n
            print(f"logs: sent {n}/{len(log_batch)}")

        # telemetry.jsonl → DD metrics (one series per numeric field)
        metric_series: list[dict[str, Any]] = []
        for rec in _iter_new_since(
            log_dir / "telemetry.jsonl", cursors.setdefault("telemetry", {}), cutoff_ts
        ):
            metric_series.extend(_telemetry_record_to_metrics(rec, dd.hostname))
        if metric_series:
            n = dd.send_metrics(metric_series)
            sent_total["telemetry"] += n
            print(f"telemetry: sent {n}/{len(metric_series)} metric points")

        # Cursors advance even if a send failed: undelivered batches are
        # dropped rather than retried on the next pass (at-most-once).
        _save_cursor(cursor_path, cursors)

        if once:
            print(f"done. logs={sent_total['logs']} metrics={sent_total['telemetry']}")
            return 0
        time.sleep(poll_interval)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse CLI options, validate configuration, and run the forwarder.

    Returns 2 when `DD_API_KEY` is unset or the log directory does not
    exist. Invalid numeric options are rejected through `parser.error`,
    which exits with status 2. Otherwise returns the result of `run`.
    """
    import math  # local import: only used for option validation

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--log-dir",
        default=str(_DEFAULT_LOG_DIR),
        help="Path to .mtlog/ (default: mcp-server/.mtlog)",
    )
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--once", action="store_true", help="Catch up then exit")
    mode.add_argument(
        "--tail",
        action="store_true",
        help="Daemon: poll forever (default)",
    )
    parser.add_argument(
        "--since",
        type=float,
        default=None,
        help="Backfill last N seconds. Resets cursor.",
    )
    parser.add_argument(
        "--poll-interval",
        type=float,
        default=5.0,
        help="Seconds between tail polls (default 5)",
    )
    parser.add_argument(
        "--site",
        default=os.environ.get("DD_SITE", "us5.datadoghq.com"),
        help=(
            "Datadog site. Default is the team's instance (us5.datadoghq.com). "
            "Override via DD_SITE env var or this flag."
        ),
    )
    parser.add_argument(
        "--host",
        default=socket.gethostname(),
        help="Hostname tag (default: socket.gethostname())",
    )
    args = parser.parse_args(argv)

    # A zero interval busy-loops. Negative, NaN or infinite values make
    # time.sleep() raise after the first pass.
    if not (math.isfinite(args.poll_interval) and args.poll_interval > 0):
        parser.error("--poll-interval must be a positive, finite number of seconds")
    # A negative (or NaN) --since would put the cutoff in the future and
    # silently drop every record.
    if args.since is not None and not args.since >= 0:
        parser.error("--since must be a non-negative number of seconds")

    api_key = os.environ.get("DD_API_KEY")
    if not api_key:
        print("DD_API_KEY env var required.", file=sys.stderr)
        return 2

    log_dir = Path(args.log_dir)
    if not log_dir.exists():
        print(
            f"log dir {log_dir} does not exist — start the mcp-server first.",
            file=sys.stderr,
        )
        return 2

    dd = _DDSession(api_key=api_key, site=args.site, hostname=args.host)
    # --once and --tail are mutually exclusive; tail is the default mode.
    once = args.once and not args.tail
    return run(
        log_dir,
        once=once,
        since_seconds=args.since,
        poll_interval=args.poll_interval,
        dd=dd,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|