test: Add HTTP/WS server test coverage

Add black-box integration tests for the HTTP/WS server and rtl_tcp,
plus a gcov coverage helper and supporting CI wiring.

Refs: #3541

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Berry
2026-06-09 07:28:34 -04:00
committed by Christian W. Zuckschwerdt
parent 3ca23d5a8a
commit 7a04dbdec9
9 changed files with 755 additions and 0 deletions

View File

@@ -78,6 +78,28 @@ jobs:
run: |
./tests/symbolizer.py check
coverage_check_job:
runs-on: ubuntu-latest
name: Run tests with code coverage
steps:
- uses: actions/checkout@v5
- name: Setup
run: |
sudo apt-get update -q -y
sudo apt-get install -q -y --no-install-recommends cmake gcovr curl python3
- name: Build with coverage and run tests
# Builds with -DENABLE_COVERAGE=ON, runs ctest (incl. the HTTP server
# integration/dataflow/ws tests), and writes an HTML report. Exits
# non-zero if any test fails.
run: ./tests/coverage.sh "$GITHUB_WORKSPACE/build-coverage"
- name: Upload coverage report
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: build-coverage/coverage-report
if-no-files-found: warn
analyzer_check_job:
# https://github.com/actions/virtual-environments
# - Ubuntu 24.04 ubuntu-24.04

6
.gitignore vendored
View File

@@ -1,6 +1,12 @@
# CMake out-of-tree builds
build*/
# Code coverage artifacts (see tests/coverage.sh)
*.gcda
*.gcno
*.gcov
coverage-report/
# IDE files
.cproject
.settings

View File

@@ -112,6 +112,24 @@ if("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" AND NOT "13.2.0" VERSION_GREATER CMAK
add_definitions(-fanalyzer)
endif()
# Optional code coverage instrumentation (gcov/llvm-cov), for use with BUILD_TESTING.
# Configure with -DENABLE_COVERAGE=ON, run `ctest`, then generate a report with
# gcov/gcovr/lcov (see tests/coverage.sh). Forces -O0 and disables inlining so
# coverage maps cleanly back to source lines.
option(ENABLE_COVERAGE "Build with code coverage instrumentation (GCC/Clang)" OFF)
if(ENABLE_COVERAGE)
if("${CMAKE_C_COMPILER_ID}" MATCHES "GNU|Clang")
message(STATUS "Code coverage instrumentation enabled")
add_compile_options(--coverage -O0 -g -fno-inline)
# --coverage on the link line pulls in the gcov runtime; set on the flag
# vars rather than add_link_options() to keep cmake_minimum_required (3.10).
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} --coverage")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --coverage")
else()
message(WARNING "ENABLE_COVERAGE is only supported on GCC/Clang, ignoring")
endif()
endif()
# Shut MSVC up about strdup and strtok
if(MSVC)
ADD_DEFINITIONS(-D_CRT_NONSTDC_NO_DEPRECATE)

View File

@@ -34,6 +34,36 @@ endforeach(testSrc)
########################################################################
add_test(rtl_433_help ../src/rtl_433 -h)
# Black-box test of the HTTP/WS API server. Starts rtl_433 in manual device
# mode (no SDR), exercises each endpoint, and shuts it down cleanly. Skipped
# on Windows (POSIX shell script) and when python3/curl are unavailable.
if(UNIX)
find_program(BASH_PROGRAM sh)
find_program(CURL_PROGRAM curl)
find_program(PYTHON3_PROGRAM python3)
if(BASH_PROGRAM AND CURL_PROGRAM)
add_test(
NAME http_server_integration
COMMAND ${BASH_PROGRAM}
${CMAKE_CURRENT_SOURCE_DIR}/http-integration-test.sh
$<TARGET_FILE:rtl_433>)
# Feeds a synthetic OOK signal over a fake rtl_tcp server so rtl_433 runs
# its live poll loop, then reads the decode back over the WS API. Needs
# python3 for the rtl_tcp server and ws probe.
if(PYTHON3_PROGRAM)
add_test(
NAME http_server_rtltcp
COMMAND ${BASH_PROGRAM}
${CMAKE_CURRENT_SOURCE_DIR}/http-rtltcp-test.sh
$<TARGET_FILE:rtl_433>)
else()
message(STATUS "Skipping http_server_rtltcp test (need python3)")
endif()
else()
message(STATUS "Skipping http_server_integration test (need sh and curl)")
endif()
endif()
########################################################################
# Define style checks
########################################################################

