Add TCP support for Meshtastic MCP interface / tests and update docs (#10355)

* Add TCP support for Meshtastic MCP interface / tests and update docs

* Address TCP endpoint validation and error handling in connection

* TCP connection handling and device listing logic

* Fix docstring formatting in normalize_tcp_endpoint function
This commit is contained in:
Ben Meadors
2026-04-30 13:51:29 -05:00
committed by GitHub
parent 173ac58ed7
commit 21cef8c2e5
9 changed files with 714 additions and 39 deletions

View File

@@ -575,6 +575,8 @@ Grouped by purpose. Full argument shapes in `mcp-server/README.md`; a few high-v
`confirm=True` is a tool-level gate on top of whatever permission prompt your MCP host shows. **Don't bypass it** by asking the host to auto-approve — it exists specifically because MCP hosts sometimes remember "always allow this tool" and that's dangerous for `factory_reset`, `erase_and_flash`, `uhubctl_power(action='off')`, and `uhubctl_cycle`.
**TCP / native-host nodes.** Setting `MESHTASTIC_MCP_TCP_HOST=<host[:port]>` makes `list_devices` surface a `meshtasticd` daemon (e.g. the `native-macos` build) as a synthetic `tcp://host:port` entry, and `connect()` routes through `meshtastic.tcp_interface.TCPInterface` instead of `SerialInterface`. Every read/write/admin tool that flows through `connect()` works against the daemon transparently. USB-only tools (`pio_flash`, `erase_and_flash`, `update_flash`, `touch_1200bps`, `serial_open`, `esptool_*`, `nrfutil_*`, `picotool_*`) raise a clear `ConnectionError` when handed a `tcp://` port; `pio_flash` against a `native*` env raises a `FlashError` (no upload step — use `build` and run the binary directly). The pytest harness still assumes USB-attached devices per role; TCP-aware fixtures are deferred. See `mcp-server/README.md` § "TCP / native-host nodes".
### Hardware test suite (`mcp-server/run-tests.sh`)
The wrapper auto-detects connected devices (VID → role map: `0x239A``nrf52`, `0x303A`/`0x10C4``esp32s3`), maps each role to a PlatformIO env (`nrf52``rak4631`, `esp32s3``heltec-v3`, overridable via `MESHTASTIC_MCP_ENV_<ROLE>`), then invokes pytest. Zero pre-flight config needed from the operator.

View File

@@ -126,16 +126,17 @@ Sequence these; don't parallelize on the same port.
## Environment variables (test harness)
| Var | Purpose |
| ------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_MCP_ENV_<ROLE>` | Override PlatformIO env for a role (e.g. `MESHTASTIC_MCP_ENV_NRF52=rak4631-dap`). Default map: `nrf52→rak4631`, `esp32s3→heltec-v3`. |
| `MESHTASTIC_MCP_SEED` | PSK seed for the session test profile. Defaults to `mcp-<user>-<host>`. |
| `MESHTASTIC_MCP_FLASH_LOG` | File path to tee pio/esptool/nrfutil/picotool output. `run-tests.sh` sets this to `tests/flash.log` so the TUI can stream live flash progress. |
| `MESHTASTIC_UHUBCTL_BIN` | Absolute path to `uhubctl` binary. Default: PATH lookup. |
| `MESHTASTIC_UHUBCTL_LOCATION_<ROLE>` | Pin a role to a specific uhubctl hub location (e.g. `1-1.3`). Wins over VID auto-detection — use when multiple devices share a VID. |
| `MESHTASTIC_UHUBCTL_PORT_<ROLE>` | Pin a role to a specific hub port number. Required alongside `LOCATION_<ROLE>`. |
| `MESHTASTIC_UI_CAMERA_BACKEND` | Camera backend for UI tier + `capture_screen` tool: `opencv` / `ffmpeg` / `null` / `auto` (default). |
| `MESHTASTIC_UI_CAMERA_DEVICE` | Generic camera device (index or path). Used by the UI tier when no per-role var is set. |
| `MESHTASTIC_UI_CAMERA_DEVICE_<ROLE>` | Per-role camera pinning (e.g. `MESHTASTIC_UI_CAMERA_DEVICE_ESP32S3=0` for the OLED-bearing heltec-v3). |
| `MESHTASTIC_UI_OCR_BACKEND` | OCR engine selection: `easyocr` / `pytesseract` / `null` / `auto` (default). |
| `MESHTASTIC_UI_TUI_CAMERA` | Set to `1` to mount the live camera-feed panel in `meshtastic-mcp-test-tui`. |
| Var | Purpose |
| ------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_MCP_ENV_<ROLE>` | Override PlatformIO env for a role (e.g. `MESHTASTIC_MCP_ENV_NRF52=rak4631-dap`). Default map: `nrf52→rak4631`, `esp32s3→heltec-v3`. |
| `MESHTASTIC_MCP_SEED` | PSK seed for the session test profile. Defaults to `mcp-<user>-<host>`. |
| `MESHTASTIC_MCP_FLASH_LOG` | File path to tee pio/esptool/nrfutil/picotool output. `run-tests.sh` sets this to `tests/flash.log` so the TUI can stream live flash progress. |
| `MESHTASTIC_MCP_TCP_HOST` | `host` or `host:port` of a `meshtasticd` daemon (e.g. the `native-macos` build). Surfaces it in `list_devices` as `tcp://host:port` so `connect()`-based tools target it transparently. Default port 4403. |
| `MESHTASTIC_UHUBCTL_BIN` | Absolute path to `uhubctl` binary. Default: PATH lookup. |
| `MESHTASTIC_UHUBCTL_LOCATION_<ROLE>` | Pin a role to a specific uhubctl hub location (e.g. `1-1.3`). Wins over VID auto-detection — use when multiple devices share a VID. |
| `MESHTASTIC_UHUBCTL_PORT_<ROLE>` | Pin a role to a specific hub port number. Required alongside `LOCATION_<ROLE>`. |
| `MESHTASTIC_UI_CAMERA_BACKEND` | Camera backend for UI tier + `capture_screen` tool: `opencv` / `ffmpeg` / `null` / `auto` (default). |
| `MESHTASTIC_UI_CAMERA_DEVICE` | Generic camera device (index or path). Used by the UI tier when no per-role var is set. |
| `MESHTASTIC_UI_CAMERA_DEVICE_<ROLE>` | Per-role camera pinning (e.g. `MESHTASTIC_UI_CAMERA_DEVICE_ESP32S3=0` for the OLED-bearing heltec-v3). |
| `MESHTASTIC_UI_OCR_BACKEND` | OCR engine selection: `easyocr` / `pytesseract` / `null` / `auto` (default). |
| `MESHTASTIC_UI_TUI_CAMERA` | Set to `1` to mount the live camera-feed panel in `meshtastic-mcp-test-tui`. |

View File

@@ -166,15 +166,73 @@ rather than auto-`sudo`'ing mid-run.
## Environment variables
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ------------------------------------------------------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| `MESHTASTIC_MCP_SEED` | `mcp-<user>-<host>` | PSK seed for test-harness session (CI override) |
| `MESHTASTIC_MCP_FLASH_LOG` | `<mcp-server>/tests/flash.log` | Tee target for pio/esptool/nrfutil subprocess output (TUI tails it) |
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| `MESHTASTIC_MCP_SEED` | `mcp-<user>-<host>` | PSK seed for test-harness session (CI override) |
| `MESHTASTIC_MCP_FLASH_LOG` | `<mcp-server>/tests/flash.log` | Tee target for pio/esptool/nrfutil subprocess output (TUI tails it) |
| `MESHTASTIC_MCP_TCP_HOST` | unset | `host` or `host:port` of a `meshtasticd` daemon to surface as a TCP device (see "TCP / native-host nodes" below) |
## TCP / native-host nodes
The `native-macos` and `native` PlatformIO envs build a headless `meshtasticd`
binary that runs on the host (Apple Silicon / Intel macOS, or Linux Portduino).
The daemon exposes the meshtastic TCP API on port `4403` rather than a USB
serial endpoint — point the MCP server at it via `MESHTASTIC_MCP_TCP_HOST`:
```bash
# 1. Build + run a daemon on this host (see variants/native/portduino/platformio.ini
# for full Homebrew prereqs and CH341 LoRa-adapter setup).
pio run -e native-macos
~/.meshtasticd/meshtasticd
# 2. Point the MCP server at it.
export MESHTASTIC_MCP_TCP_HOST=localhost # or host:port, default port 4403
```
**First-run gotcha — MAC address.** `meshtasticd` derives its MAC from the
USB adapter's serial-number / product strings. Many cheap CH341 dongles
(MeshStick included — VID 0x1A86 / PID 0x5512) ship with `iSerialNumber=0`
and `iProduct=0`, so the daemon aborts on boot with `*** Blank MAC Address
not allowed!`. Set the MAC explicitly in `config.yaml`:
```yaml
# Under General:
MACAddress: 02:CA:FE:BA:BE:01
```
Use a locally-administered address (first byte's second-LSB set, e.g.
`02:*` / `06:*` / `0A:*` / `0E:*`) to avoid colliding with a real OUI.
There is also a `--hwid AA:BB:CC:DD:EE:FF` CLI flag visible in
`meshtasticd --help`, but it is **currently broken** in
`MAC_from_string()` (`src/platform/portduino/PortduinoGlue.cpp`): the
function strips colons from its parameter but then reads bytes from the
global `portduino_config.mac_address`, so `--hwid` is silently overridden
when `MACAddress:` is also set, and crashes the daemon (uncaught
`std::invalid_argument: stoi: no conversion`) when it isn't. Use the YAML
form until that's fixed upstream.
`list_devices` will surface the daemon as `tcp://localhost:4403` with
`likely_meshtastic=True`, so `device_info`, `list_nodes`, `get_config`,
`set_config`, `set_owner`, `send_text`, `userprefs_*`, and the admin RPCs
auto-select it when no `port` is passed. Pass `port="tcp://other-host:9999"`
explicitly to target a different daemon.
**Tools that don't apply to a TCP/native node** (no USB hardware to operate
on) raise a clear `ConnectionError` rather than failing mysteriously:
`pio_flash`, `erase_and_flash`, `update_flash`, `touch_1200bps`,
`serial_open` (use info/admin tools directly), and the vendor escape hatches
`esptool_*`, `nrfutil_*`, `picotool_*`. `pio_flash` against a `native*` env
similarly raises — there's no upload step; use `build` and run the binary
directly.
The pytest harness in `tests/` still assumes USB-attached devices per role —
TCP-aware fixtures are not part of this surface yet.
## Hardware Test Suite

