StreamInterface: prevent socket/reader-thread leak on handshake failure in __init__

If connect() or waitForConfig() raises during __init__ (handshake timeout,
bad stream, config error), the reader thread started by connect() keeps
running and the underlying stream/socket stays open — but the caller never
receives a reference to the half-initialized instance, so they cannot call
close() themselves. The leak compounds on every retry from a caller's
reconnect loop.

Fix: wrap connect() + waitForConfig() in try/except; call self.close() on
any exception before re-raising. Also guard close() against RuntimeError
from joining an unstarted reader thread (happens when close() runs from
a failed __init__ before connect() could spawn it).

Discovered while debugging a real-world Meshtastic firmware crash where
a passive logger's retrying TCPInterface() calls against a node with a
250-entry NodeDB produced a reconnect storm — every retry triggered a
full config+NodeDB dump on the node, compounding heap pressure, which
then exposed null-deref bugs in Router::perhapsDecode / MeshService
(firmware side fixed in meshtastic/firmware#10226 and #10229). The
client-side leak is independent of those firmware bugs and worth fixing
on its own.
This commit is contained in:
nightjoker7
2026-04-22 19:08:32 -05:00
parent cec79a7c1f
commit bfe38ac0c7
2 changed files with 114 additions and 4 deletions

View File

@@ -1,5 +1,6 @@
"""Stream Interface base class
"""
import contextlib
import io
import logging
import threading
@@ -61,9 +62,17 @@ class StreamInterface(MeshInterface):
# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
if not noProto:
self.waitForConfig()
try:
self.connect()
if not noProto:
self.waitForConfig()
except Exception:
# If the handshake raises, the caller never receives a reference
# to this instance and cannot call close() themselves. Clean up
# the reader thread + stream here so retries don't leak.
with contextlib.suppress(Exception):
self.close()
raise
def connect(self) -> None:
"""Connect to our radio
@@ -136,7 +145,13 @@ class StreamInterface(MeshInterface):
# reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
try:
self._rxThread.join() # wait for it to exit
except RuntimeError:
# Thread was never started — happens when close() is invoked
# from a failed __init__ before connect() could spawn it.
# Nothing to join; safe to ignore.
pass
def _handleLogByte(self, b):
"""Handle a byte that is part of a log message from the device."""

View File

@@ -18,6 +18,101 @@ def test_StreamInterface():
assert pytest_wrapped_e.type == Exception
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_close_safe_when_thread_never_started():
    """Closing an interface whose reader thread was never started must not raise.

    This mirrors the cleanup path taken when ``__init__`` fails before
    ``connect()`` has had a chance to start the reader thread.
    """
    # connectNow=False builds the interface without calling connect(), so the
    # reader thread object exists but has not been start()'ed.
    interface = StreamInterface(connectNow=False, noProto=True)
    interface.stream = MagicMock()

    # The test passes as long as this returns normally; a RuntimeError from
    # joining the unstarted thread must not escape close().
    interface.close()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_connect_raises():
    """A connect() failure inside __init__ must trigger close() and re-raise unchanged."""
    close_log = []

    class FailingConnectStream(StreamInterface):
        """StreamInterface whose connect() always fails, driving the __init__ cleanup path."""

        def __init__(self):
            # A stream attribute is set up front so the base constructor's
            # abstract-class check is bypassed.
            self.stream = MagicMock()
            super().__init__(connectNow=True, noProto=False)

        def connect(self):
            raise RuntimeError("simulated handshake failure")

        def close(self):
            # Record the call first so it is counted regardless of what the
            # base-class close() goes on to do.
            close_log.append("close")
            super().close()

    # The exception raised by connect() must be the one the caller sees.
    with pytest.raises(RuntimeError, match="simulated handshake failure"):
        FailingConnectStream()

    assert close_log == ["close"], "close() should be invoked exactly once on handshake failure"
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_waitForConfig_raises():
    """A waitForConfig() failure after a successful connect() must trigger close() and re-raise."""
    close_log = []

    class FailingWaitStream(StreamInterface):
        """StreamInterface whose waitForConfig() fails, covering the second leg of cleanup."""

        def __init__(self):
            self.stream = MagicMock()
            super().__init__(connectNow=True, noProto=False)

        def connect(self):
            """Do nothing: the failure being simulated is at the handshake stage, not connect."""

        def waitForConfig(self):
            raise TimeoutError("simulated config-handshake timeout")

        def close(self):
            # Record the call first so it is counted regardless of what the
            # base-class close() goes on to do.
            close_log.append("close")
            super().close()

    # The timeout raised by waitForConfig() must be the one the caller sees.
    with pytest.raises(TimeoutError, match="simulated config-handshake timeout"):
        FailingWaitStream()

    assert close_log == ["close"], "close() should be invoked exactly once on handshake timeout"
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleanup_does_not_shadow_original_exception():
    """A failure inside close() during __init__ cleanup must not replace the original error.

    The cleanup path wraps close() in contextlib.suppress(Exception), so the
    exception that caused the handshake to fail is the one that propagates.
    """

    class CleanupRaisesStream(StreamInterface):
        """StreamInterface where both connect() and close() raise, with distinct messages."""

        def __init__(self):
            self.stream = MagicMock()
            super().__init__(connectNow=True, noProto=False)

        def connect(self):
            raise RuntimeError("original handshake failure")

        def close(self):
            raise RuntimeError("secondary close failure — should be suppressed")

    # Both errors are RuntimeError, so the match pattern is what distinguishes
    # the original failure from the secondary one.
    with pytest.raises(RuntimeError, match="original handshake failure"):
        CleanupRaisesStream()
# Note: This takes a bit, so moving from unit to slow
@pytest.mark.unitslow
@pytest.mark.usefixtures("reset_mt_config")