Merge remote-tracking branch 'origin/master' into develop

This commit is contained in:
Jonathan Bennett
2026-05-04 18:10:59 -05:00
21 changed files with 1067 additions and 77 deletions

View File

@@ -575,6 +575,8 @@ Grouped by purpose. Full argument shapes in `mcp-server/README.md`; a few high-v
`confirm=True` is a tool-level gate on top of whatever permission prompt your MCP host shows. **Don't bypass it** by asking the host to auto-approve — it exists specifically because MCP hosts sometimes remember "always allow this tool" and that's dangerous for `factory_reset`, `erase_and_flash`, `uhubctl_power(action='off')`, and `uhubctl_cycle`.
**TCP / native-host nodes.** Setting `MESHTASTIC_MCP_TCP_HOST=<host[:port]>` makes `list_devices` surface a `meshtasticd` daemon (e.g. the `native-macos` build) as a synthetic `tcp://host:port` entry, and `connect()` routes through `meshtastic.tcp_interface.TCPInterface` instead of `SerialInterface`. Every read/write/admin tool that flows through `connect()` works against the daemon transparently. USB-only tools (`pio_flash`, `erase_and_flash`, `update_flash`, `touch_1200bps`, `serial_open`, `esptool_*`, `nrfutil_*`, `picotool_*`) raise a clear `ConnectionError` when handed a `tcp://` port; `pio_flash` against a `native*` env raises a `FlashError` (no upload step — use `build` and run the binary directly). The pytest harness still assumes USB-attached devices per role; TCP-aware fixtures are deferred. See `mcp-server/README.md` § "TCP / native-host nodes".
### Hardware test suite (`mcp-server/run-tests.sh`)
The wrapper auto-detects connected devices (VID → role map: `0x239A``nrf52`, `0x303A`/`0x10C4``esp32s3`), maps each role to a PlatformIO env (`nrf52``rak4631`, `esp32s3``heltec-v3`, overridable via `MESHTASTIC_MCP_ENV_<ROLE>`), then invokes pytest. Zero pre-flight config needed from the operator.

View File

@@ -32,10 +32,15 @@ jobs:
shell: bash
working-directory: meshtasticd
run: |
# Build-tools (notably platformio) come from the Meshtastic project
# on the OpenSUSE Build Service:
# https://build.opensuse.org/project/show/network:Meshtastic:build-tools
echo 'deb http://download.opensuse.org/repositories/network:/Meshtastic:/build-tools/xUbuntu_24.04/ /' \
| sudo tee /etc/apt/sources.list.d/network:Meshtastic:build-tools.list
curl -fsSL https://download.opensuse.org/repositories/network:Meshtastic:build-tools/xUbuntu_24.04/Release.key \
| gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/network_Meshtastic_build-tools.gpg >/dev/null
sudo apt-get update -y --fix-missing
sudo apt-get install -y software-properties-common build-essential devscripts equivs
sudo add-apt-repository ppa:meshtastic/build-tools -y
sudo apt-get update -y --fix-missing
sudo apt-get install -y build-essential devscripts equivs
sudo mk-build-deps --install --remove --tool='apt-get -o Debug::pkgProblemResolver=yes --no-install-recommends --yes' debian/control
- name: Import GPG key

View File

@@ -24,7 +24,7 @@ jobs:
shell: bash
run: |
brew update
brew install platformio yaml-cpp libuv openssl@3 libusb argp-standalone pkg-config
brew install platformio yaml-cpp libuv openssl@3 libusb argp-standalone pkg-config ulfius
- name: Get release version string
run: |

View File

@@ -8,7 +8,7 @@ plugins:
uri: https://github.com/trunk-io/plugins
lint:
enabled:
- checkov@3.2.525
- checkov@3.2.526
- renovate@43.150.0
- prettier@3.8.3
- trufflehog@3.95.2

View File

@@ -126,16 +126,17 @@ Sequence these; don't parallelize on the same port.
## Environment variables (test harness)
| Var | Purpose |
| ------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_MCP_ENV_<ROLE>` | Override PlatformIO env for a role (e.g. `MESHTASTIC_MCP_ENV_NRF52=rak4631-dap`). Default map: `nrf52→rak4631`, `esp32s3→heltec-v3`. |
| `MESHTASTIC_MCP_SEED` | PSK seed for the session test profile. Defaults to `mcp-<user>-<host>`. |
| `MESHTASTIC_MCP_FLASH_LOG` | File path to tee pio/esptool/nrfutil/picotool output. `run-tests.sh` sets this to `tests/flash.log` so the TUI can stream live flash progress. |
| `MESHTASTIC_UHUBCTL_BIN` | Absolute path to `uhubctl` binary. Default: PATH lookup. |
| `MESHTASTIC_UHUBCTL_LOCATION_<ROLE>` | Pin a role to a specific uhubctl hub location (e.g. `1-1.3`). Wins over VID auto-detection — use when multiple devices share a VID. |
| `MESHTASTIC_UHUBCTL_PORT_<ROLE>` | Pin a role to a specific hub port number. Required alongside `LOCATION_<ROLE>`. |
| `MESHTASTIC_UI_CAMERA_BACKEND` | Camera backend for UI tier + `capture_screen` tool: `opencv` / `ffmpeg` / `null` / `auto` (default). |
| `MESHTASTIC_UI_CAMERA_DEVICE` | Generic camera device (index or path). Used by the UI tier when no per-role var is set. |
| `MESHTASTIC_UI_CAMERA_DEVICE_<ROLE>` | Per-role camera pinning (e.g. `MESHTASTIC_UI_CAMERA_DEVICE_ESP32S3=0` for the OLED-bearing heltec-v3). |
| `MESHTASTIC_UI_OCR_BACKEND` | OCR engine selection: `easyocr` / `pytesseract` / `null` / `auto` (default). |
| `MESHTASTIC_UI_TUI_CAMERA` | Set to `1` to mount the live camera-feed panel in `meshtastic-mcp-test-tui`. |
| Var | Purpose |
| ------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_MCP_ENV_<ROLE>` | Override PlatformIO env for a role (e.g. `MESHTASTIC_MCP_ENV_NRF52=rak4631-dap`). Default map: `nrf52→rak4631`, `esp32s3→heltec-v3`. |
| `MESHTASTIC_MCP_SEED` | PSK seed for the session test profile. Defaults to `mcp-<user>-<host>`. |
| `MESHTASTIC_MCP_FLASH_LOG` | File path to tee pio/esptool/nrfutil/picotool output. `run-tests.sh` sets this to `tests/flash.log` so the TUI can stream live flash progress. |
| `MESHTASTIC_MCP_TCP_HOST` | `host` or `host:port` of a `meshtasticd` daemon (e.g. the `native-macos` build). Surfaces it in `list_devices` as `tcp://host:port` so `connect()`-based tools target it transparently. Default port 4403. |
| `MESHTASTIC_UHUBCTL_BIN` | Absolute path to `uhubctl` binary. Default: PATH lookup. |
| `MESHTASTIC_UHUBCTL_LOCATION_<ROLE>` | Pin a role to a specific uhubctl hub location (e.g. `1-1.3`). Wins over VID auto-detection — use when multiple devices share a VID. |
| `MESHTASTIC_UHUBCTL_PORT_<ROLE>` | Pin a role to a specific hub port number. Required alongside `LOCATION_<ROLE>`. |
| `MESHTASTIC_UI_CAMERA_BACKEND` | Camera backend for UI tier + `capture_screen` tool: `opencv` / `ffmpeg` / `null` / `auto` (default). |
| `MESHTASTIC_UI_CAMERA_DEVICE` | Generic camera device (index or path). Used by the UI tier when no per-role var is set. |
| `MESHTASTIC_UI_CAMERA_DEVICE_<ROLE>` | Per-role camera pinning (e.g. `MESHTASTIC_UI_CAMERA_DEVICE_ESP32S3=0` for the OLED-bearing heltec-v3). |
| `MESHTASTIC_UI_OCR_BACKEND` | OCR engine selection: `easyocr` / `pytesseract` / `null` / `auto` (default). |
| `MESHTASTIC_UI_TUI_CAMERA` | Set to `1` to mount the live camera-feed panel in `meshtastic-mcp-test-tui`. |