62
tests/coverage.sh Executable file
View File

@@ -0,0 +1,62 @@
#!/bin/sh
#
# Configure, build, and run the test suite with code-coverage instrumentation,
# then produce a coverage report. Focuses the report on the HTTP server but
# gcovr/lcov see the whole tree.
#
# Requires gcovr (apt install gcovr, or pip install gcovr) for the HTML/text
# report; falls back to a raw `gcov` summary if gcovr is not installed.
#
# Usage: tests/coverage.sh [build-dir]
# build-dir defaults to ./build-coverage
set -eu
SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
SRC_DIR=$(CDPATH= cd -- "$SCRIPT_DIR/.." && pwd)
BUILD_DIR=${1:-"$SRC_DIR/build-coverage"}
echo ">> Configuring coverage build in $BUILD_DIR"
cmake -S "$SRC_DIR" -B "$BUILD_DIR" \
-DENABLE_COVERAGE=ON \
-DBUILD_TESTING=ON \
-DENABLE_RTLSDR=OFF \
-DENABLE_SOAPYSDR=OFF \
-DCMAKE_BUILD_TYPE=None
echo ">> Building"
cmake --build "$BUILD_DIR" -j"$(nproc 2>/dev/null || echo 2)"
echo ">> Running tests (ctest)"
# Don't abort the report if a test fails; we still want coverage of what ran,
# but remember the status so we can propagate it as our exit code (CI should
# fail on a test failure even though the report is still produced/uploaded).
ctest_rc=0
( cd "$BUILD_DIR" && ctest --output-on-failure ) || ctest_rc=$?
[ "$ctest_rc" -eq 0 ] || echo ">> (some tests failed, rc=$ctest_rc; continuing to report)"
echo ">> Generating coverage report"
if command -v gcovr >/dev/null 2>&1; then
mkdir -p "$BUILD_DIR/coverage-report"
# Whole-project summary to the terminal...
gcovr --root "$SRC_DIR" --object-directory "$BUILD_DIR" \
--exclude '.*/mongoose\.c' --exclude '.*/jsmn\.c' --print-summary \
--html-details "$BUILD_DIR/coverage-report/index.html"
echo
echo ">> HTTP server coverage:"
gcovr --root "$SRC_DIR" --object-directory "$BUILD_DIR" \
--filter '.*/http_server\.c' --txt
echo ">> Full HTML report: $BUILD_DIR/coverage-report/index.html"
else
echo ">> gcovr not found; install it (apt: 'apt-get install gcovr', or 'pip install gcovr') for a nice report." >&2
echo ">> Falling back to raw gcov on http_server.c:"
GCDA=$(find "$BUILD_DIR" -name 'http_server.c.gcda' | head -1)
if [ -n "$GCDA" ]; then
( cd "$(dirname "$GCDA")" && gcov -b http_server.c.gcno | sed -n '1,12p' )
else
echo ">> No http_server.c.gcda found - did the server exit cleanly?" >&2
fi
fi
# Propagate the test result so CI fails on a test failure (after reporting).
exit "$ctest_rc"

178
tests/http-integration-test.sh Executable file
View File

