From 07172f88f38e806fc95b102e3a83eae0c2527320 Mon Sep 17 00:00:00 2001 From: Stephen Thorne Date: Thu, 5 Feb 2026 21:53:18 +0100 Subject: [PATCH 1/5] Give TCPInterface reconnect logic on write errors * Moving to socket.sendall() is safer, as sendall will send the entire buffer, while send() would return the number of bytes sent and require being called multiple times if the buffer was full. * On exceptions: reconnect to the server. * On reconnection: make sure using a lock that there isn't a race between the readers and the writers triggering a reconnect. --- meshtastic/tcp_interface.py | 68 +++++++++++++++++--------- meshtastic/tests/test_tcp_interface.py | 41 ++++++++++++++++ protobufs | 1 - 3 files changed, 87 insertions(+), 23 deletions(-) delete mode 160000 protobufs diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index 732f37e..5d27929 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -4,6 +4,7 @@ import contextlib import logging import socket +import threading import time from typing import Optional @@ -12,6 +13,7 @@ from meshtastic.stream_interface import StreamInterface DEFAULT_TCP_PORT = 4403 logger = logging.getLogger(__name__) + class TCPInterface(StreamInterface): """Interface class for meshtastic devices over a TCP link""" @@ -19,10 +21,10 @@ class TCPInterface(StreamInterface): self, hostname: str, debugOut=None, - noProto: bool=False, - connectNow: bool=True, - portNumber: int=DEFAULT_TCP_PORT, - noNodes:bool=False, + noProto: bool = False, + connectNow: bool = True, + portNumber: int = DEFAULT_TCP_PORT, + noNodes: bool = False, timeout: int = 300, ): """Constructor, opens a connection to a specified IP address/hostname @@ -38,13 +40,20 @@ class TCPInterface(StreamInterface): self.portNumber: int = portNumber self.socket: Optional[socket.socket] = None + self.reconnectLock = threading.Lock() if connectNow: self.myConnect() else: self.socket = None - super().__init__(debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes, timeout=timeout) + super().__init__( + debugOut=debugOut, + noProto=noProto, + connectNow=connectNow, + noNodes=noNodes, + timeout=timeout, + ) def __repr__(self): rep = f"TCPInterface({self.hostname!r}" @@ -69,29 +78,35 @@ class TCPInterface(StreamInterface): self.socket.shutdown(socket.SHUT_RDWR) def myConnect(self) -> None: - """Connect to socket""" - logger.debug(f"Connecting to {self.hostname}") # type: ignore[str-bytes-safe] + """Connect to socket.""" + logger.debug(f"Connecting to {self.hostname}") # type: ignore[str-bytes-safe] server_address = (self.hostname, self.portNumber) self.socket = socket.create_connection(server_address) def close(self) -> None: - """Close a connection to the device""" + """Close a connection to the device.""" logger.debug("Closing TCP stream") super().close() # Sometimes the socket read might be blocked in the reader thread. # Therefore we force the shutdown by closing the socket here self._wantExit = True if self.socket is not None: - with contextlib.suppress(Exception): # Ignore errors in shutdown, because we might have a race with the server + with contextlib.suppress( + Exception + ): # Ignore errors in shutdown, because we might have a race with the server self._socket_shutdown() self.socket.close() self.socket = None def _writeBytes(self, b: bytes) -> None: - """Write an array of bytes to our stream and flush""" + """Write an array of bytes to our stream""" if self.socket is not None: - self.socket.send(b) + try: + self.socket.sendall(b) + except OSError as e: + logger.error(f"Socket send error, reconnecting: {e}") + self._reconnect() def _readBytes(self, length) -> Optional[bytes]: """Read an array of bytes from our stream""" @@ -99,19 +114,28 @@ class TCPInterface(StreamInterface): data = self.socket.recv(length) # empty byte indicates a disconnected socket, # we need to handle it to avoid an infinite loop reading from null socket - if data == b'': - logger.debug("dead socket, re-connecting") - # cleanup and reconnect socket without breaking reader thread - with contextlib.suppress(Exception): - self._socket_shutdown() - self.socket.close() - self.socket = None - time.sleep(1) - self.myConnect() - self._startConfig() - return None + if data == b"": + logger.debug("Closed socket, re-connecting") + self._reconnect() return data # no socket, break reader thread self._wantExit = True return None + + def _reconnect(self) -> None: + """Reconnect to the socket""" + # Save the socket reference before attempting to acquire the lock. + sock = self.socket + with self.reconnectLock: + # Don't reconnect: someone else already did it. + if sock is not self.socket: + return + + with contextlib.suppress(Exception): + self._socket_shutdown() + self.socket.close() + self.socket = None + time.sleep(1) + self.myConnect() + self._startConfig() diff --git a/meshtastic/tests/test_tcp_interface.py b/meshtastic/tests/test_tcp_interface.py index 44e79de..e123833 100644 --- a/meshtastic/tests/test_tcp_interface.py +++ b/meshtastic/tests/test_tcp_interface.py @@ -54,3 +54,44 @@ def test_TCPInterface_without_connecting(): with patch("socket.socket"): iface = TCPInterface(hostname="localhost", noProto=True, connectNow=False) assert iface.socket is None + + +@pytest.mark.unit +def test_TCPInterface_reconnect(): + """Test that _reconnect correctly reconnects""" + with patch("socket.socket") as mock_socket: + with patch("time.sleep"): + iface = TCPInterface(hostname="localhost", noProto=True) + old_socket = iface.socket + assert old_socket is not None + + iface._reconnect() + + assert old_socket.close.called + # We expect socket class to be instantiated at least twice (init + reconnect) + assert mock_socket.call_count >= 2 + + +@pytest.mark.unit +def test_TCPInterface_writeBytes_reconnects(): + """Test that _writeBytes calls _reconnect on OSError""" + with patch("socket.socket"): + iface = TCPInterface(hostname="localhost", noProto=True) + iface.socket.sendall.side_effect = OSError("Broken pipe") + + with patch.object(iface, '_reconnect') as mock_reconnect: + iface._writeBytes(b"some data") + mock_reconnect.assert_called_once() + + +@pytest.mark.unit +def test_TCPInterface_readBytes_reconnects(): + """Test that _readBytes calls _reconnect on empty bytes""" + with patch("socket.socket"): + iface = TCPInterface(hostname="localhost", noProto=True) + # Mock the socket instance on the interface + iface.socket.recv.return_value = b'' + + with patch.object(iface, '_reconnect') as mock_reconnect: + iface._readBytes(10) + mock_reconnect.assert_called_once() diff --git a/protobufs b/protobufs deleted file mode 160000 index 77c8329..0000000 --- a/protobufs +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 77c8329a59a9c96a61c447b5d5f1a52ca583e4f2 From 02485a88fbc682647163d28f08d374e7bc480421 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Sun, 31 May 2026 14:23:34 -0700 Subject: [PATCH 2/5] some pre-merge cleanup --- meshtastic/tcp_interface.py | 10 ++++++---- meshtastic/tests/test_tcp_interface.py | 24 +++++++++++++++++++++++- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index 5d27929..efa0950 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -86,18 +86,19 @@ class TCPInterface(StreamInterface): def close(self) -> None: """Close a connection to the device.""" logger.debug("Closing TCP stream") - super().close() # Sometimes the socket read might be blocked in the reader thread. - # Therefore we force the shutdown by closing the socket here + # Therefore force a shutdown first to unblock reader thread reads. self._wantExit = True if self.socket is not None: with contextlib.suppress( Exception ): # Ignore errors in shutdown, because we might have a race with the server self._socket_shutdown() - self.socket.close() + with contextlib.suppress(Exception): + self.socket.close() self.socket = None + super().close() def _writeBytes(self, b: bytes) -> None: """Write an array of bytes to our stream""" @@ -134,7 +135,8 @@ class TCPInterface(StreamInterface): with contextlib.suppress(Exception): self._socket_shutdown() - self.socket.close() + if self.socket is not None: + self.socket.close() self.socket = None time.sleep(1) self.myConnect() diff --git a/meshtastic/tests/test_tcp_interface.py b/meshtastic/tests/test_tcp_interface.py index e123833..60bee57 100644 --- a/meshtastic/tests/test_tcp_interface.py +++ b/meshtastic/tests/test_tcp_interface.py @@ -1,7 +1,7 @@ """Meshtastic unit tests for tcp_interface.py""" import re -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -56,6 +56,28 @@ def test_TCPInterface_without_connecting(): assert iface.socket is None +@pytest.mark.unit +def test_TCPInterface_close_shutdowns_socket_before_super_close(): + """Close should unblock socket reads before waiting on StreamInterface.close().""" + iface = TCPInterface(hostname="localhost", noProto=True, connectNow=False) + sock = MagicMock() + iface.socket = sock + call_order = [] + + with patch.object(TCPInterface, "_socket_shutdown", autospec=True) as mock_shutdown: + with patch( + "meshtastic.stream_interface.StreamInterface.close", autospec=True + ) as mock_super_close: + mock_shutdown.side_effect = lambda _self: call_order.append("shutdown") + mock_super_close.side_effect = lambda _self: call_order.append("super_close") + + iface.close() + + assert call_order == ["shutdown", "super_close"] + sock.close.assert_called_once() + assert iface.socket is None + + @pytest.mark.unit def test_TCPInterface_reconnect(): """Test that _reconnect correctly reconnects""" From 435e53eae2dcafb9f5601da4cf1ab52c8fb4c434 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Sun, 31 May 2026 14:53:46 -0700 Subject: [PATCH 3/5] Re-establish the OSError being raised to match former behavior, but still reconnect. TBD if this is quite the right approach. --- meshtastic/tcp_interface.py | 1 + meshtastic/tests/test_tcp_interface.py | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index 670335c..ee17e97 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -105,6 +105,7 @@ class TCPInterface(StreamInterface): except OSError as e: logger.error(f"Socket send error, reconnecting: {e}") self._reconnect() + raise def _readBytes(self, length) -> Optional[bytes]: """Read an array of bytes from our stream""" diff --git a/meshtastic/tests/test_tcp_interface.py b/meshtastic/tests/test_tcp_interface.py index 60bee57..5857a7c 100644 --- a/meshtastic/tests/test_tcp_interface.py +++ b/meshtastic/tests/test_tcp_interface.py @@ -86,9 +86,9 @@ def test_TCPInterface_reconnect(): iface = TCPInterface(hostname="localhost", noProto=True) old_socket = iface.socket assert old_socket is not None - + iface._reconnect() - + assert old_socket.close.called # We expect socket class to be instantiated at least twice (init + reconnect) assert mock_socket.call_count >= 2 @@ -96,13 +96,14 @@ def test_TCPInterface_reconnect(): @pytest.mark.unit def test_TCPInterface_writeBytes_reconnects(): - """Test that _writeBytes calls _reconnect on OSError""" + """Test that _writeBytes reconnects and re-raises on OSError.""" with patch("socket.socket"): iface = TCPInterface(hostname="localhost", noProto=True) iface.socket.sendall.side_effect = OSError("Broken pipe") - - with patch.object(iface, '_reconnect') as mock_reconnect: - iface._writeBytes(b"some data") + + with patch.object(iface, "_reconnect") as mock_reconnect: + with pytest.raises(OSError, match="Broken pipe"): + iface._writeBytes(b"some data") mock_reconnect.assert_called_once() @@ -113,7 +114,7 @@ def test_TCPInterface_readBytes_reconnects(): iface = TCPInterface(hostname="localhost", noProto=True) # Mock the socket instance on the interface iface.socket.recv.return_value = b'' - + with patch.object(iface, '_reconnect') as mock_reconnect: iface._readBytes(10) mock_reconnect.assert_called_once() From dbf4cabee1a8b9fc9856aacb5fccc5cfc85f7119 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Sun, 31 May 2026 15:03:58 -0700 Subject: [PATCH 4/5] Avoid deadlocking on potentially re-entrant _startConfig call, and don't reconnect when _wantExit --- meshtastic/tcp_interface.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index ee17e97..3118f4d 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -104,7 +104,8 @@ class TCPInterface(StreamInterface): self.socket.sendall(b) except OSError as e: logger.error(f"Socket send error, reconnecting: {e}") - self._reconnect() + if not self._wantExit: + self._reconnect() raise def _readBytes(self, length) -> Optional[bytes]: @@ -115,7 +116,8 @@ class TCPInterface(StreamInterface): # we need to handle it to avoid an infinite loop reading from null socket if data == b"": logger.debug("Closed socket, re-connecting") - self._reconnect() + if not self._wantExit: + self._reconnect() return data # no socket, break reader thread @@ -126,7 +128,10 @@ class TCPInterface(StreamInterface): """Reconnect to the socket""" # Save the socket reference before attempting to acquire the lock. sock = self.socket + start_config = False with self.reconnectLock: + if self._wantExit: + return # Don't reconnect: someone else already did it. if sock is not self.socket: return @@ -138,4 +143,7 @@ class TCPInterface(StreamInterface): self.socket = None time.sleep(1) self.myConnect() + start_config = True + + if start_config and not self._wantExit and self.socket is not None: self._startConfig() From 6909d3d21fc33074e0b01fc4c73aa0477d09a1f0 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Sun, 31 May 2026 15:10:37 -0700 Subject: [PATCH 5/5] make test more deterministic for reconnect count testing --- meshtastic/tests/test_tcp_interface.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/meshtastic/tests/test_tcp_interface.py b/meshtastic/tests/test_tcp_interface.py index 5857a7c..4f0fec9 100644 --- a/meshtastic/tests/test_tcp_interface.py +++ b/meshtastic/tests/test_tcp_interface.py @@ -110,11 +110,10 @@ def test_TCPInterface_writeBytes_reconnects(): @pytest.mark.unit def test_TCPInterface_readBytes_reconnects(): """Test that _readBytes calls _reconnect on empty bytes""" - with patch("socket.socket"): - iface = TCPInterface(hostname="localhost", noProto=True) - # Mock the socket instance on the interface - iface.socket.recv.return_value = b'' + iface = TCPInterface(hostname="localhost", noProto=True, connectNow=False) + iface.socket = MagicMock() + iface.socket.recv.return_value = b"" - with patch.object(iface, '_reconnect') as mock_reconnect: - iface._readBytes(10) - mock_reconnect.assert_called_once() + with patch.object(iface, "_reconnect") as mock_reconnect: + iface._readBytes(10) + mock_reconnect.assert_called_once()