View File

@@ -3,15 +3,17 @@
# trunk-ignore-all(hadolint/DL3008): Do not pin apt package versions
# trunk-ignore-all(hadolint/DL3013): Do not pin pip package versions
FROM python:3.14-slim-trixie AS builder
FROM debian:trixie AS builder
ARG PIO_ENV=native
ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=Etc/UTC
# Install Dependencies
ENV PIP_ROOT_USER_ACTION=ignore
ENV PIP_BREAK_SYSTEM_PACKAGES=1
RUN apt-get update && apt-get install --no-install-recommends -y \
curl wget g++ zip git ca-certificates pkg-config \
python3-pip python3-grpc-tools \
libgpiod-dev libyaml-cpp-dev libbluetooth-dev libi2c-dev libuv1-dev \
libusb-1.0-0-dev libulfius-dev liborcania-dev libssl-dev \
libx11-dev libinput-dev libxkbcommon-x11-dev libsqlite3-dev libsdl2-dev \

View File

@@ -4,12 +4,18 @@
# trunk-ignore-all(hadolint/DL3013): Do not pin pip package versions
# Ensure the Alpine version is updated in both stages of the container!
FROM python:3.14-alpine3.23 AS builder
FROM alpine:3.23 AS builder
ARG PIO_ENV=native
ENV PIP_ROOT_USER_ACTION=ignore
# Enable Alpine community repository (for 'py3-grpcio-tools')
RUN echo "https://dl-cdn.alpinelinux.org/alpine/v$(cut -d. -f1,2 /etc/alpine-release)/community" >> /etc/apk/repositories
# Install Dependencies
ENV PIP_ROOT_USER_ACTION=ignore
ENV PIP_BREAK_SYSTEM_PACKAGES=1
RUN apk --no-cache add \
bash g++ libstdc++-dev linux-headers zip git ca-certificates libbsd-dev \
py3-pip py3-grpcio-tools \
libgpiod-dev yaml-cpp-dev bluez-dev \
libusb-dev i2c-tools-dev libuv-dev openssl-dev pkgconf argp-standalone \
libx11-dev libinput-dev libxkbcommon-dev sqlite-dev sdl2-dev \

View File

@@ -166,15 +166,73 @@ rather than auto-`sudo`'ing mid-run.
## Environment variables
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ------------------------------------------------------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| `MESHTASTIC_MCP_SEED` | `mcp-<user>-<host>` | PSK seed for test-harness session (CI override) |
| `MESHTASTIC_MCP_FLASH_LOG` | `<mcp-server>/tests/flash.log` | Tee target for pio/esptool/nrfutil subprocess output (TUI tails it) |
| Var | Default | Purpose |
| -------------------------- | ----------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------- |
| `MESHTASTIC_FIRMWARE_ROOT` | walks up from cwd for `platformio.ini` | Pin the firmware repo |
| `MESHTASTIC_PIO_BIN` | `~/.platformio/penv/bin/pio``$PATH` `pio``platformio` | Override `pio` location |
| `MESHTASTIC_ESPTOOL_BIN` | `<firmware>/.venv/bin/esptool``$PATH` | Override esptool |
| `MESHTASTIC_NRFUTIL_BIN` | `$PATH` | Override nrfutil |
| `MESHTASTIC_PICOTOOL_BIN` | `$PATH` | Override picotool |
| `MESHTASTIC_MCP_SEED` | `mcp-<user>-<host>` | PSK seed for test-harness session (CI override) |
| `MESHTASTIC_MCP_FLASH_LOG` | `<mcp-server>/tests/flash.log` | Tee target for pio/esptool/nrfutil subprocess output (TUI tails it) |
| `MESHTASTIC_MCP_TCP_HOST` | unset | `host` or `host:port` of a `meshtasticd` daemon to surface as a TCP device (see "TCP / native-host nodes" below) |
## TCP / native-host nodes
The `native-macos` and `native` PlatformIO envs build a headless `meshtasticd`
binary that runs on the host (Apple Silicon / Intel macOS, or Linux Portduino).
The daemon exposes the meshtastic TCP API on port `4403` rather than a USB
serial endpoint — point the MCP server at it via `MESHTASTIC_MCP_TCP_HOST`:
```bash
# 1. Build + run a daemon on this host (see variants/native/portduino/platformio.ini
# for full Homebrew prereqs and CH341 LoRa-adapter setup).
pio run -e native-macos
~/.meshtasticd/meshtasticd
# 2. Point the MCP server at it.
export MESHTASTIC_MCP_TCP_HOST=localhost # or host:port, default port 4403
```
**First-run gotcha — MAC address.** `meshtasticd` derives its MAC from the
USB adapter's serial-number / product strings. Many cheap CH341 dongles
(MeshStick included — VID 0x1A86 / PID 0x5512) ship with `iSerialNumber=0`
and `iProduct=0`, so the daemon aborts on boot with `*** Blank MAC Address
not allowed!`. Set the MAC explicitly in `config.yaml`:
```yaml
# Under General:
MACAddress: 02:CA:FE:BA:BE:01
```
Use a locally-administered address (first byte's second-LSB set, e.g.
`02:*` / `06:*` / `0A:*` / `0E:*`) to avoid colliding with a real OUI.
There is also a `--hwid AA:BB:CC:DD:EE:FF` CLI flag visible in
`meshtasticd --help`, but it is **currently broken** in
`MAC_from_string()` (`src/platform/portduino/PortduinoGlue.cpp`): the
function strips colons from its parameter but then reads bytes from the
global `portduino_config.mac_address`, so `--hwid` is silently overridden
when `MACAddress:` is also set, and crashes the daemon (uncaught
`std::invalid_argument: stoi: no conversion`) when it isn't. Use the YAML
form until that's fixed upstream.
`list_devices` will surface the daemon as `tcp://localhost:4403` with
`likely_meshtastic=True`, so `device_info`, `list_nodes`, `get_config`,
`set_config`, `set_owner`, `send_text`, `userprefs_*`, and the admin RPCs
auto-select it when no `port` is passed. Pass `port="tcp://other-host:9999"`
explicitly to target a different daemon.
**Tools that don't apply to a TCP/native node** (no USB hardware to operate
on) raise a clear `ConnectionError` rather than failing mysteriously:
`pio_flash`, `erase_and_flash`, `update_flash`, `touch_1200bps`,
`serial_open` (use info/admin tools directly), and the vendor escape hatches
`esptool_*`, `nrfutil_*`, `picotool_*`. `pio_flash` against a `native*` env
similarly raises — there's no upload step; use `build` and run the binary
directly.
The pytest harness in `tests/` still assumes USB-attached devices per role —
TCP-aware fixtures are not part of this surface yet.
## Hardware Test Suite

View File