@@ -0,0 +1,178 @@
#!/bin/sh
#
# Black-box integration test for the rtl_433 HTTP/WS API server.
#
# Starts rtl_433 in "manual" device mode (-D manual), which runs the HTTP
# server event loop without any SDR hardware or input file, exercises each
# endpoint over the loopback interface, then shuts the server down with
# SIGTERM so it exits cleanly (this is what lets gcov flush .gcda files when
# the binary was built with -DENABLE_COVERAGE=ON).
#
# Usage: http-integration-test.sh [path-to-rtl_433-binary]
# Defaults to ../src/rtl_433 relative to this script.
# Override the port with PORT=NNNN, otherwise a free port is chosen.
set -u
SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
RTL_433=${1:-"$SCRIPT_DIR/../src/rtl_433"}
HOST=127.0.0.1
if [ ! -x "$RTL_433" ]; then
echo "ERROR: rtl_433 binary not found or not executable: $RTL_433" >&2
exit 99
fi
# Pick a free TCP port unless one was given, so parallel ctest runs don't collide.
if [ -z "${PORT:-}" ]; then
PORT=$(python3 -c 'import socket; s=socket.socket(); s.bind(("127.0.0.1",0)); print(s.getsockname()[1]); s.close()' 2>/dev/null)
fi
PORT=${PORT:-18433}
BASE="http://$HOST:$PORT"
SERVER_LOG=$(mktemp 2>/dev/null || echo /tmp/rtl_433_http_test.$$.log)
SERVER_PID=""
FAILED=0
PASSED=0
cleanup() {
if [ -n "$SERVER_PID" ] && kill -0 "$SERVER_PID" 2>/dev/null; then
# SIGTERM -> clean shutdown -> atexit/gcov flush. Do NOT use SIGKILL.
kill -TERM "$SERVER_PID" 2>/dev/null
# give it a moment to drain and write coverage data
for _ in 1 2 3 4 5 6 7 8 9 10; do
kill -0 "$SERVER_PID" 2>/dev/null || break
sleep 0.3
done
kill -0 "$SERVER_PID" 2>/dev/null && kill -KILL "$SERVER_PID" 2>/dev/null
wait "$SERVER_PID" 2>/dev/null
fi
rm -f "$SERVER_LOG"
}
trap cleanup EXIT INT TERM
fail() { echo " NOT OK: $1" >&2; FAILED=$((FAILED + 1)); }
pass() { echo " ok: $1"; PASSED=$((PASSED + 1)); }
# expect_status DESC EXPECTED_CODE curl-args...
expect_status() {
desc=$1; want=$2; shift 2
got=$(curl -s -o /dev/null -w '%{http_code}' --max-time 10 "$@")
[ "$got" = "$want" ] && pass "$desc (HTTP $got)" || fail "$desc: expected HTTP $want, got $got"
}
# expect_body_contains DESC SUBSTRING curl-args...
expect_body_contains() {
desc=$1; needle=$2; shift 2
body=$(curl -s --max-time 10 "$@")
case "$body" in
*"$needle"*) pass "$desc (body contains '$needle')" ;;
*) fail "$desc: body did not contain '$needle'"; printf ' got: %.200s\n' "$body" >&2 ;;
esac
}
# expect_stream DESC URL -- streaming endpoints stay open; we only check the
# response headers, tolerating the curl timeout (exit 28) that follows.
expect_stream() {
desc=$1; url=$2
hdr=$(curl -s -D - -o /dev/null --max-time 2 "$url" 2>/dev/null)
case "$hdr" in
*"200 OK"*) pass "$desc (200, stream open)" ;;
*) fail "$desc: no '200 OK' in response headers"; printf ' got: %.200s\n' "$hdr" >&2 ;;
esac
}
echo "Starting rtl_433 HTTP server: $RTL_433 -D manual -F $BASE"
"$RTL_433" -D manual -F "http://$HOST:$PORT" >"$SERVER_LOG" 2>&1 &
SERVER_PID=$!
# Wait for readiness (server prints/serves once the manual loop is up).
ready=0
for _ in $(seq 1 50); do
if ! kill -0 "$SERVER_PID" 2>/dev/null; then
echo "ERROR: server exited during startup" >&2
cat "$SERVER_LOG" >&2
exit 98
fi
if curl -s -o /dev/null --max-time 1 "$BASE/"; then
ready=1
break
fi
sleep 0.2
done
if [ "$ready" != 1 ]; then
echo "ERROR: server did not become ready on $BASE" >&2
cat "$SERVER_LOG" >&2
exit 97
fi
echo "Server ready on port $PORT (pid $SERVER_PID). Running checks:"
# --- static / UI ---
expect_status "GET / returns 200" 200 "$BASE/"
expect_body_contains "GET / serves index html" "DOCTYPE html" "$BASE/"
expect_status "OPTIONS / CORS preflight" 204 -X OPTIONS "$BASE/"
expect_status "GET /ui redirects" 307 "$BASE/ui"
# --- OpenMetrics / Prometheus ---
expect_status "GET /metrics returns 200" 200 "$BASE/metrics"
expect_body_contains "GET /metrics is OpenMetrics" "# TYPE uptime_seconds counter" "$BASE/metrics"
# Non-GET is rejected; send an explicit (empty) body so mongoose frames the
# request rather than waiting for a body that curl's bare -X POST never sends.
expect_status "POST /metrics is rejected" 405 -d '' "$BASE/metrics"
# --- /cmd RPC (GET query string and POST form) ---
expect_body_contains "GET /cmd?cmd=get_meta" "\"samp_rate\"" "$BASE/cmd?cmd=get_meta"
expect_status "GET /cmd?cmd=get_sample_rate" 200 "$BASE/cmd?cmd=get_sample_rate"
expect_body_contains "POST /cmd form get_meta" "\"samp_rate\"" -d "cmd=get_meta" "$BASE/cmd"
expect_status "GET /cmd with no cmd" 200 "$BASE/cmd"
# --- more RPC methods, to exercise the rpc_exec getter/setter branches ---
expect_body_contains "GET /cmd get_protocols" "Nice" "$BASE/cmd?cmd=get_protocols"
expect_status "GET /cmd get_stats" 200 "$BASE/cmd?cmd=get_stats"
expect_status "GET /cmd get_center_frequency" 200 "$BASE/cmd?cmd=get_center_frequency"
expect_status "GET /cmd setter center_frequency" 200 "$BASE/cmd?cmd=center_frequency&val=433920000"
expect_body_contains "GET /cmd unknown method is rejected" "Unknown method" "$BASE/cmd?cmd=no_such_method"
# --- JSON-RPC ---
expect_status "POST /jsonrpc valid method" 200 \
-H 'Content-Type: application/json' \
-d '{"jsonrpc":"2.0","id":1,"method":"get_sample_rate"}' "$BASE/jsonrpc"
expect_body_contains "POST /jsonrpc method with params" "result" \
-H 'Content-Type: application/json' \
-d '{"jsonrpc":"2.0","id":2,"method":"sample_rate","params":[1024000]}' "$BASE/jsonrpc"
expect_status "POST /jsonrpc malformed body does not crash" 200 \
-H 'Content-Type: application/json' -d 'not json at all' "$BASE/jsonrpc"
# --- streaming endpoints ---
expect_stream "GET /stream opens" "$BASE/stream"
expect_stream "GET /events opens" "$BASE/events"
# --- WebSocket: handshake, meta push, and an RPC round-trip ---
if command -v python3 >/dev/null 2>&1; then
if ws_out=$(python3 "$SCRIPT_DIR/ws-probe.py" "$HOST" "$PORT" 10 2>&1); then
pass "websocket handshake + meta + rpc round-trip"
else
fail "websocket probe failed"
printf '%s\n' "$ws_out" | sed 's/^/ /' >&2
fi
else
echo " skip: websocket probe (python3 not found)"
fi
# --- robustness: unknown path must not crash the server ---
curl -s -o /dev/null --max-time 2 "$BASE/no/such/path" 2>/dev/null
if kill -0 "$SERVER_PID" 2>/dev/null; then
pass "server still alive after unknown path"
else
fail "server died after request to unknown path"
fi
echo
echo "Results: $PASSED passed, $FAILED failed"
if [ "$FAILED" -ne 0 ]; then
echo "--- server log ---" >&2
cat "$SERVER_LOG" >&2
exit 1
fi
exit 0

