Files
firmware/mcp-server/tests/mesh/_receive.py
Ben Meadors 6b15571e14 Add MCP server for interacting with meshtastic devices and testing framework / TUI (#10194)
* Start of MCP server and test suite

* Add MCP server for interacting with meshtastic devices and testing framework / TUI

* Update mcp-server/README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix mcp-server review feedback from thread

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/91dc128a-ed50-4d07-8bb2-3dc6623a05f7

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Enhance StreamAPI and PhoneAPI for improved log record handling and concurrency control

* Semgrep fixes

* Trunk and semgrep fixes

* optimize pio streaming tee file writes

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/04e26c6b-6a2b-45be-bbeb-79ae4d0be633

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* chore: remove redundant log handle assignment

Agent-Logs-Url: https://github.com/meshtastic/firmware/sessions/04e26c6b-6a2b-45be-bbeb-79ae4d0be633

Co-authored-by: thebentern <9000580+thebentern@users.noreply.github.com>

* Consolidate type imports and remove placeholder test files

* Add tests for config persistence and more exchange messages

* Refactor position test to validate on-demand request/reply behavior

* Remove  position request/reply test and update README for telemetry behavior

* Fix transmit history file to get removed on factory reset

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
2026-04-18 11:29:02 -05:00

221 lines
8.0 KiB
Python

"""Shared helper for mesh receive tests.
`pio device monitor` captures firmware log output, which does NOT include
decoded text message contents or telemetry payloads — those are only
accessible through `meshtastic.SerialInterface`'s pubsub mechanism.
`ReceiveCollector` opens a long-lived SerialInterface on a port, subscribes
to the pubsub topic of interest, and exposes an atomic `wait_for(predicate)`
that mesh tests use to verify end-to-end delivery.
This module also exposes two module-level helpers for forcing a device to
broadcast a fresh NodeInfo — the on-demand path that sidesteps the
firmware's 10-minute NodeInfo rate-limit. Tests doing directed PKI-encrypted
sends need BOTH endpoints to hold current pubkeys for each other:
nudge_nodeinfo(iface) # nudge an already-open SerialInterface
nudge_nodeinfo_port(port) # open briefly, nudge, close
See `ReceiveCollector.broadcast_nodeinfo_ping` for the firmware-side
rationale (PKI staleness → directed sends NAK with Routing.Error=35
PKI_UNKNOWN_PUBKEY or 39 PKI_SEND_FAIL_PUBLIC_KEY).
"""
from __future__ import annotations
import threading
import time
from typing import Any, Callable
def nudge_nodeinfo(iface: Any) -> None:
"""Force the device behind ``iface`` to broadcast a fresh NodeInfo.
Sends a ``ToRadio.Heartbeat(nonce=1)`` — the firmware's documented
on-demand NodeInfo trigger (see `src/mesh/api/PacketAPI.cpp:74-79`
for TCP/UDP and `src/mesh/PhoneAPI.cpp::handleToRadio` for serial,
both routed to `NodeInfoModule::sendOurNodeInfo(..., shorterTimeout=true)`
with the 60-s window rather than the 10-min rate-limit).
Call on BOTH TX and RX ifaces before a directed PKI-encrypted send.
Nudging only one side leaves the other with a stale pubkey cache and
makes the directed send NAK with PKI_UNKNOWN_PUBKEY.
"""
from meshtastic.protobuf import mesh_pb2 # type: ignore[import-untyped]
tr = mesh_pb2.ToRadio()
tr.heartbeat.nonce = 1
iface._sendToRadio(tr)
def nudge_nodeinfo_port(port: str) -> None:
"""Open ``port`` briefly, nudge, close — for when no iface is open yet.
Uses the meshtastic_mcp port-lock-aware `connect()` context manager
so we don't race ReceiveCollector or other long-lived handles on
the same port.
"""
from meshtastic_mcp.connection import connect
with connect(port=port) as iface:
nudge_nodeinfo(iface)
class ReceiveCollector:
"""Listen for meshtastic packets on `port` and let tests wait for a match.
Must be used as a context manager so the underlying SerialInterface is
always closed (leaked interfaces hold the CDC port open and break
subsequent tool calls).
Usage:
with ReceiveCollector(rx_port, topic="meshtastic.receive.text") as rx:
# ... send from TX ...
assert rx.wait_for(
lambda pkt: pkt.get("decoded", {}).get("text") == unique,
timeout=60,
), f"packet not received; got {rx.snapshot()!r}"
"""
def __init__(
self,
port: str,
topic: str = "meshtastic.receive",
capture_logs: bool = False,
) -> None:
self._port = port
self._topic = topic
self._capture_logs = capture_logs
self._packets: list[dict[str, Any]] = []
self._log_lines: list[str] = []
self._lock = threading.Lock()
self._iface = None
self._handler_ref = None # keep strong ref so pubsub doesn't GC it
self._log_handler_ref = None
def __enter__(self) -> "ReceiveCollector":
from meshtastic.serial_interface import (
SerialInterface, # type: ignore[import-untyped]
)
from pubsub import pub # type: ignore[import-untyped]
# pubsub uses weak refs by default — we stash a strong ref so the
# handler doesn't disappear between subscribe and wait_for.
def handler(packet: dict, interface: Any) -> None:
with self._lock:
self._packets.append(packet)
self._handler_ref = handler
pub.subscribe(handler, self._topic)
# Firmware-side logs come through the SAME SerialInterface when
# `config.security.debug_log_api_enabled = True`. Subscribing here
# captures them for failure-artifact attachment without needing a
# separate pio monitor session that would fight our port lock.
if self._capture_logs:
def log_handler(line: str, interface: Any) -> None:
with self._lock:
self._log_lines.append(line)
self._log_handler_ref = log_handler
pub.subscribe(log_handler, "meshtastic.log.line")
self._iface = SerialInterface(devPath=self._port, connectNow=True)
# Let the config bootstrap complete so we don't miss early arrivals.
time.sleep(1.0)
return self
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
from pubsub import pub # type: ignore[import-untyped]
if self._handler_ref is not None:
try:
pub.unsubscribe(self._handler_ref, self._topic)
except Exception:
pass
if self._log_handler_ref is not None:
try:
pub.unsubscribe(self._log_handler_ref, "meshtastic.log.line")
except Exception:
pass
if self._iface is not None:
try:
self._iface.close()
except Exception:
pass
def snapshot(self) -> list[dict[str, Any]]:
"""Return a thread-safe copy of the list of collected packets."""
with self._lock:
return list(self._packets)
def log_snapshot(self) -> list[str]:
"""Return captured firmware log lines.
Only populated if `capture_logs=True` AND the device has
`security.debug_log_api_enabled=True`.
"""
with self._lock:
return list(self._log_lines)
def send_text(
self,
text: str,
destination_id: Any = "^all",
want_ack: bool = False,
channel_index: int = 0,
) -> Any:
"""Send a text packet through the already-open SerialInterface.
Use this when a test also has a ReceiveCollector open on the same port
— `admin.send_text(port=...)` would try to open a second SerialInterface
and fail the port lock.
"""
if self._iface is None:
raise RuntimeError("ReceiveCollector not started; use as context manager")
return self._iface.sendText(
text,
destinationId=destination_id,
wantAck=want_ack,
channelIndex=channel_index,
)
def broadcast_nodeinfo_ping(self) -> None:
"""Force the firmware on `port` to broadcast a fresh NodeInfo.
Thin wrapper around the module-level :func:`nudge_nodeinfo` that
also validates the context-manager invariant. Delegates so tests
that need to nudge BOTH sides (bilateral PKI warmup) share one
implementation — the caller just passes each iface in turn.
Firmware-side details (rate-limit bypass, nonce==1 trigger path,
shorterTimeout=true window) are documented on the module-level
helper.
"""
if self._iface is None:
raise RuntimeError("ReceiveCollector not started; use as context manager")
nudge_nodeinfo(self._iface)
def wait_for(
self,
predicate: Callable[[dict[str, Any]], bool],
timeout: float = 60.0,
poll_interval: float = 0.5,
) -> dict[str, Any] | None:
"""Block until a received packet matches `predicate` or timeout.
Returns the matching packet (truthy) or None (falsy).
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
with self._lock:
for pkt in self._packets:
try:
if predicate(pkt):
return pkt
except Exception:
continue
time.sleep(poll_interval)
return None