@@ -1,4 +1,4 @@
"""Context manager for meshtastic.SerialInterface connections.
"""Context manager for meshtastic interface connections (serial + TCP).
Every info/admin tool goes through `connect(port)` so we have a single place
that:
@@ -6,8 +6,16 @@ that:
- fails fast if a serial_session is already holding the port,
- guarantees `.close()` is called, even on exception.
The `SerialInterface` blocks on construction waiting for the node database;
that's fine for v1 since every tool is a short-lived request.
Two transports:
- Serial: USB-attached firmware on `/dev/cu.*` / `/dev/ttyUSB*` / `COM*`.
- TCP: a `meshtasticd` daemon (e.g. the native macOS / Linux Portduino
headless build) addressed as `tcp://host[:port]` (default port 4403).
Surfaced by `devices.list_devices()` when `MESHTASTIC_MCP_TCP_HOST` is
set, so `resolve_port(None)` auto-selects it like a USB candidate.
Both `SerialInterface` and `TCPInterface` block on construction waiting for
the node database; that's fine for v1 since every tool is a short-lived
request.
"""
from __future__ import annotations
@@ -17,20 +25,107 @@ from typing import Iterator
from . import devices, registry
DEFAULT_TCP_PORT = 4403
TCP_SCHEME = "tcp://"
TCP_HOST_ENV = "MESHTASTIC_MCP_TCP_HOST"
class ConnectionError(RuntimeError):
pass
def is_tcp_port(port: str | None) -> bool:
return bool(port) and port.startswith(TCP_SCHEME)
def parse_tcp_port(port: str) -> tuple[str, int]:
"""Parse `tcp://host[:port]` → (host, port). Defaults to 4403.
Validates host shape (non-empty, no path separators) and port range
(1..65535). Raises `ConnectionError` on malformed input — never lets
a raw `ValueError` bubble up to a tool surface.
"""
if not port.startswith(TCP_SCHEME):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: expected '{TCP_SCHEME}host[:port]'."
)
rest = port[len(TCP_SCHEME) :]
if ":" in rest:
host, port_str = rest.rsplit(":", 1)
try:
tcp_port = int(port_str)
except ValueError as e:
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: port {port_str!r} is not an integer."
) from e
else:
host, tcp_port = rest, DEFAULT_TCP_PORT
if not host:
raise ConnectionError(f"Invalid TCP endpoint {port!r}: empty host.")
if any(c in host for c in ("/", "\\")):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: host {host!r} contains a path "
"separator. TCP hostnames cannot contain '/' or '\\' — did you "
"pass a serial port path or a Windows drive path by mistake?"
)
if not (1 <= tcp_port <= 65535):
raise ConnectionError(
f"Invalid TCP endpoint {port!r}: port {tcp_port} out of range "
"(must be 1..65535)."
)
return host, tcp_port
def normalize_tcp_endpoint(endpoint: str) -> str:
r"""Normalize `host`, `host:port`, or `tcp://host[:port]` → canonical
`tcp://host:port` form. One place that owns the lock-key shape.
Defers all validation to `parse_tcp_port`, so path-like inputs
(`/dev/cu.foo`, `C:\Windows\…`), empty hosts, non-integer ports,
and out-of-range ports raise `ConnectionError` here too.
"""
if endpoint.startswith(TCP_SCHEME):
canonical = endpoint
elif ":" in endpoint:
canonical = f"{TCP_SCHEME}{endpoint}"
else:
canonical = f"{TCP_SCHEME}{endpoint}:{DEFAULT_TCP_PORT}"
host, port = parse_tcp_port(canonical)
return f"{TCP_SCHEME}{host}:{port}"
def reject_if_tcp(port: str | None, tool_name: str) -> None:
"""Raise if `port` is a TCP endpoint — for tools that need real USB
hardware (flash, bootloader, vendor escape hatches, serial monitor).
Only checks the explicit arg; auto-selection via env var is the caller's
responsibility to handle if it matters.
"""
if is_tcp_port(port):
raise ConnectionError(
f"{tool_name} is not applicable to TCP/native nodes ({port}). "
"This tool requires USB-attached hardware."
)
def resolve_port(port: str | None) -> str:
"""Pick a port: explicit > sole likely_meshtastic candidate > error."""
"""Pick a port: explicit > sole likely_meshtastic candidate > error.
A `tcp://` string passes through (after canonicalization). When `port`
is None and no USB candidates are present, `MESHTASTIC_MCP_TCP_HOST`
is consulted via `devices.list_devices()`.
"""
if port:
if is_tcp_port(port):
return normalize_tcp_endpoint(port)
return port
candidates = [d for d in devices.list_devices() if d["likely_meshtastic"]]
if not candidates:
raise ConnectionError(
"No Meshtastic devices detected. Plug one in or pass `port` explicitly. "
"Run `list_devices` with include_unknown=True to see all serial ports."
"No Meshtastic devices detected. Plug one in, set "
f"{TCP_HOST_ENV}=<host[:port]> for a meshtasticd daemon, "
"or pass `port` explicitly. Run `list_devices` with "
"include_unknown=True to see all serial ports."
)
if len(candidates) > 1:
ports = ", ".join(c["port"] for c in candidates)
@@ -43,17 +138,62 @@ def resolve_port(port: str | None) -> str:
@contextmanager
def connect(port: str | None = None, timeout_s: float = 8.0) -> Iterator:
"""Open a `meshtastic.SerialInterface` and always close it.
"""Open a meshtastic interface (serial or TCP) and always close it.
Raises `ConnectionError` immediately if another serial session holds the
port (a `pio device monitor` in `serial_sessions/`, for instance).
For serial: raises `ConnectionError` immediately if another serial
session holds the port (a `pio device monitor` in `serial_sessions/`).
For TCP: no exclusive-access requirement, so the serial-session check
is skipped — but the `port_lock` still serializes parallel `connect()`
calls to the same daemon endpoint.
`timeout_s` is plumbed through to both `SerialInterface(timeout=...)`
and `TCPInterface(timeout=...)`. The meshtastic library uses the value
as the reply-wait deadline for `localNode.waitForConfig()` during
construction and for any subsequent admin RPC. `int()`-converted at
the boundary because the upstream API expects whole seconds.
"""
resolved = resolve_port(port)
timeout = int(timeout_s)
if is_tcp_port(resolved):
from meshtastic.tcp_interface import (
TCPInterface, # type: ignore[import-untyped]
)
host, tcp_port = parse_tcp_port(resolved)
lock = registry.port_lock(resolved)
if not lock.acquire(blocking=False):
raise ConnectionError(
f"TCP endpoint {resolved} is busy — another device operation "
"is in flight. Retry shortly."
)
iface = None
try:
iface = TCPInterface(
hostname=host,
portNumber=tcp_port,
connectNow=True,
noProto=False,
timeout=timeout,
)
yield iface
finally:
if iface is not None:
try:
iface.close()
except Exception:
pass
try:
lock.release()
except RuntimeError:
pass
return
from meshtastic.serial_interface import (
SerialInterface, # type: ignore[import-untyped]
)
resolved = resolve_port(port)
active = registry.active_session_for_port(resolved)
if active is not None:
raise ConnectionError(
@@ -70,7 +210,12 @@ def connect(port: str | None = None, timeout_s: float = 8.0) -> Iterator:
iface = None
try:
iface = SerialInterface(devPath=resolved, connectNow=True, noProto=False)
iface = SerialInterface(
devPath=resolved,
connectNow=True,
noProto=False,
timeout=timeout,
)
yield iface
finally:
if iface is not None:

View File

@@ -1,13 +1,18 @@
"""USB/serial device discovery.
"""USB/serial + TCP device discovery.
Combines the canonical `meshtastic.util.findPorts()` allowlist/blocklist with
the richer metadata (`serial.tools.list_ports.comports()`) so callers see
VID/PID, descriptions, and manufacturer strings alongside the "is this likely
a Meshtastic device" signal.
If `MESHTASTIC_MCP_TCP_HOST=<host[:port]>` is set, a synthetic entry for the
`meshtasticd` daemon at that endpoint is prepended to the result, so
`resolve_port(None)` auto-selects it like a USB candidate.
"""
from __future__ import annotations
import os
from typing import Any
from serial.tools import list_ports
@@ -19,6 +24,45 @@ def _to_hex(value: int | None) -> str | None:
return f"0x{value:04x}"
def _tcp_endpoint_from_env() -> dict[str, Any] | None:
"""Synthesize a TCP device entry from MESHTASTIC_MCP_TCP_HOST, if set.
If the env var is malformed (non-integer port, path-like host, etc.),
return an entry with `likely_meshtastic=False` and the parser error in
the description, rather than raising — `list_devices` is the diagnostic
tool a user reaches for when their env var isn't working, so it must
not crash on misconfiguration.
"""
host = os.environ.get("MESHTASTIC_MCP_TCP_HOST")
if not host:
return None
# Lazy import to avoid a circular dependency (connection imports devices).
from . import connection
try:
port = connection.normalize_tcp_endpoint(host)
description = "meshtasticd (TCP)"
likely = True
except connection.ConnectionError as e:
# Surface the raw env-var value plus the parser's reason so the
# user can see exactly what they set and why it was rejected.
# Don't double the scheme if the user already prefixed `tcp://`.
port = host if host.startswith(connection.TCP_SCHEME) else f"tcp://{host}"
description = f"meshtasticd (TCP) — invalid MESHTASTIC_MCP_TCP_HOST: {e}"
likely = False
return {
"port": port,
"vid": None,
"pid": None,
"description": description,
"manufacturer": None,
"product": None,
"serial_number": None,
"likely_meshtastic": likely,
"blacklisted": False,
}
def list_devices(include_unknown: bool = False) -> list[dict[str, Any]]:
"""Return enriched info for serial ports, flagging Meshtastic candidates.
@@ -70,6 +114,22 @@ def list_devices(include_unknown: bool = False) -> list[dict[str, Any]]:
}
)
# Stable ordering: likely_meshtastic first, then by port path
results.sort(key=lambda r: (not r["likely_meshtastic"], r["port"]))
# Append the TCP endpoint (if env var set) and sort everything together.
tcp_entry = _tcp_endpoint_from_env()
if tcp_entry is not None:
results.append(tcp_entry)
# Stable ordering: likely_meshtastic first; within rank, TCP wins over
# USB (explicit env-var configuration takes precedence over USB
# enumeration); then by port path. A misconfigured TCP entry has
# likely_meshtastic=False and lands among the other ignored entries —
# it does NOT pre-empt real USB devices at the top of the list.
results.sort(
key=lambda r: (
not r["likely_meshtastic"],
not r["port"].startswith("tcp://"),
r["port"],
)
)
return results