141
tests/http-rtltcp-test.sh Executable file
View File

@@ -0,0 +1,141 @@
#!/bin/sh
#
# End-to-end read-back test of the rtl_433 HTTP/WS API over the *live* input
# path.
#
# http-integration-test.sh can't do this: it serves an idle server (no decodes).
# The one-shot -y/-r file path can't either -- it exit()s as soon as the input
# is consumed (before the mg_mgr_poll loop), so no client could connect in time.
# Only the live SDR/poll loop keeps the server up while a decode flows, which is
# what the fake rtl_tcp source below gives us.
#
# Here a fake rtl_tcp server (tests/rtl_tcp_serve.py) streams a synthetic OOK
# signal, so rtl_433 runs through its live SDR/poll loop and the HTTP server
# stays up. We feed a known Nice Flor-s vector, wait for the decode, then connect
# a WebSocket client and assert the event comes back in the server's history
# replay -- a genuine event -> history -> broadcast -> wire round-trip.
#
# Usage: http-rtltcp-test.sh [path-to-rtl_433-binary]
set -u
SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
RTL_433=${1:-"$SCRIPT_DIR/../src/rtl_433"}
HOST=127.0.0.1
if [ ! -x "$RTL_433" ]; then
echo "ERROR: rtl_433 binary not found or not executable: $RTL_433" >&2
exit 99
fi
# A documented Nice Flor-s test vector (see src/devices/nice_flor_s.c). The
# bitstring is 0xe7a760b94372e (52 bits) as the pulse slicer sees it;
# rtl_tcp_serve.py renders short=1 / long=0 OOK pulses.
DEVICE_NAME="Nice Flor-s remote control"
BITS="1110011110100111011000001011100101000011011100101110"
EXPECT_MODEL="Nice-FlorS"
PROTO=$("$RTL_433" -R help 2>&1 \
| grep "$DEVICE_NAME" \
| grep -oE '\[[0-9]+\]' | head -1 | tr -d '[]')
if [ -z "$PROTO" ]; then
echo "ERROR: could not find protocol number for '$DEVICE_NAME'" >&2
exit 98
fi
echo "Resolved '$DEVICE_NAME' to protocol -R $PROTO"
# Pick two free ports (HTTP API + fake rtl_tcp) so parallel ctest runs don't collide.
pick_port() {
python3 -c 'import socket; s=socket.socket(); s.bind(("127.0.0.1",0)); print(s.getsockname()[1]); s.close()' 2>/dev/null
}
HTTPPORT=${HTTPPORT:-$(pick_port)}
TCPPORT=${TCPPORT:-$(pick_port)}
HTTPPORT=${HTTPPORT:-18433}
TCPPORT=${TCPPORT:-18533}
BASE="http://$HOST:$HTTPPORT"
SERVER_LOG=$(mktemp 2>/dev/null || echo /tmp/rtl_433_rtltcp_test.$$.log)
TCP_LOG=$(mktemp 2>/dev/null || echo /tmp/rtl_433_rtltcp_srv.$$.log)
RTL_PID=""
TCP_PID=""
cleanup() {
if [ -n "$RTL_PID" ] && kill -0 "$RTL_PID" 2>/dev/null; then
# SIGTERM -> clean shutdown -> atexit/gcov flush. Do NOT use SIGKILL.
kill -TERM "$RTL_PID" 2>/dev/null
for _ in 1 2 3 4 5 6 7 8 9 10; do
kill -0 "$RTL_PID" 2>/dev/null || break
sleep 0.3
done
kill -0 "$RTL_PID" 2>/dev/null && kill -KILL "$RTL_PID" 2>/dev/null
wait "$RTL_PID" 2>/dev/null
fi
if [ -n "$TCP_PID" ] && kill -0 "$TCP_PID" 2>/dev/null; then
kill -TERM "$TCP_PID" 2>/dev/null
wait "$TCP_PID" 2>/dev/null
fi
rm -f "$SERVER_LOG" "$TCP_LOG"
}
trap cleanup EXIT INT TERM
# --- start the fake rtl_tcp server and wait until it is listening ---
echo "Starting fake rtl_tcp server on $HOST:$TCPPORT"
python3 "$SCRIPT_DIR/rtl_tcp_serve.py" --port "$TCPPORT" --bits "$BITS" \
>"$TCP_LOG" 2>&1 &
TCP_PID=$!
listening=0
for _ in $(seq 1 50); do
if ! kill -0 "$TCP_PID" 2>/dev/null; then
echo "ERROR: rtl_tcp server exited during startup" >&2
cat "$TCP_LOG" >&2
exit 96
fi
grep -q "LISTENING" "$TCP_LOG" 2>/dev/null && { listening=1; break; }
sleep 0.1
done
if [ "$listening" != 1 ]; then
echo "ERROR: rtl_tcp server did not start listening" >&2
cat "$TCP_LOG" >&2
exit 96
fi
# --- start rtl_433 against it, with HTTP API + json so we can observe the decode ---
echo "Starting rtl_433: -d rtl_tcp:$HOST:$TCPPORT -R $PROTO -F $BASE"
"$RTL_433" -d "rtl_tcp:$HOST:$TCPPORT" -R "$PROTO" \
-F "http://$HOST:$HTTPPORT" -F json >"$SERVER_LOG" 2>&1 &
RTL_PID=$!
# Wait for the HTTP server to come up *and* for the decode to land (the signal
# plays ~0.5 s into the stream). Polling the json output makes the WS read-back
# deterministic: once the event is decoded it is in the history ring for replay.
ready=0
for _ in $(seq 1 100); do
if ! kill -0 "$RTL_PID" 2>/dev/null; then
echo "ERROR: rtl_433 exited during startup" >&2
cat "$SERVER_LOG" >&2
exit 95
fi
if grep -q "$EXPECT_MODEL" "$SERVER_LOG" 2>/dev/null \
&& curl -s -o /dev/null --max-time 1 "$BASE/"; then
ready=1
break
fi
sleep 0.2
done
if [ "$ready" != 1 ]; then
echo "ERROR: never saw a '$EXPECT_MODEL' decode with the HTTP server up" >&2
cat "$SERVER_LOG" >&2
exit 94
fi
echo "Decode observed and HTTP server ready on $BASE"
# --- the actual assertion: the decode is retrievable over the WS API ---
echo "--- ws read-back ---"
if python3 "$SCRIPT_DIR/ws-probe.py" "$HOST" "$HTTPPORT" 10 "$EXPECT_MODEL"; then
echo "ok: '$EXPECT_MODEL' decode round-tripped through the HTTP/WS API"
exit 0
else
echo "NOT OK: decode did not come back over the WS API" >&2
cat "$SERVER_LOG" >&2
exit 1
fi

