"""Meshtastic unit tests for stream_interface.py""" import logging from unittest.mock import MagicMock import pytest from ..stream_interface import StreamInterface # import re @pytest.mark.unit def test_StreamInterface(): """Test that we cannot instantiate a StreamInterface based on noProto""" with pytest.raises(Exception) as pytest_wrapped_e: StreamInterface() assert pytest_wrapped_e.type == RuntimeError @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_close_safe_when_thread_never_started(): """close() must not raise RuntimeError when called before connect() has started the reader. Hits the cleanup path used by __init__ when the handshake raises before the reader thread is started. """ iface = StreamInterface(noProto=True, connectNow=False) iface.stream = MagicMock() # _rxThread was created in __init__ but never .start()'d. close() should # detect that and skip join() instead of raising RuntimeError. iface.close() @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_close_when_thread_never_started_closes_stream(): """If no reader thread was started, close() should still close the stream.""" iface = StreamInterface(noProto=True, connectNow=False) stream = MagicMock() iface.stream = stream iface.close() stream.close.assert_called_once() @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_init_cleans_up_when_connect_raises(): """If connect() raises during __init__, close() runs and the original exception propagates.""" cleanup_calls = [] class FailingConnectStream(StreamInterface): """Subclass whose connect() raises, to exercise the __init__ cleanup path.""" def __init__(self): self.stream = MagicMock() # bypass StreamInterface abstract check super().__init__(noProto=False, connectNow=True) def connect(self): raise RuntimeError("simulated handshake failure") def close(self): cleanup_calls.append("close") super().close() with pytest.raises(RuntimeError, match="simulated handshake failure"): FailingConnectStream() assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake failure" @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_init_cleans_up_when_waitForConfig_raises(): """If waitForConfig() raises after a successful connect(), close() runs and exception propagates.""" cleanup_calls = [] class FailingWaitStream(StreamInterface): """Subclass whose waitForConfig() raises, to exercise the second leg of cleanup.""" def __init__(self): self.stream = MagicMock() super().__init__(noProto=False, connectNow=True) def connect(self): # No-op connect — we are simulating handshake-stage failure, not connect-stage. pass def waitForConfig(self): raise TimeoutError("simulated config-handshake timeout") def close(self): cleanup_calls.append("close") super().close() with pytest.raises(TimeoutError, match="simulated config-handshake timeout"): FailingWaitStream() assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake timeout" @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_init_cleanup_does_not_shadow_original_exception(): """If close() itself raises during __init__ cleanup, the original exception still propagates. The cleanup uses contextlib.suppress(Exception) so that a secondary failure in close() doesn't replace the real reason for the failed handshake. """ class CleanupRaisesStream(StreamInterface): """Helper stream that raises during close() to verify exception precedence.""" def __init__(self): self.stream = MagicMock() super().__init__(noProto=False, connectNow=True) def connect(self): raise RuntimeError("original handshake failure") def close(self): raise RuntimeError("secondary close failure — should be suppressed") with pytest.raises(RuntimeError, match="original handshake failure"): CleanupRaisesStream() # Note: This takes a bit, so moving from unit to slow @pytest.mark.unitslow @pytest.mark.usefixtures("reset_mt_config") def test_StreamInterface_with_noProto(caplog): """Test that we can instantiate a StreamInterface based on nonProto and we can read/write bytes from a mocked stream """ stream = MagicMock() test_data = b"hello" stream.read.return_value = test_data with caplog.at_level(logging.DEBUG): iface = StreamInterface(noProto=True, connectNow=False) iface.stream = stream iface._writeBytes(test_data) data = iface._readBytes(len(test_data)) assert data == test_data # TODO ### Note: This takes a bit, so moving from unit to slow ### Tip: If you want to see the print output, run with '-s' flag: ### pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl # @pytest.mark.unitslow # @pytest.mark.usefixtures("reset_mt_config") # def test_sendToRadioImpl(caplog): # """Test _sendToRadioImpl()""" # ## def add_header(b): ## """Add header stuffs for radio""" ## bufLen = len(b) ## header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff]) ## return header + b # # # captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio") # raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01' # raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a' # # pylint: disable=C0301 # raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@' # raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e' # # pylint: disable=C0301 # raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F' # # pylint: disable=C0301 # raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F' # # pylint: disable=C0301 # raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F' # raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F' # raw_blank = b'' # # test_data = b'hello' # stream = MagicMock() # #stream.read.return_value = add_header(test_data) # stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete, # raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2, # raw_blank, raw_blank] # toRadio = MagicMock() # toRadio.SerializeToString.return_value = test_data # with caplog.at_level(logging.DEBUG): # iface = StreamInterface(noProto=True, connectNow=False) # iface.stream = stream # iface.connect() # iface._sendToRadioImpl(toRadio) # assert re.search(r'Sending: ', caplog.text, re.MULTILINE) # assert re.search(r'reading character', caplog.text, re.MULTILINE) # assert re.search(r'In reader loop', caplog.text, re.MULTILINE)