View File

@@ -17,7 +17,7 @@ from typing import Any
import serial
from . import boards, config, devices, pio, userprefs
from . import boards, config, connection, devices, pio, userprefs
# Meshtastic variants use both `esp32s3` and `esp32-s3` style names across
# variants/*/platformio.ini (no consistency enforced). Accept both spellings.
@@ -46,6 +46,18 @@ def _require_confirm(confirm: bool, operation: str) -> None:
)
def _reject_native_env(env: str, operation: str) -> None:
"""`native*` envs build a host executable, not firmware — there's no
upload step. The user wants `build` (or just runs the binary directly).
"""
if env.startswith("native"):
raise FlashError(
f"{operation} is not applicable for env {env!r}: native envs "
"produce a host executable, not flashable firmware. Use `build` "
"instead, then run the resulting binary directly."
)
def _artifacts_for(env: str) -> list[Path]:
build_dir = config.firmware_root() / ".pio" / "build" / env
if not build_dir.is_dir():
@@ -141,6 +153,8 @@ def flash(
that pio performs will pick up the injected values.
"""
_require_confirm(confirm, "flash")
_reject_native_env(env, "flash")
connection.reject_if_tcp(port, "flash")
with userprefs.temporary_overrides(userprefs_overrides) as effective:
result = pio.run(
["run", "-e", env, "-t", "upload", "--upload-port", port],
@@ -200,6 +214,7 @@ def erase_and_flash(
in that case) since a cached factory.bin would not reflect the new prefs.
"""
_require_confirm(confirm, "erase_and_flash")
connection.reject_if_tcp(port, "erase_and_flash")
_check_esp32_env(env)
if userprefs_overrides and skip_build:
@@ -257,6 +272,7 @@ def update_flash(
overrides are provided we always force a rebuild.
"""
_require_confirm(confirm, "update_flash")
connection.reject_if_tcp(port, "update_flash")
_check_esp32_env(env)
if userprefs_overrides and skip_build:
@@ -391,6 +407,7 @@ def touch_1200bps(
Returns `{ok, former_port, new_port, new_port_vid_pid, attempts}`.
"""
connection.reject_if_tcp(port, "touch_1200bps")
before_list = devices.list_devices(include_unknown=True)
before_ports = {d["port"] for d in before_list}

View File

@@ -16,7 +16,7 @@ import subprocess
from pathlib import Path
from typing import Any, Sequence
from . import config, pio
from . import config, connection, pio
_TIMEOUT_SHORT = 30
_TIMEOUT_LONG = 600
@@ -102,6 +102,7 @@ def _parse_esptool_chip_info(stdout: str) -> dict[str, Any]:
def esptool_chip_info(port: str) -> dict[str, Any]:
connection.reject_if_tcp(port, "esptool_chip_info")
binary = config.esptool_bin()
# `chip_id` prints chip + mac + crystal + features. `flash_id` adds flash.
combined = _run(binary, ["--port", port, "flash_id"], timeout=_TIMEOUT_SHORT)
@@ -116,6 +117,7 @@ def esptool_chip_info(port: str) -> dict[str, Any]:
def esptool_erase_flash(port: str, confirm: bool = False) -> dict[str, Any]:
"""Full-chip erase. Leaves the device unbootable until reflashed."""
_require_confirm(confirm, "esptool_erase_flash")
connection.reject_if_tcp(port, "esptool_erase_flash")
binary = config.esptool_bin()
# esptool v5 uses `erase-flash`, older uses `erase_flash`. Try the new name
# first; if it fails with unknown command, retry old.
@@ -134,6 +136,7 @@ def esptool_raw(
"""Raw esptool passthrough. Destructive subcommands require confirm=True."""
if not args:
raise ToolError("args must not be empty")
connection.reject_if_tcp(port, "esptool_raw")
# Find the first non-flag arg (the subcommand).
subcommand = next((a for a in args if not a.startswith("-")), None)
if subcommand and subcommand.replace("-", "_") in {
@@ -156,6 +159,7 @@ NRFUTIL_DESTRUCTIVE = {"dfu", "settings"}
def nrfutil_dfu(port: str, package_path: str, confirm: bool = False) -> dict[str, Any]:
_require_confirm(confirm, "nrfutil_dfu")
connection.reject_if_tcp(port, "nrfutil_dfu")
pkg = Path(package_path).expanduser()
if not pkg.is_file():
raise ToolError(f"Package not found: {pkg}")
@@ -213,6 +217,7 @@ def _parse_picotool_info(stdout: str) -> dict[str, Any]:
def picotool_info(port: str | None = None) -> dict[str, Any]:
"""Read device info from a Pico in BOOTSEL mode. `port` is informational
only — picotool auto-detects."""
connection.reject_if_tcp(port, "picotool_info")
binary = config.picotool_bin()
res = _run(binary, ["info", "-a"], timeout=_TIMEOUT_SHORT)
if res["exit_code"] != 0:

View File

@@ -71,6 +71,10 @@ def open_session(
If `env` is supplied, pio resolves baud and filters from platformio.ini.
Otherwise uses the supplied `baud` and `filters` (default `['direct']`).
"""
# Lazy import to avoid circular: registry imports serial_session.
from . import connection
connection.reject_if_tcp(port, "serial_open")
args = ["device", "monitor", "--port", port, "--no-reconnect"]
effective_filters: list[str]
effective_baud: int = baud

View File

@@ -0,0 +1,383 @@
"""TCP transport plumbing in connection.py + devices.py.
Pure-Python tests — no real device or daemon required. Mocks `TCPInterface`
when exercising `connect()`.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from meshtastic_mcp import connection, devices
# ---------- helpers --------------------------------------------------------
class TestIsTcpPort:
def test_tcp_scheme(self) -> None:
assert connection.is_tcp_port("tcp://localhost") is True
assert connection.is_tcp_port("tcp://localhost:4403") is True
assert connection.is_tcp_port("tcp://192.168.1.50:9999") is True
def test_serial_paths(self) -> None:
assert connection.is_tcp_port("/dev/cu.usbmodem1234") is False
assert connection.is_tcp_port("/dev/ttyUSB0") is False
assert connection.is_tcp_port("COM3") is False
def test_empty_or_none(self) -> None:
assert connection.is_tcp_port(None) is False
assert connection.is_tcp_port("") is False
class TestParseTcpPort:
def test_default_port(self) -> None:
assert connection.parse_tcp_port("tcp://localhost") == ("localhost", 4403)
def test_explicit_port(self) -> None:
assert connection.parse_tcp_port("tcp://localhost:9999") == (
"localhost",
9999,
)
def test_ip_with_port(self) -> None:
assert connection.parse_tcp_port("tcp://192.168.1.50:4403") == (
"192.168.1.50",
4403,
)
class TestNormalizeTcpEndpoint:
def test_bare_host(self) -> None:
assert connection.normalize_tcp_endpoint("localhost") == "tcp://localhost:4403"
def test_host_port(self) -> None:
assert (
connection.normalize_tcp_endpoint("localhost:5000")
== "tcp://localhost:5000"
)
def test_full_url(self) -> None:
assert (
connection.normalize_tcp_endpoint("tcp://1.2.3.4") == "tcp://1.2.3.4:4403"
)
assert (
connection.normalize_tcp_endpoint("tcp://1.2.3.4:9999")
== "tcp://1.2.3.4:9999"
)
def test_idempotent(self) -> None:
once = connection.normalize_tcp_endpoint("localhost:4403")
twice = connection.normalize_tcp_endpoint(once)
assert once == twice == "tcp://localhost:4403"
def test_path_like_endpoint_rejected(self) -> None:
# Serial port paths and Windows drive paths are common config typos
# (someone passes a serial path to MESHTASTIC_MCP_TCP_HOST). Reject
# rather than producing a nonsense `tcp:///dev/cu.foo:4403` URL.
with pytest.raises(connection.ConnectionError, match="path separator"):
connection.normalize_tcp_endpoint("/dev/cu.foo")
with pytest.raises(connection.ConnectionError):
connection.normalize_tcp_endpoint("tcp:///dev/cu.foo:4403")
with pytest.raises(connection.ConnectionError):
connection.normalize_tcp_endpoint(r"C:\Windows\System32")
def test_non_integer_port_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="not an integer"):
connection.normalize_tcp_endpoint("tcp://host:notaport")
with pytest.raises(connection.ConnectionError, match="not an integer"):
connection.normalize_tcp_endpoint("host:notaport")
def test_empty_host_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="empty host"):
connection.normalize_tcp_endpoint("tcp://:4403")
def test_port_out_of_range_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("tcp://host:0")
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("tcp://host:65536")
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.normalize_tcp_endpoint("host:99999")
class TestParseTcpPortValidation:
def test_missing_scheme_rejected(self) -> None:
# parse_tcp_port is a low-level helper that requires the scheme.
# Misuse should fail loudly rather than silently mis-parsing.
with pytest.raises(connection.ConnectionError, match="expected"):
connection.parse_tcp_port("localhost:4403")
def test_negative_port_rejected(self) -> None:
with pytest.raises(connection.ConnectionError, match="out of range"):
connection.parse_tcp_port("tcp://host:-1")
# ---------- reject_if_tcp --------------------------------------------------
class TestRejectIfTcp:
def test_rejects_tcp(self) -> None:
with pytest.raises(connection.ConnectionError, match="not applicable"):
connection.reject_if_tcp("tcp://localhost", "esptool_chip_info")
def test_passes_through_serial(self) -> None:
connection.reject_if_tcp("/dev/cu.usbmodem1", "esptool_chip_info") # no raise
def test_passes_through_none(self) -> None:
# None means "auto-detect"; not the explicit-arg case we guard.
connection.reject_if_tcp(None, "esptool_chip_info") # no raise
# ---------- resolve_port ---------------------------------------------------
class TestResolvePort:
def test_explicit_serial_passthrough(self) -> None:
assert connection.resolve_port("/dev/cu.usbmodem999") == "/dev/cu.usbmodem999"
def test_explicit_tcp_normalized(self) -> None:
assert connection.resolve_port("tcp://localhost") == "tcp://localhost:4403"
def test_no_port_no_devices_errors(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch.object(devices, "list_devices", return_value=[]):
with pytest.raises(
connection.ConnectionError, match="No Meshtastic devices"
):
connection.resolve_port(None)
def test_no_port_one_candidate_selected(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
fake = [{"port": "/dev/cu.usbmodem1", "likely_meshtastic": True}]
with patch.object(devices, "list_devices", return_value=fake):
assert connection.resolve_port(None) == "/dev/cu.usbmodem1"
def test_no_port_multiple_candidates_errors(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
fake = [
{"port": "/dev/cu.usbmodem1", "likely_meshtastic": True},
{"port": "/dev/cu.usbmodem2", "likely_meshtastic": True},
]
with patch.object(devices, "list_devices", return_value=fake):
with pytest.raises(connection.ConnectionError, match="Multiple"):
connection.resolve_port(None)
def test_env_var_surfaces_tcp_via_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost")
# Don't patch list_devices — let the real env-var path run, but stub
# the USB enumeration to keep the test hermetic.
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
assert connection.resolve_port(None) == "tcp://localhost:4403"
# ---------- devices.list_devices TCP entry --------------------------------
class TestDevicesTcpEntry:
def test_no_env_var_no_tcp_entry(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
assert all(not d["port"].startswith("tcp://") for d in ds)
def test_env_var_adds_tcp_entry(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "myhost:9999")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
tcp = [d for d in ds if d["port"].startswith("tcp://")]
assert len(tcp) == 1
assert tcp[0]["port"] == "tcp://myhost:9999"
assert tcp[0]["likely_meshtastic"] is True
assert tcp[0]["description"] == "meshtasticd (TCP)"
def test_tcp_entry_first_in_results(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices()
assert ds, "expected at least the TCP entry"
assert ds[0]["port"].startswith("tcp://")
def test_invalid_env_var_does_not_break_list_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# `list_devices` is the diagnostic tool reached for when an env var
# isn't working — it must not throw on misconfiguration.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices(include_unknown=True)
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["likely_meshtastic"] is False
assert "invalid MESHTASTIC_MCP_TCP_HOST" in tcp[0]["description"]
assert "not an integer" in tcp[0]["description"]
def test_invalid_env_var_excluded_from_resolve_port_autodetect(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# `likely_meshtastic=False` keeps the bad TCP entry out of the
# auto-select path — `resolve_port(None)` should still report
# "no Meshtastic devices" rather than picking a broken endpoint.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
with pytest.raises(connection.ConnectionError, match="No Meshtastic"):
connection.resolve_port(None)
def test_invalid_env_var_does_not_double_tcp_scheme(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# If a user mistakenly sets `MESHTASTIC_MCP_TCP_HOST=tcp://host:bad`,
# the diagnostic entry must surface the raw value as-is rather than
# producing `tcp://tcp://host:bad`.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "tcp://host:notaport")
with patch("meshtastic_mcp.devices.list_ports.comports", return_value=[]):
ds = devices.list_devices(include_unknown=True)
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["port"] == "tcp://host:notaport"
assert "tcp://tcp://" not in tcp[0]["port"]
def test_invalid_env_var_does_not_pre_empt_real_usb_devices(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# Sort ordering: a misconfigured TCP env var must NOT take position 0
# ahead of real USB candidates. Position 0 is reserved for the highest
# rank (likely_meshtastic=True), with TCP-before-USB as a tiebreaker
# within rank.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "host:notaport")
# Stub a USB Meshtastic candidate (Espressif VID, port present in
# findPorts).
class FakeInfo:
def __init__(self, device: str, vid: int, pid: int) -> None:
self.device = device
self.vid = vid
self.pid = pid
self.description = "Heltec V3"
self.manufacturer = "Espressif"
self.product = "USB JTAG/serial"
self.serial_number = "abc"
fake_port = FakeInfo("/dev/cu.usbmodem4201", 0x303A, 0x1001)
with patch(
"meshtastic_mcp.devices.list_ports.comports", return_value=[fake_port]
), patch(
"meshtastic.util.findPorts",
return_value=["/dev/cu.usbmodem4201"],
):
ds = devices.list_devices(include_unknown=True)
assert ds, "expected at least the USB + TCP entries"
# Real USB candidate must be at position 0 — it's likely_meshtastic.
assert ds[0]["port"] == "/dev/cu.usbmodem4201"
assert ds[0]["likely_meshtastic"] is True
# The malformed TCP entry exists but lands among the unlikely entries.
tcp = [d for d in ds if "TCP" in (d["description"] or "")]
assert len(tcp) == 1
assert tcp[0]["likely_meshtastic"] is False
assert ds.index(tcp[0]) > 0
def test_likely_tcp_entry_wins_tiebreak_over_usb(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
# Conversely, a *valid* TCP env var should sort ahead of USB
# candidates of equal likely_meshtastic rank — explicit env-var
# configuration is a precedence signal.
monkeypatch.setenv("MESHTASTIC_MCP_TCP_HOST", "localhost:4403")
class FakeInfo:
def __init__(self, device: str, vid: int, pid: int) -> None:
self.device = device
self.vid = vid
self.pid = pid
self.description = "Heltec V3"
self.manufacturer = "Espressif"
self.product = "USB JTAG/serial"
self.serial_number = "abc"
fake_port = FakeInfo("/dev/cu.usbmodem4201", 0x303A, 0x1001)
with patch(
"meshtastic_mcp.devices.list_ports.comports", return_value=[fake_port]
), patch(
"meshtastic.util.findPorts",
return_value=["/dev/cu.usbmodem4201"],
):
ds = devices.list_devices()
assert ds[0]["port"] == "tcp://localhost:4403"
assert ds[0]["likely_meshtastic"] is True
# ---------- connect() routing ---------------------------------------------
class TestConnectRoutesTcp:
def test_connect_uses_tcp_interface_for_tcp_port(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify the TCP branch instantiates `TCPInterface(hostname, portNumber)`
and never touches `SerialInterface`."""
# Make sure the env var doesn't leak in and confuse resolve_port.
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp, patch(
"meshtastic.serial_interface.SerialInterface"
) as mock_serial:
mock_tcp.return_value.close.return_value = None
with connection.connect(port="tcp://example.com:1234", timeout_s=12.0):
pass
mock_tcp.assert_called_once_with(
hostname="example.com",
portNumber=1234,
connectNow=True,
noProto=False,
timeout=12,
)
mock_serial.assert_not_called()
def test_connect_plumbs_timeout_to_serial_interface(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify the serial branch also propagates `timeout_s` so callers
passing a custom timeout to `device_info` / `list_nodes` / etc. don't
silently get the library default."""
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.serial_interface.SerialInterface") as mock_serial, patch(
"meshtastic.tcp_interface.TCPInterface"
) as mock_tcp:
mock_serial.return_value.close.return_value = None
with connection.connect(port="/dev/cu.fake", timeout_s=20.0):
pass
mock_serial.assert_called_once_with(
devPath="/dev/cu.fake",
connectNow=True,
noProto=False,
timeout=20,
)
mock_tcp.assert_not_called()
def test_connect_releases_lock_on_tcp_failure(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("MESHTASTIC_MCP_TCP_HOST", raising=False)
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_tcp.side_effect = RuntimeError("boom")
with pytest.raises(RuntimeError, match="boom"):
with connection.connect(port="tcp://locktest:4403"):
pass
# Lock should be released — a second connect attempt must not fail
# with "busy".
with patch("meshtastic.tcp_interface.TCPInterface") as mock_tcp:
mock_tcp.return_value.close.return_value = None
with connection.connect(port="tcp://locktest:4403"):
pass

View File

@@ -522,11 +522,6 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf)
// LOG_INFO("nodeinfo: num=0x%x, lastseen=%u, id=%s, name=%s", nodeInfoForPhone.num, nodeInfoForPhone.last_heard,
// nodeInfoForPhone.user.id, nodeInfoForPhone.user.long_name);
// Occasional progress logging. (readIndex==2 will be true for the first non-us node)
if (readIndex == 2 || readIndex % 20 == 0) {
LOG_DEBUG("nodeinfo: %d/%d", readIndex, nodeDB->getNumMeshNodes());
}
fromRadioScratch.which_payload_variant = meshtastic_FromRadio_node_info_tag;
fromRadioScratch.node_info = infoToSend;
prefetchNodeInfos();
@@ -657,9 +652,11 @@ void PhoneAPI::releaseQueueStatusPhonePacket()
void PhoneAPI::prefetchNodeInfos()
{
bool added = false;
bool wasEmpty = false;
// Keep the queue topped up so BLE reads stay responsive even if DB fetches take a moment.
{
concurrency::LockGuard guard(&nodeInfoMutex);
wasEmpty = nodeInfoQueue.empty();
while (nodeInfoQueue.size() < kNodePrefetchDepth) {
auto nextNode = nodeDB->readNextMeshNode(readIndex);
if (!nextNode)
@@ -673,11 +670,15 @@ void PhoneAPI::prefetchNodeInfos()
info.via_mqtt = isUs ? false : info.via_mqtt;
info.is_favorite = info.is_favorite || isUs;
nodeInfoQueue.push_back(info);
// Log progress here (at fetch time) so readIndex is accurate and each value logs only once.
if (readIndex == 2 || readIndex % 20 == 0) {
LOG_DEBUG("nodeinfo: %d/%d", readIndex, nodeDB->getNumMeshNodes());
}
added = true;
}
}
if (added)
if (added && wasEmpty)
onNowHasData(0);
}

View File

@@ -1,22 +1,34 @@
/*
Adds a WebServer and WebService callbacks to meshtastic as Linux Version. The WebServer & Webservices
runs in a real linux thread beside the portdunio threading emulation. It replaces the complete ESP32
Webserver libs including generation of SSL certifcicates, because the use ESP specific details in
the lib that can't be emulated.
Adds a WebServer and WebService callbacks to meshtastic via the Portduino/native target (Linux and
macOS). The WebServer & Webservices run in a real host thread beside the Portduino threading
emulation. It replaces the complete ESP32 Webserver libs including generation of SSL certificates,
because those libs use ESP-specific details that can't be emulated.
The WebServices adapt to the two major phoneapi functions "handleAPIv1FromRadio,handleAPIv1ToRadio"
The WebServer just adds basaic support to deliver WebContent, so it can be used to
deliver the WebGui definded by the WebClient Project.
The WebServer just adds basic support to deliver WebContent, so it can be used to
deliver the WebGui defined by the WebClient Project.
Steps to get it running:
1.) Add these Linux Libs to the compile and target machine:
Linux (apt):
1.) Add these libs to the compile and target machine:
sudo apt update && \
apt -y install openssl libssl-dev libopenssl libsdl2-dev \
apt -y install openssl libssl-dev libsdl2-dev \
libulfius-dev liborcania-dev
macOS (Homebrew):
1.) Install prerequisites via Homebrew:
brew install ulfius openssl@3
The PlatformIO env (native-macos) picks up compiler/linker flags via
`pkg-config`. In particular, OpenSSL needs `pkg-config --cflags --libs openssl@3`
so both the Homebrew include path and linker flags are provided; ulfius and its
dependencies (liborcania, libyder) are also resolved via `pkg-config`.
2.) Configure the root directory of the web Content in the config.yaml file.
The followinng tags should be included and set at your needs
The following tags should be included and set at your needs
Example entry in the config.yaml
Webserver:
@@ -34,7 +46,10 @@ Author: Marc Philipp Hammermann
mail: marchammermann@googlemail.com
*/
#ifdef PORTDUINO_LINUX_HARDWARE
// Mirrors the guard in PiWebServer.h — see comment there. macOS Homebrew
// provides ulfius + deps; Linux pulls them via apt. Either way, this
// translation unit only compiles when the headers are present.
#ifdef ARCH_PORTDUINO
#if __has_include(<ulfius.h>)
#include "PiWebServer.h"
#include "NodeDB.h"

View File

@@ -1,5 +1,9 @@
#pragma once
#ifdef PORTDUINO_LINUX_HARDWARE
// Portduino webserver is built whenever the ulfius headers are reachable,
// not only on Linux. macOS users can `brew install ulfius` to enable it;
// without ulfius the entire body is skipped and main.cpp's matching
// __has_include guard avoids referencing the type.
#ifdef ARCH_PORTDUINO
#if __has_include(<ulfius.h>)
#include "PhoneAPI.h"
#include "ulfius-cfg.h"

View File

@@ -13,6 +13,7 @@
#include <ErriezCRC32.h>
#include <Utility.h>
#include <assert.h>
#include <cctype>
#include <filesystem>
#include <fstream>
#include <iostream>
@@ -32,6 +33,16 @@
#include <cxxabi.h>
#endif
#ifdef __APPLE__
// Used by getMacAddr()'s macOS fallback to read the en0 link-layer address.
// `getifaddrs()` is the BSD-portable way; `<net/if_dl.h>` provides the
// `sockaddr_dl` cast and the `LLADDR()` macro that points at the 6-byte MAC.
#include <cstring> // strcmp, memcpy
#include <ifaddrs.h>
#include <net/if.h>
#include <net/if_dl.h>
#endif
#include "platform/portduino/USBHal.h"
portduino_config_struct portduino_config;
@@ -155,9 +166,35 @@ void getMacAddr(uint8_t *dmac)
dmac[3] = di.bdaddr.b[2];
dmac[4] = di.bdaddr.b[1];
dmac[5] = di.bdaddr.b[0];
#elif defined(__APPLE__)
// No BlueZ on macOS, but we can fall back to the host's primary
// network interface MAC. `en0` is Wi-Fi on every shipping Mac
// (Ethernet, when present, is en1 or higher), which gives the user
// the same kind of stable, host-derived identifier that the BlueZ
// path provides on Linux. If en0 isn't found or has no MAC, dmac is
// left untouched and the caller's "Blank MAC Address not allowed!"
// check will still fire — preserving existing behavior for users
// who deliberately rely on --hwid or YAML override.
struct ifaddrs *ifap = nullptr;
if (getifaddrs(&ifap) == 0) {
for (struct ifaddrs *p = ifap; p != nullptr; p = p->ifa_next) {
if (p->ifa_addr == nullptr || p->ifa_addr->sa_family != AF_LINK) {
continue;
}
if (strcmp(p->ifa_name, "en0") != 0) {
continue;
}
auto *sdl = reinterpret_cast<struct sockaddr_dl *>(p->ifa_addr);
if (sdl->sdl_alen == 6) {
memcpy(dmac, LLADDR(sdl), 6);
break;
}
}
freeifaddrs(ifap);
}
#else
// No BlueZ on non-Linux hosts (e.g. macOS). Leave dmac at its default;
// the caller can override via the --hwid CLI flag or the YAML config.
// No platform-specific MAC source; leave dmac at its default. Caller
// can override via the --hwid CLI flag or the YAML config.
(void)dmac;
#endif
}
@@ -1056,17 +1093,31 @@ static bool ends_with(std::string_view str, std::string_view suffix)
bool MAC_from_string(std::string mac_str, uint8_t *dmac)
{
mac_str.erase(std::remove(mac_str.begin(), mac_str.end(), ':'), mac_str.end());
if (mac_str.length() == 12) {
dmac[0] = std::stoi(portduino_config.mac_address.substr(0, 2), nullptr, 16);
dmac[1] = std::stoi(portduino_config.mac_address.substr(2, 2), nullptr, 16);
dmac[2] = std::stoi(portduino_config.mac_address.substr(4, 2), nullptr, 16);
dmac[3] = std::stoi(portduino_config.mac_address.substr(6, 2), nullptr, 16);
dmac[4] = std::stoi(portduino_config.mac_address.substr(8, 2), nullptr, 16);
dmac[5] = std::stoi(portduino_config.mac_address.substr(10, 2), nullptr, 16);
return true;
} else {
if (mac_str.length() != 12) {
return false;
}
// Validate every character is a hex digit before parsing. std::stoi
// would otherwise skip leading whitespace and silently truncate at the
// first non-digit, which is too lenient for a MAC address.
for (char c : mac_str) {
if (!isxdigit(static_cast<unsigned char>(c))) {
return false;
}
}
// Parse into a temporary so dmac is not partially modified if a later
// byte fails. At least one caller in getMacAddr() ignores the bool
// return, so leaving stale bytes in dmac on failure would silently
// produce a wrong MAC.
uint8_t tmp[6];
try {
for (int i = 0; i < 6; i++) {
tmp[i] = static_cast<uint8_t>(std::stoi(mac_str.substr(i * 2, 2), nullptr, 16));
}
} catch (const std::exception &) {
return false;
}
memcpy(dmac, tmp, 6);
return true;
}
std::string exec(const char *cmd)

View File

@@ -0,0 +1,195 @@
// Unit tests for MAC_from_string in src/platform/portduino/PortduinoGlue.cpp.
//
// Regression coverage for when the function stripped colons from
// its mac_str parameter but then read bytes from the global
// portduino_config.mac_address. Symptoms: --hwid silently ignored when
// MACAddress: was also set, and SIGABRT (stoi: no conversion) when --hwid
// was used without MACAddress: in config.yaml.
#include "Arduino.h"
#include "TestUtil.h"
#include <cstdint>
#include <cstring>
#include <string>
#include <unity.h>
// Forward-declare instead of including PortduinoGlue.h to avoid pulling in
// LR11x0Interface, USBHal, mesh.pb.h, yaml-cpp, and the full portduino_config
// struct just to test a self-contained string parser. The symbol is defined
// in PortduinoGlue.cpp and resolved at link time.
bool MAC_from_string(std::string mac_str, uint8_t *dmac);
void setUp(void) {}
void tearDown(void) {}
// --- Happy-path parsing ---
void test_colon_separated_uppercase()
{
uint8_t dmac[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("AA:BB:CC:DD:EE:FF", dmac));
TEST_ASSERT_EQUAL_HEX8(0xAA, dmac[0]);
TEST_ASSERT_EQUAL_HEX8(0xBB, dmac[1]);
TEST_ASSERT_EQUAL_HEX8(0xCC, dmac[2]);
TEST_ASSERT_EQUAL_HEX8(0xDD, dmac[3]);
TEST_ASSERT_EQUAL_HEX8(0xEE, dmac[4]);
TEST_ASSERT_EQUAL_HEX8(0xFF, dmac[5]);
}
void test_colon_separated_lowercase()
{
uint8_t dmac[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("02:ca:fe:ba:be:01", dmac));
TEST_ASSERT_EQUAL_HEX8(0x02, dmac[0]);
TEST_ASSERT_EQUAL_HEX8(0xCA, dmac[1]);
TEST_ASSERT_EQUAL_HEX8(0xFE, dmac[2]);
TEST_ASSERT_EQUAL_HEX8(0xBA, dmac[3]);
TEST_ASSERT_EQUAL_HEX8(0xBE, dmac[4]);
TEST_ASSERT_EQUAL_HEX8(0x01, dmac[5]);
}
void test_no_colons_packed_hex()
{
// The CLI form produced by some tools — 12 hex chars, no separators.
uint8_t dmac[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("AABBCCDDEEFF", dmac));
TEST_ASSERT_EQUAL_HEX8(0xAA, dmac[0]);
TEST_ASSERT_EQUAL_HEX8(0xFF, dmac[5]);
}
void test_two_distinct_inputs_yield_distinct_outputs()
{
// Direct regression for the original bug: parsing two different MAC
// strings in succession must produce two different byte sequences.
// Pre-fix, both calls would have produced identical bytes derived from
// the (untouched) global portduino_config.mac_address.
uint8_t a[6] = {0};
uint8_t b[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("AA:BB:CC:DD:EE:FF", a));
TEST_ASSERT_TRUE(MAC_from_string("02:CA:FE:BA:BE:01", b));
TEST_ASSERT_NOT_EQUAL(0, std::memcmp(a, b, 6));
TEST_ASSERT_EQUAL_HEX8(0xAA, a[0]);
TEST_ASSERT_EQUAL_HEX8(0x02, b[0]);
}
void test_does_not_read_external_state()
{
// The function must derive every byte from its parameter, not from any
// global. Provide a unique MAC and verify all six bytes match the input
// exactly — leaves no room for the function to be smuggling bytes from
// elsewhere.
uint8_t dmac[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("12:34:56:78:9A:BC", dmac));
const uint8_t expected[6] = {0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC};
TEST_ASSERT_EQUAL_HEX8_ARRAY(expected, dmac, 6);
}
// --- Rejected inputs ---
// Pre-fix, the empty/short cases either crashed (stoi exception on substr("")
// of the empty global) or silently filled dmac with stale bytes. Post-fix,
// the length guard rejects them cleanly with `false` and dmac is unchanged.
void test_empty_string_returns_false()
{
uint8_t dmac[6] = {0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11};
uint8_t before[6];
std::memcpy(before, dmac, 6);
TEST_ASSERT_FALSE(MAC_from_string("", dmac));
// dmac must be untouched on failure.
TEST_ASSERT_EQUAL_HEX8_ARRAY(before, dmac, 6);
}
void test_too_short_returns_false()
{
uint8_t dmac[6] = {0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11};
uint8_t before[6];
std::memcpy(before, dmac, 6);
TEST_ASSERT_FALSE(MAC_from_string("AA:BB:CC", dmac));
TEST_ASSERT_EQUAL_HEX8_ARRAY(before, dmac, 6);
}
void test_too_long_returns_false()
{
uint8_t dmac[6] = {0};
// 14 hex chars after colon-strip > 12.
TEST_ASSERT_FALSE(MAC_from_string("AA:BB:CC:DD:EE:FF:00", dmac));
}
void test_only_colons_returns_false()
{
uint8_t dmac[6] = {0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11};
uint8_t before[6];
std::memcpy(before, dmac, 6);
TEST_ASSERT_FALSE(MAC_from_string(":::::", dmac));
TEST_ASSERT_EQUAL_HEX8_ARRAY(before, dmac, 6);
}
void test_extra_colons_still_parses()
{
// Colon stripping happens before length check, so an unconventional
// grouping that totals 12 hex chars after stripping is still accepted.
uint8_t dmac[6] = {0};
TEST_ASSERT_TRUE(MAC_from_string("AABB:CCDD:EEFF", dmac));
TEST_ASSERT_EQUAL_HEX8(0xAA, dmac[0]);
TEST_ASSERT_EQUAL_HEX8(0xFF, dmac[5]);
}
void test_non_hex_input_returns_false()
{
// 12 chars of non-hex would have made std::stoi throw before the
// try/catch wrapper was added, killing the daemon. Now must return false
// and leave dmac untouched.
uint8_t dmac[6] = {0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11};
uint8_t before[6];
std::memcpy(before, dmac, 6);
TEST_ASSERT_FALSE(MAC_from_string("ZZ:ZZ:ZZ:ZZ:ZZ:ZZ", dmac));
TEST_ASSERT_EQUAL_HEX8_ARRAY(before, dmac, 6);
}
void test_partial_hex_failure_preserves_dmac()
{
// First five bytes are valid hex; the sixth ("ZZ") is not. Without the
// temp-buffer staging, dmac would be partially overwritten with the five
// good bytes plus stale data in slot 5 — silently producing a wrong MAC
// since the only caller that uses this in getMacAddr() ignores the bool
// return value.
uint8_t dmac[6] = {0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11};
uint8_t before[6];
std::memcpy(before, dmac, 6);
TEST_ASSERT_FALSE(MAC_from_string("AA:BB:CC:DD:EE:ZZ", dmac));
TEST_ASSERT_EQUAL_HEX8_ARRAY(before, dmac, 6);
}
void test_embedded_non_hex_returns_false()
{
// std::stoi tolerates leading whitespace and a "0x" prefix, so a stray
// space inside a 2-char window like " F" would silently parse as 0xF.
// The per-character isxdigit() pre-check rejects these. The 14-char
// "0xAABBCCDDEEFF" is also rejected by the length check.
uint8_t dmac[6] = {0};
TEST_ASSERT_FALSE(MAC_from_string("AA:BB:CC:DD:EE: F", dmac));
TEST_ASSERT_FALSE(MAC_from_string("0xAABBCCDDEEFF", dmac));
}
// --- Unity lifecycle ---
void setup()
{
initializeTestEnvironment();
UNITY_BEGIN();
RUN_TEST(test_colon_separated_uppercase);
RUN_TEST(test_colon_separated_lowercase);
RUN_TEST(test_no_colons_packed_hex);
RUN_TEST(test_two_distinct_inputs_yield_distinct_outputs);
RUN_TEST(test_does_not_read_external_state);
RUN_TEST(test_empty_string_returns_false);
RUN_TEST(test_too_short_returns_false);
RUN_TEST(test_too_long_returns_false);
RUN_TEST(test_only_colons_returns_false);
RUN_TEST(test_extra_colons_still_parses);
RUN_TEST(test_non_hex_input_returns_false);
RUN_TEST(test_partial_hex_failure_preserves_dmac);
RUN_TEST(test_embedded_non_hex_returns_false);
exit(UNITY_END());
}
void loop() {}

View File

@@ -2,7 +2,7 @@
[portduino_base]
platform =
# renovate: datasource=git-refs depName=platform-native packageName=https://github.com/meshtastic/platform-native gitBranch=develop
https://github.com/meshtastic/platform-native/archive/4ea5e09ac7d51a593e12ec7c1ebb6cd06745ce53.zip
https://github.com/meshtastic/platform-native/archive/cab4b21d902973e43c938dab3cf4844ba02547ec.zip
framework = arduino
build_src_filter =

View File

@@ -125,6 +125,8 @@ test_testing_command =
;
; Prerequisites (Homebrew):
; brew install platformio yaml-cpp libuv openssl@3 libusb argp-standalone pkg-config
; # Optional: enable the HTTP API (PiWebServer) on macOS:
; brew install ulfius
;
; The macOS-side patches now live upstream:
; * meshtastic/platform-native — `String.h`-shadow shim, `-Wno-enum-constexpr-conversion`,
@@ -191,7 +193,16 @@ build_flags = ${portduino_base.build_flags_common}
; style screen-driver hooks scattered through sensor sources.
-DHAS_SCREEN=0
-DMESHTASTIC_EXCLUDE_SCREEN=1
!pkg-config --libs openssl --silence-errors || :
; openssl@3 is the keg-only Homebrew formula; --cflags is required so the
; compiler finds <openssl/*.h> in the Homebrew prefix (not just the linker).
!pkg-config --cflags --libs openssl --silence-errors || :
; PiWebServer (src/mesh/raspihttp/PiWebServer.cpp) auto-engages when ulfius
; headers are reachable via `#if __has_include(<ulfius.h>)`. The `|| :`
; tail keeps the build green when the user hasn't run `brew install ulfius`
; — they just don't get the HTTP API in that case.
!pkg-config --cflags --libs liborcania --silence-errors || :
!pkg-config --cflags --libs libyder --silence-errors || :
!pkg-config --cflags --libs libulfius --silence-errors || :
; src/input/Linux*.{cpp,h} drive evdev (`<linux/input.h>`) which doesn't exist
; on macOS. graphics/Panel_sdl.* and graphics/TFTDisplay.cpp pull LovyanGFX
; (which we lib_ignore on macOS for the <malloc.h> issue). Neither is needed
@@ -206,3 +217,28 @@ build_src_filter = ${native_base.build_src_filter}
lib_ignore =
${portduino_base.lib_ignore}
LovyanGFX
; ---------------------------------------------------------------------------
; Same as [env:native-macos] but built with AddressSanitizer for catching
; use-after-free, leaks, and OOB access during local development. Headless
; (no SDL/X11/libinput) so it stays cheap to build. Mirrors the shape of
; [env:native-tft-debug] but without the TFT/X11 dependencies.
;
; pio run -e native-macos-debug
; .pio/build/native-macos-debug/meshtasticd -s
;
; ASan runtime tuning (set in the shell before launching):
; ASAN_OPTIONS=detect_leaks=1:halt_on_error=0:abort_on_error=1
; MallocStackLogging=1 # macOS: nicer stack traces in malloc reports
; ---------------------------------------------------------------------------
[env:native-macos-debug]
extends = native_base
build_type = debug
build_unflags = ${env:native-macos.build_unflags}
build_flags = ${env:native-macos.build_flags}
-O0
-g
-fsanitize=address
-fno-omit-frame-pointer
build_src_filter = ${env:native-macos.build_src_filter}
lib_ignore = ${env:native-macos.lib_ignore}