137
tests/rtl_tcp_serve.py Executable file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Minimal fake rtl_tcp server for black-box testing rtl_433's live input path.
Speaks just enough of the rtl_tcp protocol to feed rtl_433 a synthetic OOK
signal over TCP. This drives rtl_433 through its live SDR/poll loop -- which
keeps the HTTP/WS server responsive -- instead of the one-shot `-r`/`-y` file
path that exit()s as soon as the input is consumed (rtl_433.c, the test-data
block that ends in exit(0), runs *before* the mg_mgr_poll loop). That makes it
possible to feed a decode and then read it back over the HTTP API while the
server is still up.
rtl_tcp wire protocol (only the parts rtl_433 needs):
server -> client: 12-byte header b"RTL0" + uint32be tuner_type + uint32be gain_count
client -> server: 5-byte commands (1 byte cmd + uint32be param) -- drained and ignored
server -> client: raw CU8 IQ sample stream (interleaved unsigned 8-bit I,Q)
The signal is synthesized as OOK_PULSE_PWM to match the rtl_433 pulse slicer:
short pulse = bit 1, long pulse = bit 0, an optional trailing sync pulse to
close the row, framed by a lead-in gap (long enough to settle the detector's
noise estimate) and a final reset gap. After the burst it streams silence so the
TCP connection -- and therefore rtl_433's server -- stays up until the client
goes away or we are signalled.
Pure standard library so it runs in CI without extra packages.
Usage: rtl_tcp_serve.py --port N --bits BITSTRING [options]
--port N TCP port to listen on (required; use the same one in -d rtl_tcp:...)
--bits S data bits as a string of '0'/'1' (short=1, long=0)
--rate HZ sample rate the signal is generated for (default 250000)
--short US short pulse width (default 500)
--long US long pulse width (default 1000)
--sync US trailing sync pulse width, 0 to omit (default 1500)
--gap US inter-pulse gap (default 500)
--lead-in US leading silence (default 8000; must exceed ~1024 samples)
--reset US trailing silence (default 6000; must exceed the reset_limit)
Prints "LISTENING <port>" to stdout once bound, so a caller can synchronize.
"""
import argparse
import math
import socket
import struct
import sys
import threading
def synth_cu8(bits, rate, short_us, long_us, sync_us, gap_us, lead_in_us, reset_us):
"""Return CU8 bytes for an OOK_PULSE_PWM burst (short=1, long=0)."""
samples_per_us = rate / 1e6
tone_hz = 50000.0 # IF offset so "on" is a real tone, not pure DC
buf = bytearray()
phase = [0.0]
def emit(us, on):
for _ in range(int(round(us * samples_per_us))):
if on:
i = 128 + int(100 * math.cos(phase[0]))
q = 128 + int(100 * math.sin(phase[0]))
phase[0] += 2 * math.pi * tone_hz / rate
else:
i = q = 128
buf.append(max(0, min(255, i)))
buf.append(max(0, min(255, q)))
emit(lead_in_us, False)
for b in bits:
emit(short_us if b == "1" else long_us, True)
emit(gap_us, False)
if sync_us > 0:
emit(sync_us, True) # closes the data row -> trailing empty row
emit(reset_us, False)
return bytes(buf)
def drain_commands(conn):
"""Read and discard the 5-byte rtl_tcp commands rtl_433 sends (set freq,
rate, gain, ...). We honor none of them; the signal is pre-synthesized."""
try:
while True:
if not conn.recv(4096):
return
except OSError:
return
def serve(port, payload):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", port))
srv.listen(1)
bound = srv.getsockname()[1]
print("LISTENING %d" % bound, flush=True)
conn, _ = srv.accept()
srv.close()
try:
# rtl_tcp dongle-info header: magic + tuner type (5 = R820T) + gain count.
conn.sendall(b"RTL0" + struct.pack(">II", 5, 0))
threading.Thread(target=drain_commands, args=(conn,), daemon=True).start()
conn.sendall(payload)
# Keep the stream (and thus rtl_433's server) alive with silence until
# the client disconnects or we're killed.
silence = bytes([128]) * 16384
while True:
conn.sendall(silence)
except OSError:
pass # client went away -- normal shutdown
finally:
try:
conn.close()
except OSError:
pass
def main():
p = argparse.ArgumentParser()
p.add_argument("--port", type=int, required=True)
p.add_argument("--bits", required=True)
p.add_argument("--rate", type=int, default=250000)
p.add_argument("--short", type=float, default=500)
p.add_argument("--long", type=float, default=1000)
p.add_argument("--sync", type=float, default=1500)
p.add_argument("--gap", type=float, default=500)
p.add_argument("--lead-in", type=float, default=8000)
p.add_argument("--reset", type=float, default=6000)
a = p.parse_args()
if any(c not in "01" for c in a.bits):
print("error: --bits must be a string of 0/1", file=sys.stderr)
return 2
payload = synth_cu8(a.bits, a.rate, a.short, a.long, a.sync,
a.gap, getattr(a, "lead_in"), a.reset)
serve(a.port, payload)
return 0
if __name__ == "__main__":
sys.exit(main())

161
tests/ws-probe.py Executable file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""Minimal WebSocket probe for the rtl_433 HTTP/WS API.
Connects to a running rtl_433 HTTP server (started elsewhere, e.g. with
`-D manual -F http://host:port`), completes the WebSocket handshake, reads the
meta frame the server pushes on connect, then sends a JSON-RPC command frame
and reads the reply. This exercises the server's WebSocket code paths
(handshake -> meta/history push -> handle_ws_rpc -> rpc_response_ws).
Pure standard library so it runs in CI without extra packages.
Usage: ws-probe.py HOST PORT [timeout_seconds] [expect_substring]
If expect_substring is given, the probe also asserts that a pushed frame (the
history the server replays on connect, or a live broadcast) contains it -- used
to confirm a decoded event is retrievable over the WS API.
Exit 0 on success, non-zero otherwise; received text frames go to stdout.
"""
import base64
import json
import os
import socket
import sys
OP_TEXT = 0x1
OP_CLOSE = 0x8
OP_PING = 0x9
def recv_exact(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise EOFError("connection closed by server")
buf += chunk
return buf
def read_frame(sock):
"""Read one WebSocket frame, return (opcode, payload_bytes)."""
b0, b1 = recv_exact(sock, 2)
opcode = b0 & 0x0F
masked = b1 & 0x80
length = b1 & 0x7F
if length == 126:
length = int.from_bytes(recv_exact(sock, 2), "big")
elif length == 127:
length = int.from_bytes(recv_exact(sock, 8), "big")
mask = recv_exact(sock, 4) if masked else b""
payload = recv_exact(sock, length) if length else b""
if masked:
payload = bytes(p ^ mask[i % 4] for i, p in enumerate(payload))
return opcode, payload
def send_text(sock, text):
"""Send a masked text frame (clients MUST mask, per RFC 6455)."""
data = text.encode("utf-8")
n = len(data)
header = bytes([0x80 | OP_TEXT]) # FIN + text
if n < 126:
header += bytes([0x80 | n])
elif n < 65536:
header += bytes([0x80 | 126]) + n.to_bytes(2, "big")
else:
header += bytes([0x80 | 127]) + n.to_bytes(8, "big")
mask = os.urandom(4)
masked = bytes(b ^ mask[i % 4] for i, b in enumerate(data))
sock.sendall(header + mask + masked)
def handshake(sock, host, port):
key = base64.b64encode(os.urandom(16)).decode()
req = (
"GET / HTTP/1.1\r\n"
f"Host: {host}:{port}\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
f"Sec-WebSocket-Key: {key}\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n"
)
sock.sendall(req.encode())
resp = b""
while b"\r\n\r\n" not in resp:
resp += recv_exact(sock, 1)
status = resp.split(b"\r\n", 1)[0].decode(errors="replace")
if "101" not in status:
raise RuntimeError(f"handshake failed: {status!r}")
def read_text_frame(sock):
"""Read frames until a text frame arrives, answering pings, ignoring close."""
while True:
opcode, payload = read_frame(sock)
if opcode == OP_TEXT:
return payload.decode("utf-8", errors="replace")
if opcode == OP_PING:
continue # server ping; ignore for this short-lived probe
if opcode == OP_CLOSE:
raise EOFError("server sent close frame")
def read_until_contains(sock, needle, max_frames=20):
"""Read text frames until one contains needle. The server also broadcasts
log messages over the same socket, so skip frames that aren't the one we
want. Returns the matching frame or None if max_frames is exhausted."""
for _ in range(max_frames):
frame = read_text_frame(sock)
print(" <<", frame)
if needle in frame:
return frame
return None
def main():
if len(sys.argv) < 3:
print("usage: ws-probe.py HOST PORT [timeout]", file=sys.stderr)
return 2
host = sys.argv[1]
port = int(sys.argv[2])
timeout = float(sys.argv[3]) if len(sys.argv) > 3 else 10.0
expect = sys.argv[4] if len(sys.argv) > 4 else None
sock = socket.create_connection((host, port), timeout=timeout)
sock.settimeout(timeout)
try:
handshake(sock, host, port)
# On connect the server pushes the meta frame (and history, empty here).
if read_until_contains(sock, "center_frequency") is None:
print("FAIL: never received a meta frame with 'center_frequency'", file=sys.stderr)
return 1
# If asked, confirm a decoded event is retrievable over the WS API: the
# server replays its history ring on connect, so a decode that already
# happened arrives as a pushed frame (a live broadcast also matches).
if expect is not None:
if read_until_contains(sock, expect, max_frames=200) is None:
print("FAIL: never received a frame containing %r" % expect, file=sys.stderr)
return 1
# The WebSocket RPC uses the simple {"cmd": ...} form (json_parse),
# NOT the JSON-RPC envelope the /jsonrpc HTTP endpoint expects.
send_text(sock, json.dumps({"cmd": "get_sample_rate"}))
reply = read_until_contains(sock, "\"result\"")
if reply is None:
print("FAIL: no rpc reply containing 'result'", file=sys.stderr)
return 1
print("OK: websocket handshake, meta push, and rpc round-trip")
return 0
finally:
try:
sock.close()
except OSError:
pass
if __name__ == "__main__":
sys.exit(main())