Add tests for config persistence and more exchange messages

This commit is contained in:
Ben Meadors
2026-04-18 06:42:31 -05:00
parent 3e59db6f3e
commit a48eb5ff05
6 changed files with 479 additions and 12 deletions

View File

@@ -138,13 +138,107 @@ _The tool tables below document 38 currently registered MCP server tools._
## Environment variables
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ----------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ------------------------------------------------------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| `MESHTASTIC_MCP_SEED` | `mcp-<user>-<host>` | PSK seed for test-harness session (CI override) |
| `MESHTASTIC_MCP_FLASH_LOG` | `<mcp-server>/tests/flash.log` | Tee target for pio/esptool/nrfutil subprocess output (TUI tails it) |
## Hardware Test Suite
`mcp-server/tests/` holds a pytest-based integration suite that exercises
real USB-connected Meshtastic devices against the MCP server surface. Separate
from the native C++ unit tests in the firmware repo's top-level `test/`
directory — this one validates the device-facing behavior end-to-end.
### Invocation
```bash
./mcp-server/run-tests.sh # full suite (auto-detect + auto-bake-if-needed)
./mcp-server/run-tests.sh --force-bake # reflash devices before testing
./mcp-server/run-tests.sh --assume-baked # skip the bake step (caller vouches for state)
./mcp-server/run-tests.sh tests/mesh # one tier
./mcp-server/run-tests.sh tests/mesh/test_traceroute.py # one file
./mcp-server/run-tests.sh -k telemetry # pytest name filter
```
The wrapper auto-detects connected devices (VID `0x239A``nrf52` → env
`rak4631`; `0x303A` or `0x10C4``esp32s3` → env `heltec-v3`), exports
`MESHTASTIC_MCP_ENV_<ROLE>` env vars, and invokes pytest. Overrides via
per-role env vars: `MESHTASTIC_MCP_ENV_NRF52=heltec-mesh-node-t114 ./run-tests.sh`.
No hardware connected? The wrapper narrows to `tests/unit/` only and says so
in the pre-flight header.
### Tiers (run in this order)
- **`bake`** (`tests/test_00_bake.py`) — flashes both hub roles with the
session's test profile. Has a skip-if-already-baked check (region + channel
match); `--force-bake` overrides.
- **`unit`** — pure Python, no hardware. boards / PIO wrapper /
userPrefs-parse / testing-profile fixtures.
- **`mesh`** — 2-device mesh: formation, broadcast delivery, direct+ACK,
traceroute, position broadcast, bidirectional. Parametrized over both
directions.
- **`telemetry`** — periodic telemetry broadcast + on-demand request/reply
(`TELEMETRY_APP` with `wantResponse=True`).
- **`monitor`** — boot log has no panic markers within 60 s of reboot.
- **`fleet`** — PSK-seed isolation: two labs with different seeds never
overlap.
- **`admin`** — owner persistence across reboot, channel URL round-trip,
`lora.hop_limit` persistence.
- **`provisioning`** — region/channel baking, userPrefs survive
`factory_reset(full=False)`.
### Artifacts (regenerated every run, under `tests/`)
- `report.html` — self-contained pytest-html report. Each test gets a
**Meshtastic debug** section attached on failure with a 200-line firmware
log tail + device-state dump. Open this first on failures.
- `junit.xml` — CI-parseable.
- `reportlog.jsonl``pytest-reportlog` event stream; consumed by the TUI.
- `fwlog.jsonl` — firmware log mirror (`meshtastic.log.line` pubsub → JSONL).
- `flash.log` — tee of all pio / esptool / nrfutil / picotool subprocess
output during the run (driven by `MESHTASTIC_MCP_FLASH_LOG`).
### Live TUI
```bash
.venv/bin/meshtastic-mcp-test-tui
.venv/bin/meshtastic-mcp-test-tui tests/mesh # pytest args pass through
```
Textual-based wrapper over `run-tests.sh` with a live test tree, tier
counters, pytest output pane, firmware-log pane, and a device-status strip.
Key bindings: `r` re-run focused, `f` filter, `d` failure detail, `g` open
`report.html`, `x` export reproducer bundle, `l` cycle fw-log filter, `q`
quit (SIGINT → SIGTERM → SIGKILL escalation).
### Slash commands
Three AI-assisted workflows are wired up for Claude Code operators
(`.claude/commands/`) and Copilot operators (`.github/prompts/`):
`/test` (run + interpret), `/diagnose` (read-only health report), `/repro`
(flake triage, N-times re-run with log diff).
### House rules (for human + agent contributors)
- Session-scoped fixtures in `tests/conftest.py` snapshot + restore
`userPrefs.jsonc`; **never edit `userPrefs.jsonc` from inside a test**.
Use the `test_profile` / `no_region_profile` fixtures for ephemeral
overrides.
- `SerialInterface` holds an **exclusive port lock**; sequence calls
open → mutate → close, then next device. No parallel calls to the
same port.
- Directed PKI-encrypted sends need **bilateral NodeInfo warmup**
both sides must hold the other's current pubkey. See
`tests/mesh/_receive.py::nudge_nodeinfo_port` and the three directed-
send tests (`test_direct_with_ack`, `test_traceroute`,
`test_telemetry_request_reply`) for the canonical pattern.
## Layout