View File

@@ -1,4 +1,4 @@
"""Context manager for meshtastic.SerialInterface connections.
"""Context manager for meshtastic interface connections (serial + TCP).
Every info/admin tool goes through `connect(port)` so we have a single place
that:
@@ -6,8 +6,16 @@ that:
- fails fast if a serial_session is already holding the port,
- guarantees `.close()` is called, even on exception.
The `SerialInterface` blocks on construction waiting for the node database;
that's fine for v1 since every tool is a short-lived request.
Two transports:
- Serial: USB-attached firmware on `/dev/cu.*` / `/dev/ttyUSB*` / `COM*`.
- TCP: a `meshtasticd` daemon (e.g. the native macOS / Linux Portduino
headless build) addressed as `tcp://host[:port]` (default port 4403).
Surfaced by `devices.list_devices()` when `MESHTASTIC_MCP_TCP_HOST` is
set, so `resolve_port(None)` auto-selects it like a USB candidate.
Both `SerialInterface` and `TCPInterface` block on construction waiting for
the node database; that's fine for v1 since every tool is a short-lived
request.
"""
from __future__ import annotations
@@ -17,20 +25,107 @@ from typing import Iterator
from . import devices, registry
DEFAULT_TCP_PORT = 4403
TCP_SCHEME = "tcp://"
TCP_HOST_ENV = "MESHTASTIC_MCP_TCP_HOST"
class ConnectionError(RuntimeError):
pass
def is_tcp_port(port: str | None) -> bool:
return bool(port) and port.startswith(TCP_SCHEME)
def parse_tcp_port(port: str) -> tuple[str, int]:
"""Parse `tcp://host[:port]` → (host, port). Defaults to 4403.
Validates host shape (non-empty, no path separators) and port range
(1..65535). Raises `ConnectionError` on malformed input — never lets
a raw `ValueError` bubble up to a tool surface.
"""
if not port.startswith(TCP_SCHEME):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: expected '{TCP_SCHEME}host[:port]'."
)
rest = port[len(TCP_SCHEME) :]
if ":" in rest:
host, port_str = rest.rsplit(":", 1)
try:
tcp_port = int(port_str)
except ValueError as e:
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: port {port_str!r} is not an integer."
) from e
else:
host, tcp_port = rest, DEFAULT_TCP_PORT
if not host:
raise ConnectionError(f"Invalid TCP endpoint {port!r}: empty host.")
if any(c in host for c in ("/", "\\")):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: host {host!r} contains a path "
"separator. TCP hostnames cannot contain '/' or '\\' — did you "
"pass a serial port path or a Windows drive path by mistake?"
)
if not (1 <= tcp_port <= 65535):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: port {tcp_port} out of range "
"(must be 1..65535)."
)
return host, tcp_port
def normalize_tcp_endpoint(endpoint: str) -> str:
r"""Normalize `host`, `host:port`, or `tcp://host[:port]` → canonical
`tcp://host:port` form. One place that owns the lock-key shape.
Defers all validation to `parse_tcp_port`, so path-like inputs
(`/dev/cu.foo`, `C:\Windows\…`), empty hosts, non-integer ports,
and out-of-range ports raise `ConnectionError` here too.
"""
if endpoint.startswith(TCP_SCHEME):
canonical = endpoint
elif ":" in endpoint:
canonical = f"{TCP_SCHEME}{endpoint}"
else:
canonical = f"{TCP_SCHEME}{endpoint}:{DEFAULT_TCP_PORT}"
host, port = parse_tcp_port(canonical)
return f"{TCP_SCHEME}{host}:{port}"
def reject_if_tcp(port: str | None, tool_name: str) -> None:
"""Raise if `port` is a TCP endpoint — for tools that need real USB
hardware (flash, bootloader, vendor escape hatches, serial monitor).
Only checks the explicit arg; auto-selection via env var is the caller's
responsibility to handle if it matters.
"""
if is_tcp_port(port):
raise ConnectionError(
f"{tool_name} is not applicable to TCP/native nodes ({port}). "
"This tool requires USB-attached hardware."
)
def resolve_port(port: str | None) -> str:
"""Pick a port: explicit > sole likely_meshtastic candidate > error."""
"""Pick a port: explicit > sole likely_meshtastic candidate > error.
A `tcp://` string passes through (after canonicalization). When `port`
is None and no USB candidates are present, `MESHTASTIC_MCP_TCP_HOST`
is consulted via `devices.list_devices()`.
"""
if port:
if is_tcp_port(port):
return normalize_tcp_endpoint(port)
return port
candidates = [d for d in devices.list_devices() if d["likely_meshtastic"]]
if not candidates:
raise ConnectionError(
"No Meshtastic devices detected. Plug one in or pass `port` explicitly. "
"Run `list_devices` with include_unknown=True to see all serial ports."
"No Meshtastic devices detected. Plug one in, set "
f"{TCP_HOST_ENV}=<host[:port]> for a meshtasticd daemon, "
"or pass `port` explicitly. Run `list_devices` with "
"include_unknown=True to see all serial ports."
)
if len(candidates) > 1:
ports = ", ".join(c["port"] for c in candidates)
@@ -43,17 +138,62 @@ def resolve_port(port: str | None) -> str:
@contextmanager
def connect(port: str | None = None, timeout_s: float = 8.0) -> Iterator:
"""Open a `meshtastic.SerialInterface` and always close it.
"""Open a meshtastic interface (serial or TCP) and always close it.
Raises `ConnectionError` immediately if another serial session holds the
port (a `pio device monitor` in `serial_sessions/`, for instance).
For serial: raises `ConnectionError` immediately if another serial
session holds the port (a `pio device monitor` in `serial_sessions/`).
For TCP: no exclusive-access requirement, so the serial-session check
is skipped — but the `port_lock` still serializes parallel `connect()`
calls to the same daemon endpoint.
`timeout_s` is plumbed through to both `SerialInterface(timeout=...)`
and `TCPInterface(timeout=...)`. The meshtastic library uses the value
as the reply-wait deadline for `localNode.waitForConfig()` during
construction and for any subsequent admin RPC. `int()`-converted at
the boundary because the upstream API expects whole seconds.
"""
resolved = resolve_port(port)
timeout = int(timeout_s)
if is_tcp_port(resolved):
from meshtastic.tcp_interface import (
TCPInterface, # type: ignore[import-untyped]
)
host, tcp_port = parse_tcp_port(resolved)
lock = registry.port_lock(resolved)
if not lock.acquire(blocking=False):
raise ConnectionError(
f"TCP endpoint {resolved} is busy — another device operation "
"is in flight. Retry shortly."
)
iface = None
try:
iface = TCPInterface(
hostname=host,
portNumber=tcp_port,
connectNow=True,
noProto=False,
timeout=timeout,
)
yield iface
finally:
if iface is not None:
try:
iface.close()
except Exception:
pass
try:
lock.release()
except RuntimeError:
pass
return
from meshtastic.serial_interface import (
SerialInterface, # type: ignore[import-untyped]
)
resolved = resolve_port(port)
active = registry.active_session_for_port(resolved)
if active is not None:
raise ConnectionError(
@@ -70,7 +210,12 @@ def connect(port: str | None = None, timeout_s: float = 8.0) -> Iterator:
iface = None
try:
iface = SerialInterface(devPath=resolved, connectNow=True, noProto=False)
iface = SerialInterface(
devPath=resolved,
connectNow=True,
noProto=False,
timeout=timeout,
)
yield iface
finally:
if iface is not None:

View File

@@ -1,13 +1,18 @@
"""USB/serial device discovery.
"""USB/serial + TCP device discovery.
Combines the canonical `meshtastic.util.findPorts()` allowlist/blocklist with
the richer metadata (`serial.tools.list_ports.comports()`) so callers see
VID/PID, descriptions, and manufacturer strings alongside the "is this likely
a Meshtastic device" signal.
If `MESHTASTIC_MCP_TCP_HOST=<host[:port]>` is set, a synthetic entry for the
`meshtasticd` daemon at that endpoint is prepended to the result, so
`resolve_port(None)` auto-selects it like a USB candidate.
"""
from __future__ import annotations
import os
from typing import Any
from serial.tools import list_ports
@@ -19,6 +24,45 @@ def _to_hex(value: int | None) -> str | None:
return f"0x{value:04x}"
def _tcp_endpoint_from_env() -> dict[str, Any] | None:
"""Synthesize a TCP device entry from MESHTASTIC_MCP_TCP_HOST, if set.
If the env var is malformed (non-integer port, path-like host, etc.),
return an entry with `likely_meshtastic=False` and the parser error in
the description, rather than raising — `list_devices` is the diagnostic
tool a user reaches for when their env var isn't working, so it must
not crash on misconfiguration.
"""
host = os.environ.get("MESHTASTIC_MCP_TCP_HOST")
if not host:
return None
# Lazy import to avoid a circular dependency (connection imports devices).
from . import connection
try:
port = connection.normalize_tcp_endpoint(host)
description = "meshtasticd (TCP)"
likely = True
except connection.ConnectionError as e:
# Surface the raw env-var value plus the parser's reason so the
# user can see exactly what they set and why it was rejected.
# Don't double the scheme if the user already prefixed `tcp://`.
port = host if host.startswith(connection.TCP_SCHEME) else f"tcp://{host}"
description = f"meshtasticd (TCP) — invalid MESHTASTIC_MCP_TCP_HOST: {e}"
likely = False
return {
"port": port,
"vid": None,
"pid": None,
"description": description,
"manufacturer": None,
"product": None,
"serial_number": None,
"likely_meshtastic": likely,
"blacklisted": False,
}
def list_devices(include_unknown: bool = False) -> list[dict[str, Any]]:
"""Return enriched info for serial ports, flagging Meshtastic candidates.
@@ -70,6 +114,22 @@ def list_devices(include_unknown: bool = False) -> list[dict[str, Any]]:
}
)
# Stable ordering: likely_meshtastic first, then by port path
results.sort(key=lambda r: (not r["likely_meshtastic"], r["port"]))
# Append the TCP endpoint (if env var set) and sort everything together.
tcp_entry = _tcp_endpoint_from_env()
if tcp_entry is not None:
results.append(tcp_entry)
# Stable ordering: likely_meshtastic first; within rank, TCP wins over
# USB (explicit env-var configuration takes precedence over USB
# enumeration); then by port path. A misconfigured TCP entry has
# likely_meshtastic=False and lands among the other ignored entries —
# it does NOT pre-empt real USB devices at the top of the list.
results.sort(
key=lambda r: (
not r["likely_meshtastic"],
not r["port"].startswith("tcp://"),
r["port"],
)
)
return results