View File

@@ -221,7 +221,12 @@ def set_config(path: str, value: Any, port: str | None = None) -> dict[str, Any]
# Treat the section as the root; the rest of the path walks into it.
leaf_parent, field = _walk_to_field(container, segments[1:] or [])
if field.label == pb_descriptor.FieldDescriptor.LABEL_REPEATED:
# Use `is_repeated` (modern upb protobuf API) rather than the
# deprecated `label == LABEL_REPEATED` check — the C-extension
# FieldDescriptor in protobuf >= 5.x doesn't expose `.label` at
# all, and `is_repeated` is the supported replacement that works
# across both the pure-python and upb backends.
if field.is_repeated:
raise AdminError(
f"{path!r} is a repeated field; v1 only supports scalar sets. "
"Use the raw meshtastic CLI for now."

View File

@@ -0,0 +1,118 @@
"""Role-to-port rediscovery after USB CDC re-enumeration.
Used by tests that mutate device identity in ways macOS treats as a
"new device" — notably ``factory_reset(full=False)`` on the nRF52840 and
any operation that kicks the device through its bootloader. Both cases
cause the kernel to re-assign the ``/dev/cu.usbmodem*`` path; a test that
captured the pre-operation port and reuses it after will fail with
``FileNotFoundError``.
The helper polls :func:`meshtastic_mcp.devices.list_devices` (the same API
``run-tests.sh`` and ``conftest.py::hub_devices`` use for initial hub
detection) filtered by the role's canonical USB VID. Returns the first
matching port — equivalent to "give me the single nRF52 (or ESP32-S3) on
the bench right now, whichever `cu.*` path it happens to be at".
Test-harness-local (not exported from ``meshtastic_mcp``): a thin wrapper
over public ``devices.list_devices`` with no extra moving parts. If a
non-test caller ever needs this, it's trivial to promote.
Caveat: the session-scoped ``hub_devices`` fixture snapshots ports at
session start and is dict-keyed — it doesn't learn about re-enumerations.
Tests that call ``resolve_port_by_role`` should use the returned port
locally for the rest of the test body rather than expecting
``hub_devices[role]`` to update.
"""
from __future__ import annotations
import time
from meshtastic_mcp import devices as devices_module
# Role → canonical VID(s). Kept in sync with:
# - `mcp-server/run-tests.sh` (ROLE_BY_VID)
# - `mcp-server/tests/conftest.py::hub_profile`
# If any of those change, this must too.
_ROLE_VIDS: dict[str, tuple[int, ...]] = {
"nrf52": (0x239A,), # Adafruit / RAK nRF52840 native USB
"esp32s3": (0x303A, 0x10C4), # Espressif native USB + CP2102 USB-UART
}
def _coerce_vid(raw: object) -> int | None:
"""`devices.list_devices` returns vid as either '0x239a' or an int;
normalize to int. None on un-parseable input (matches the same fault-
tolerance `run-tests.sh` uses for its role detection)."""
if raw is None:
return None
if isinstance(raw, int):
return raw
if isinstance(raw, str):
try:
return int(raw, 16) if raw.lower().startswith("0x") else int(raw)
except ValueError:
return None
return None
def resolve_port_by_role(
role: str,
*,
timeout_s: float = 30.0,
poll_start: float = 0.5,
poll_max: float = 5.0,
) -> str:
"""Return the current ``/dev/cu.*`` path for ``role`` once one appears.
Polls ``devices.list_devices(include_unknown=True)`` every ``poll_start``
seconds (1.5× backoff, capped at ``poll_max``) until a device matching
``role``'s VID appears. Returns the first matching port.
On timeout raises :class:`AssertionError` with the list of devices that
WERE seen — helpful when debugging "wrong board connected" vs. "no
board connected" vs. "still re-enumerating".
Args:
role: ``"nrf52"`` or ``"esp32s3"`` (keys of ``_ROLE_VIDS``).
timeout_s: upper bound on how long to wait for the device to
re-appear. Default 30 s — nRF52 factory_reset observed at
2-12 s on a healthy lab hub.
poll_start: initial poll interval in seconds. Default 0.5 s.
poll_max: cap on poll interval after backoff. Default 5 s.
Raises:
AssertionError: if no matching device appears within ``timeout_s``.
ValueError: if ``role`` is not in ``_ROLE_VIDS``.
"""
if role not in _ROLE_VIDS:
raise ValueError(f"unknown role {role!r}; expected one of {sorted(_ROLE_VIDS)}")
wanted_vids = _ROLE_VIDS[role]
deadline = time.monotonic() + timeout_s
delay = poll_start
last_seen: list[dict] = []
while time.monotonic() < deadline:
try:
last_seen = devices_module.list_devices(include_unknown=True)
except Exception as exc:
# list_devices is wrapped by meshtastic_mcp.devices and
# shouldn't raise on normal enumeration — but a kernel-level
# USB hiccup during re-enumeration can bubble up briefly.
# Treat as "nothing seen this round" and retry.
last_seen = [{"error": repr(exc)}]
for dev in last_seen:
vid = _coerce_vid(dev.get("vid"))
if vid is not None and vid in wanted_vids and dev.get("port"):
return dev["port"]
time.sleep(delay)
delay = min(delay * 1.5, poll_max)
# Timeout path — include what we saw so the operator can tell
# "nothing plugged in" from "wrong VID" from "transient USB error".
raise AssertionError(
f"no device matching role {role!r} (VIDs "
f"{[hex(v) for v in wanted_vids]}) appeared within {timeout_s:.0f}s. "
f"Last enumeration: {last_seen!r}"
)

View File

@@ -0,0 +1,106 @@
"""Admin: a config mutation survives a reboot.
This is the most-critical admin behavior not tested elsewhere. If
config persistence breaks in a firmware release, every deployed device
gets bricked on its next reboot (channels lost, region lost, owner lost,
everything back to Meshtastic stock). The fleet blast radius is "every
unit on every shelf" — easily worth one explicit test per release.
Pattern: single-device (``baked_single``, one test per role). Mutate a
benign, easy-to-observe LoRa field (``lora.hop_limit``), confirm
pre-reboot, reboot, rediscover port (nRF52 may re-enumerate), verify
the value survived, restore original for downstream tests.
Why ``lora.hop_limit`` specifically:
* Non-destructive — doesn't change region, channel, or PSK, so
downstream mesh tests still work regardless of the flipped value.
* Bounded small-integer (1..7) — easy to flip to a definitively
different value and read back.
* Persisted via ``writeConfig("lora")`` which is the same path
every other LoRa config mutation uses, so we're really testing
the whole lora-config persistence pipeline end-to-end.
"""
from __future__ import annotations
import time
from typing import Any
import pytest
from meshtastic_mcp import admin, info
from .._port_discovery import resolve_port_by_role
def _get_hop_limit(port: str) -> int:
"""Read `lora.hop_limit` from the device's current config."""
lora = admin.get_config("lora", port=port).get("config", {}).get("lora", {})
hl = lora.get("hop_limit")
assert isinstance(hl, int), (
f"lora.hop_limit missing or non-int in get_config response: " f"{lora!r}"
)
return hl
@pytest.mark.timeout(180)
def test_lora_hop_limit_survives_reboot(
baked_single: dict[str, Any],
wait_until,
) -> None:
"""Runs once per connected role. Mutates `lora.hop_limit`, reboots,
verifies the new value is still there after the device comes back.
"""
role = baked_single["role"]
port = baked_single["port"]
original = _get_hop_limit(port)
# Flip to a definitively different value within the protocol's
# valid range (1..7 per LoRaConfig.hop_limit comment). Pick 5 if
# current is != 5, else 4.
new_value = 5 if original != 5 else 4
try:
admin.set_config("lora.hop_limit", new_value, port=port)
# Pre-reboot sanity: the write reached the device and
# get_config reflects it in-memory. If this fails, the persist
# test below is moot — something's wrong with the write path
# itself, not with persistence.
assert _get_hop_limit(port) == new_value, (
f"pre-reboot readback failed: set {new_value}, got "
f"{_get_hop_limit(port)}"
)
# Reboot. `seconds=3` gives the Python client time to
# disconnect cleanly; sleep long enough for the boot to start
# before we begin polling.
admin.reboot(port=port, confirm=True, seconds=3)
time.sleep(8.0)
# nRF52 re-enumerates on reboot → rediscover.
port = resolve_port_by_role(role, timeout_s=60.0)
wait_until(
lambda: info.device_info(port=port, timeout_s=5.0).get("my_node_num")
is not None,
timeout=60,
backoff_start=1.0,
)
# The assertion this test exists for: the mutation persisted
# across the reboot cycle through NVS / LittleFS / UICR.
post = _get_hop_limit(port)
assert post == new_value, (
f"lora.hop_limit did not survive reboot: set to {new_value} "
f"pre-reboot, read back {post} post-reboot. Config persistence "
f"is broken — downstream fleet impact would be total."
)
finally:
# Restore so downstream tests see the original hop_limit.
# Wrapped in its own try to avoid masking the real assertion
# if the restore itself races the reboot — the worst case
# there is a non-default hop_limit sticks around, which is
# benign (mesh still works at hop_limit 3 or 5).
try:
admin.set_config("lora.hop_limit", original, port=port)
except Exception:
pass

View File

@@ -0,0 +1,130 @@
"""Mesh: a fixed-position broadcast from TX arrives decoded on RX.
Exercises ``POSITION_APP`` (portnum 3) — the core Meshtastic portnum that
map apps and fleet dashboards depend on. Without real GPS hardware on
either device we put TX into **fixed-position** mode with a short
broadcast interval; the firmware's ``PositionModule::runOnce`` then
emits a Position packet every N seconds independent of GPS state. RX
listens on ``meshtastic.receive.position`` and verifies the decode.
Scope: validating the *routing + decode* path through
``modules/PositionModule.cpp``, not GPS accuracy. We don't set or assert
any specific lat/lon — the test is "a Position protobuf rode the wire
and was decoded correctly", which is what a downstream map client needs.
Why directed/PKI warmup matters: the config writes (``set_config``) are
directed admin sends. They don't fire unless the firmware trusts the
sender — same PKI staleness trap as the other directed-send tests, so
we do the bilateral warmup first.
"""
from __future__ import annotations
import time
from typing import Any
import pytest
from meshtastic_mcp import admin
from ._receive import ReceiveCollector, nudge_nodeinfo_port
def _restore_position_config(port: str, orig: dict[str, Any]) -> None:
"""Best-effort teardown: restore the 3 config fields this test mutates.
Runs in a ``finally`` block — if the test failed mid-way we still
want the device back to its pre-test state so downstream tests
don't inherit a short broadcast interval + fixed-position flag.
Swallows exceptions to avoid masking the underlying assertion.
"""
for path, value in (
("position.fixed_position", orig.get("fixed_position", False)),
(
"position.position_broadcast_secs",
orig.get("position_broadcast_secs", 0),
),
(
"position.position_broadcast_smart_enabled",
orig.get("position_broadcast_smart_enabled", True),
),
):
try:
admin.set_config(path, value, port=port)
except Exception:
pass
@pytest.mark.timeout(240)
def test_position_broadcast_and_receive(mesh_pair: dict[str, Any]) -> None:
"""Runs for every directed pair. TX emits a periodic Position; RX
receives and decodes it via the ``receive.position`` pubsub topic.
"""
tx_port = mesh_pair["tx"]["port"]
rx_port = mesh_pair["rx"]["port"]
tx_node_num = mesh_pair["tx"]["my_node_num"]
tx_role = mesh_pair["tx_role"]
rx_role = mesh_pair["rx_role"]
assert tx_node_num is not None, f"{tx_role} my_node_num missing"
# Snapshot current position config so teardown can restore.
orig_position = (
admin.get_config("position", port=tx_port).get("config", {}).get("position", {})
)
try:
# Configure TX for periodic fixed-position broadcasts. 30 s is a
# compromise: short enough for the test to finish inside the
# 240 s timeout, long enough that one dropped broadcast still
# leaves a 90 s window for the next. smart_enabled=False
# disables the distance/interval throttle in
# `PositionModule::hasChanged` so we get broadcasts on a fixed
# cadence instead of "only when position changes".
admin.set_config("position.fixed_position", True, port=tx_port)
admin.set_config("position.position_broadcast_secs", 30, port=tx_port)
admin.set_config(
"position.position_broadcast_smart_enabled", False, port=tx_port
)
# Small settle: the firmware applies config writes asynchronously,
# and the next broadcast is scheduled off the *new* interval. A
# brief pause keeps a racy first-broadcast-at-the-old-interval
# from confusing the listener.
time.sleep(2.0)
# Bilateral PKI warmup — broadcast packets aren't PKI-encrypted
# (they use channel keys), but we also want both sides in each
# other's node tables so `PositionModule::hasChanged` doesn't
# suppress the broadcast as "no peers to tell". NodeInfo pings
# achieve both.
nudge_nodeinfo_port(tx_port)
nudge_nodeinfo_port(rx_port)
time.sleep(3.0)
with ReceiveCollector(rx_port, topic="meshtastic.receive.position") as rx:
got = rx.wait_for(
lambda pkt: pkt.get("from") == tx_node_num,
timeout=120.0, # 2× the broadcast_secs for one retransmit
)
assert got is not None, (
f"no Position packet from {tx_role} (0x{tx_node_num:08x}) within "
f"120 s; RX ({rx_role}) saw {len(rx.snapshot())} position packet(s) "
f"from {[hex(p.get('from') or 0) for p in rx.snapshot()]!r}. "
f"Check that TX's PositionModule is enabled and the broadcast "
f"schedule isn't throttled elsewhere."
)
# Validate the decode path. `decoded.position` is the
# MessageToDict version of the Position proto; the protobuf
# serializer strips default-valued optional fields, so we can't
# rely on lat/lon being present (fixed-position without stored
# coords is a valid zero-state). What we CAN rely on is the
# outer `position` sub-dict existing and the `raw` protobuf
# parsing cleanly — those prove the POSITION_APP portnum
# decoded without errors.
decoded = got.get("decoded", {})
assert "position" in decoded, (
f"packet from {tx_role} had no `decoded.position` — "
f"POSITION_APP decode failed. decoded={decoded!r}"
)
finally:
_restore_position_config(tx_port, orig_position)

View File

@@ -14,6 +14,8 @@ from typing import Any
import pytest
from meshtastic_mcp import admin, info
from .._port_discovery import resolve_port_by_role
@pytest.mark.timeout(180)
def test_baked_prefs_survive_factory_reset(
@@ -24,10 +26,14 @@ def test_baked_prefs_survive_factory_reset(
"""Runs once per connected role. Flow:
1. Change owner name to a known-non-default value.
2. Trigger factory_reset(full=False).
3. Wait for device to come back.
4. Confirm owner is back to USERPREFS-baked default (or blank default if
3. Rediscover the port (macOS re-enumerates the CDC endpoint on nRF52
factory_reset; the path can change e.g. `/dev/cu.usbmodem101` →
`/dev/cu.usbmodem1101`).
4. Wait for device to come back.
5. Confirm owner is back to USERPREFS-baked default (or blank default if
not baked), and primary channel/region/slot are still the baked values.
"""
role = baked_single["role"]
port = baked_single["port"]
# Snapshot pre-reset config
@@ -45,8 +51,16 @@ def test_baked_prefs_survive_factory_reset(
# Trigger non-full factory reset
admin.factory_reset(port=port, confirm=True, full=False)
# Wait for device to come back (serial reappears)
time.sleep(10.0) # reset takes a moment
# Device re-enumerates — rediscover its port before probing. nRF52's
# CDC endpoint drops and comes back with a new `/dev/cu.usbmodem*`
# path on macOS; ESP32-S3 usually keeps the same path but the helper
# works either way (it just returns the current path for this role).
# Early sleep lets the USB kernel driver settle before we start
# polling — list_devices during a transient re-enumeration can return
# an empty list and the helper's poll-with-backoff handles that too,
# so the sleep is optimization not correctness.
time.sleep(10.0)
port = resolve_port_by_role(role, timeout_s=60.0)
wait_until(
lambda: info.device_info(port=port, timeout_s=5.0).get("my_node_num")
is not None,