View File

@@ -17,7 +17,7 @@ from typing import Any
import serial
from . import boards, config, devices, pio, userprefs
from . import boards, config, connection, devices, pio, userprefs
# Meshtastic variants use both `esp32s3` and `esp32-s3` style names across
# variants/*/platformio.ini (no consistency enforced). Accept both spellings.
@@ -46,6 +46,18 @@ def _require_confirm(confirm: bool, operation: str) -> None:
)
def _reject_native_env(env: str, operation: str) -> None:
"""`native*` envs build a host executable, not firmware — there's no
upload step. The user wants `build` (or just runs the binary directly).
"""
if env.startswith("native"):
raise FlashError(
f"{operation} is not applicable for env {env!r}: native envs "
"produce a host executable, not flashable firmware. Use `build` "
"instead, then run the resulting binary directly."
)
def _artifacts_for(env: str) -> list[Path]:
build_dir = config.firmware_root() / ".pio" / "build" / env
if not build_dir.is_dir():
@@ -141,6 +153,8 @@ def flash(
that pio performs will pick up the injected values.
"""
_require_confirm(confirm, "flash")
_reject_native_env(env, "flash")
connection.reject_if_tcp(port, "flash")
with userprefs.temporary_overrides(userprefs_overrides) as effective:
result = pio.run(
["run", "-e", env, "-t", "upload", "--upload-port", port],
@@ -200,6 +214,7 @@ def erase_and_flash(
in that case) since a cached factory.bin would not reflect the new prefs.
"""
_require_confirm(confirm, "erase_and_flash")
connection.reject_if_tcp(port, "erase_and_flash")
_check_esp32_env(env)
if userprefs_overrides and skip_build:
@@ -257,6 +272,7 @@ def update_flash(
overrides are provided we always force a rebuild.
"""
_require_confirm(confirm, "update_flash")
connection.reject_if_tcp(port, "update_flash")
_check_esp32_env(env)
if userprefs_overrides and skip_build:
@@ -391,6 +407,7 @@ def touch_1200bps(
Returns `{ok, former_port, new_port, new_port_vid_pid, attempts}`.
"""
connection.reject_if_tcp(port, "touch_1200bps")
before_list = devices.list_devices(include_unknown=True)
before_ports = {d["port"] for d in before_list}

View File

@@ -16,7 +16,7 @@ import subprocess
from pathlib import Path
from typing import Any, Sequence
from . import config, pio
from . import config, connection, pio
_TIMEOUT_SHORT = 30
_TIMEOUT_LONG = 600
@@ -102,6 +102,7 @@ def _parse_esptool_chip_info(stdout: str) -> dict[str, Any]:
def esptool_chip_info(port: str) -> dict[str, Any]:
connection.reject_if_tcp(port, "esptool_chip_info")
binary = config.esptool_bin()
# `chip_id` prints chip + mac + crystal + features. `flash_id` adds flash.
combined = _run(binary, ["--port", port, "flash_id"], timeout=_TIMEOUT_SHORT)
@@ -116,6 +117,7 @@ def esptool_chip_info(port: str) -> dict[str, Any]:
def esptool_erase_flash(port: str, confirm: bool = False) -> dict[str, Any]:
"""Full-chip erase. Leaves the device unbootable until reflashed."""
_require_confirm(confirm, "esptool_erase_flash")
connection.reject_if_tcp(port, "esptool_erase_flash")
binary = config.esptool_bin()
# esptool v5 uses `erase-flash`, older uses `erase_flash`. Try the new name
# first; if it fails with unknown command, retry old.
@@ -134,6 +136,7 @@ def esptool_raw(
"""Raw esptool passthrough. Destructive subcommands require confirm=True."""
if not args:
raise ToolError("args must not be empty")
connection.reject_if_tcp(port, "esptool_raw")
# Find the first non-flag arg (the subcommand).
subcommand = next((a for a in args if not a.startswith("-")), None)
if subcommand and subcommand.replace("-", "_") in {
@@ -156,6 +159,7 @@ NRFUTIL_DESTRUCTIVE = {"dfu", "settings"}
def nrfutil_dfu(port: str, package_path: str, confirm: bool = False) -> dict[str, Any]:
_require_confirm(confirm, "nrfutil_dfu")
connection.reject_if_tcp(port, "nrfutil_dfu")
pkg = Path(package_path).expanduser()
if not pkg.is_file():
raise ToolError(f"Package not found: {pkg}")
@@ -213,6 +217,7 @@ def _parse_picotool_info(stdout: str) -> dict[str, Any]:
def picotool_info(port: str | None = None) -> dict[str, Any]:
"""Read device info from a Pico in BOOTSEL mode. `port` is informational
only — picotool auto-detects."""
connection.reject_if_tcp(port, "picotool_info")
binary = config.picotool_bin()
res = _run(binary, ["info", "-a"], timeout=_TIMEOUT_SHORT)
if res["exit_code"] != 0:

View File

@@ -71,6 +71,10 @@ def open_session(
If `env` is supplied, pio resolves baud and filters from platformio.ini.
Otherwise uses the supplied `baud` and `filters` (default `['direct']`).
"""
# Lazy import to avoid circular: registry imports serial_session.
from . import connection
connection.reject_if_tcp(port, "serial_open")
args = ["device", "monitor", "--port", port, "--no-reconnect"]
effective_filters: list[str]
effective_baud: int = baud

View File

@@ -0,0 +1,383 @@
"""TCP transport plumbing in connection.py + devices.py.
Pure-Python tests — no real device or daemon required. Mocks `TCPInterface`
when exercising `connect()`.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from meshtastic_mcp import connection, devices
# ---------- helpers --------------------------------------------------------
class TestIsTcpPort:
def test_tcp_scheme(self) -> None:
assert connection.is_tcp_port("tcp://localhost") is True
assert connection.is_tcp_port("tcp://localhost:4403") is True
assert connection.is_tcp_port("tcp://192.168.1.50:9999") is True
def test_serial_paths(self) -> None:
assert connection.is_tcp_port("/dev/cu.usbmodem1234") is False
assert connection.is_tcp_port("/dev/ttyUSB0") is False
assert connection.is_tcp_port("COM3") is False
def test_empty_or_none(self) -> None:
assert connection.is_tcp_port(None) is False
assert connection.is_tcp_port("") is False
class TestParseTcpPort:
def test_default_port(self) -> None:
assert connection.parse_tcp_port("tcp://localhost") == ("localhost", 4403)
def test_explicit_port(self) -> None:
assert connection.parse_tcp_port("tcp://localhost:9999") == (
"localhost",
9999,
)
def test_ip_with_port(self) -> None:
assert connection.parse_tcp_port("tcp://192.168.1.50:4403") == (
"192.168.1.50",
4403,
)
class TestNormalizeTcpEndpoint:
def test_bare_host(self) -> None:
assert connection.normalize_tcp_endpoint("localhost") == "tcp://localhost:4403"
def test_host_port(self) -> None:
assert (
connection.normalize_tcp_endpoint("localhost:5000")
== "tcp://localhost:5000"
)
def test_full_url(self) -> None:
assert (
connection.normalize_tcp_endpoint("tcp://1.2.3.4") == "tcp://1.2.3.4:4403"
)
assert (
connection.normalize_tcp_endpoint("tcp://1.2.3.4:9999")
== "tcp://1.2.3.4:9999"
)
def test_idempotent(self) -> None:
once = connection.normalize_tcp_endpoint("localhost:4403")
twice = connection.normalize_tcp_endpoint(once)
assert once == twice == "tcp://localhost:4403"
def test_path_like_endpoint_rejected(self) -> None:
# Serial port paths and Windows drive paths are common config typos
# (someone passes a serial path to MESHTASTIC_MCP_TCP_HOST). Reject
# rather than producing a nonsense `tcp:///dev/cu.foo:4403` URL.
with pytest.raises(connection.ConnectionError, match="path separator"):
connection.normalize_tcp_endpoint("/dev/cu.foo")
with pytest.raises(connection.ConnectionError):
connection.normalize_tcp_endpoint("tcp:///dev/cu.foo:4403")
with pytest.raises(connection.ConnectionError):
connection.normalize_tcp_endpoint(r"C:\Windows\System32")
def test_non_integer_port_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="not an integer"):
connection.normalize_tcp_endpoint("tcp://host:notaport")
with pytest.raises(connection.ConnectionError, match="not an integer"):
connection.normalize_tcp_endpoint("host:notaport")
def test_empty_host_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="empty host"):
connection.normalize_tcp_endpoint("tcp://:4403")
def test_port_out_of_range_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("tcp://host:0")
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("tcp://host:65536")
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("host:99999")
class TestParseTcpPortValidation:
def test_missing_scheme_rejected(self) -> None:
# parse_tcp_port is a low-level helper that requires the scheme.
# Misuse should fail loudly rather than silently mis-parsing.
with pytest.raises(connection.ConnectionError, match="expected"):
connection.parse_tcp_port("localhost:4403")
def test_negative_port_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.parse_tcp_port("tcp://host:-1")
# ---------- reject_if_tcp --------------------------------------------------
class TestRejectIfTcp:
def test_rejects_tcp(self) -> None:
with pytest.raises(connection.ConnectionError, match="not applicable"):
connection.reject_if_tcp("tcp://localhost", "esptool_chip_info")
def test_passes_through_serial(self) -> None:
connection.reject_if_tcp("/dev/cu.usbmodem1", "esptool_chip_info") # no raise
def test_passes_through_none(self) -> None:
# None means "auto-detect"; not the explicit-arg case we guard.
connection.reject_if_tcp(None, "esptool_chip_info") # no raise
# ---------- resolve_port ---------------------------------------------------
class TestResolvePort:
def test_explicit_serial_passthrough(self) -> None:
assert connection.resolve_port("/dev/cu.usbmodem999") == "/dev/cu.usbmodem999"
def test_explicit_tcp_normalized(self) -> None:
assert connection.resolve_port("tcp://localhost") == "tcp://localhost:4403"
def test_no_port_no_devices_errors(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch.object(devices, "list_devices", return_value=[]):
with pytest.raises(
connection.ConnectionError, match="No Meshtastic devices"
):
connection.resolve_port(None)
def test_no_port_one_candidate_selected(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
fake = [{"port": "/dev/cu.usbmodem1", "likely_meshtastic": True}]
with patch.object(devices, "list_devices", return_value=fake):
assert connection.resolve_port(None) == "/dev/cu.usbmodem1"
def test_no_port_multiple_candidates_errors(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
fake = [
{"port": "/dev/cu.usbmodem1", "likely_meshtastic": True},
{"port": "/dev/cu.usbmodem2", "likely_meshtastic": True},
]
with patch.object(devices, "list_devices", return_value=fake):
with pytest.raises(connection.ConnectionError, match="Multiple"):
connection.resolve_port(None)
def test_env_var_surfaces_tcp_via_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost")
# Don't patch list_devices — let the real env-var path run, but stub
# the USB enumeration to keep the test hermetic.
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
assert connection.resolve_port(None) == "tcp://localhost:4403"
# ---------- devices.list_devices TCP entry --------------------------------
class TestDevicesTcpEntry:
def test_no_env_var_no_tcp_entry(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
assert all(not d["port"].startswith("tcp://") for d in ds)
def test_env_var_adds_tcp_entry(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "myhost:9999")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
tcp = [d for d in ds if d["port"].startswith("tcp://")]
assert len(tcp) == 1
assert tcp[0]["port"] == "tcp://myhost:9999"
assert tcp[0]["likely_meshtastic"] is True
assert tcp[0]["description"] == "meshtasticd (TCP)"
def test_tcp_entry_first_in_results(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
assert ds, "expected at least the TCP entry"
assert ds[0]["port"].startswith("tcp://")
def test_invalid_env_var_does_not_break_list_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# `list_devices` is the diagnostic tool reached for when an env var
# isn't working — it must not throw on misconfiguration.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices(include_unknown=True)
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["likely_meshtastic"] is False
assert "invalid MESHTASTIC_MCP_TCP_HOST" in tcp[0]["description"]
assert "not an integer" in tcp[0]["description"]
def test_invalid_env_var_excluded_from_resolve_port_autodetect(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# `likely_meshtastic=False` keeps the bad TCP entry out of the
# auto-select path — `resolve_port(None)` should still report
# "no Meshtastic devices" rather than picking a broken endpoint.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
with pytest.raises(connection.ConnectionError, match="No Meshtastic"):
connection.resolve_port(None)
def test_invalid_env_var_does_not_double_tcp_scheme(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# If a user mistakenly sets `MESHTASTIC_MCP_TCP_HOST=tcp://host:bad`,
# the diagnostic entry must surface the raw value as-is rather than
# producing `tcp://tcp://host:bad`.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "tcp://host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices(include_unknown=True)
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["port"] == "tcp://host:notaport"
assert "tcp://tcp://" not in tcp[0]["port"]
def test_invalid_env_var_does_not_pre_empt_real_usb_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# Sort ordering: a misconfigured TCP env var must NOT take position 0
# ahead of real USB candidates. Position 0 is reserved for the highest
# rank (likely_meshtastic=True), with TCP-before-USB as a tiebreaker
# within rank.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
# Stub a USB Meshtastic candidate (Espressif VID, port present in
# findPorts).
class FakeInfo:
def __init__(self, device: str, vid: int, pid: int) -> None:
self.device = device
self.vid = vid
self.pid = pid
self.description = "Heltec V3"
self.manufacturer = "Espressif"
self.product = "USB JTAG/serial"
self.serial_number = "abc"
fake_port = FakeInfo("/dev/cu.usbmodem4201", 0x303A, 0x1001)
with patch(
"meshtastic_mcp.devices.list_ports.comports", return_value=[fake_port]
), patch(
"meshtastic.util.findPorts",
return_value=["/dev/cu.usbmodem4201"],
):
ds = devices.list_devices(include_unknown=True)
assert ds, "expected at least the USB + TCP entries"
# Real USB candidate must be at position 0 — it's likely_meshtastic.
assert ds[0]["port"] == "/dev/cu.usbmodem4201"
assert ds[0]["likely_meshtastic"] is True
# The malformed TCP entry exists but lands among the unlikely entries.
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["likely_meshtastic"] is False
assert ds.index(tcp[0]) > 0
def test_likely_tcp_entry_wins_tiebreak_over_usb(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# Conversely, a *valid* TCP env var should sort ahead of USB
# candidates of equal likely_meshtastic rank — explicit env-var
# configuration is a precedence signal.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost:4403")
class FakeInfo:
def __init__(self, device: str, vid: int, pid: int) -> None:
self.device = device
self.vid = vid
self.pid = pid
self.description = "Heltec V3"
self.manufacturer = "Espressif"
self.product = "USB JTAG/serial"
self.serial_number = "abc"
fake_port = FakeInfo("/dev/cu.usbmodem4201", 0x303A, 0x1001)
with patch(
"meshtastic_mcp.devices.list_ports.comports", return_value=[fake_port]
), patch(
"meshtastic.util.findPorts",
return_value=["/dev/cu.usbmodem4201"],
):
ds = devices.list_devices()
assert ds[0]["port"] == "tcp://localhost:4403"
assert ds[0]["likely_meshtastic"] is True
# ---------- connect() routing ---------------------------------------------
class TestConnectRoutesTcp:
def test_connect_uses_tcp_interface_for_tcp_port(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify the TCP branch instantiates `TCPInterface(hostname, portNumber)`
and never touches `SerialInterface`."""
# Make sure the env var doesn't leak in and confuse resolve_port.
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp, patch(
"meshtastic.serial_interface.SerialInterface"
) as mock_serial:
mock_tcp.return_value.close.return_value = None
with connection.connect(port="tcp://example.com:1234", timeout_s=12.0):
pass
mock_tcp.assert_called_once_with(
hostname="example.com",
portNumber=1234,
connectNow=True,
noProto=False,
timeout=12,
)
mock_serial.assert_not_called()
def test_connect_plumbs_timeout_to_serial_interface(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify the serial branch also propagates `timeout_s` so callers
passing a custom timeout to `device_info` / `list_nodes` / etc. don't
silently get the library default."""
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.serial_interface.SerialInterface") as mock_serial, patch(
"meshtastic.tcp_interface.TCPInterface"
) as mock_tcp:
mock_serial.return_value.close.return_value = None
with connection.connect(port="/dev/cu.fake", timeout_s=20.0):
pass
mock_serial.assert_called_once_with(
devPath="/dev/cu.fake",
connectNow=True,
noProto=False,
timeout=20,
)
mock_tcp.assert_not_called()
def test_connect_releases_lock_on_tcp_failure(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_tcp.side_effect = RuntimeError("boom")
with pytest.raises(RuntimeError, match="boom"):
with connection.connect(port="tcp://locktest:4403"):
pass
# Lock should be released — a second connect attempt must not fail
# with "busy".
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_tcp.return_value.close.return_value = None
with connection.connect(port="tcp://locktest:4403"):
pass