From 87b95c58932a6d7067dd056d03cc8da5a845bfbf Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Fri, 24 Dec 2021 13:48:10 -0800 Subject: [PATCH 1/7] add more unit tests for onReceive in main --- meshtastic/__main__.py | 6 +- meshtastic/tests/test_main.py | 106 +++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index c5eea07..cbd67c9 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -27,7 +27,7 @@ def onReceive(packet, interface): args = our_globals.get_args() try: d = packet.get('decoded') - logging.debug(f'd:{d}') + logging.debug(f'in onReceive() d:{d}') # Exit once we receive a reply if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP: @@ -37,12 +37,10 @@ def onReceive(packet, interface): if args and args.reply: msg = d.get('text') if msg: - #shortName = packet['decoded']['shortName'] rxSnr = packet['rxSnr'] hopLimit = packet['hopLimit'] print(f"message: {msg}") - reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format( - msg, rxSnr, hopLimit) + reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit) print("Sending reply: ", reply) interface.sendText(reply) diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 83f212a..d90c931 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -1234,14 +1234,113 @@ def test_main_setchan(capsys, reset_globals): @pytest.mark.unit -def test_main_onReceive_empty(reset_globals): +def test_main_onReceive_empty(caplog, reset_globals): """Test onReceive""" sys.argv = [''] Globals.getInstance().set_args(sys.argv) iface = MagicMock(autospec=SerialInterface) packet = {'decoded': 'foo'} - onReceive(packet, iface) - # TODO: how do we know we actually called it? + with caplog.at_level(logging.DEBUG): + onReceive(packet, iface) + assert re.search(r'in onReceive', caplog.text, re.MULTILINE) + + +# TODO: use this captured position app message (might want/need in the future) +# packet = { +# 'to': 4294967295, +# 'decoded': { +# 'portnum': 'POSITION_APP', +# 'payload': "M69\306a" +# }, +# 'id': 334776976, +# 'hop_limit': 3 +# } + +@pytest.mark.unit +def test_main_onReceive_with_sendtext(caplog, reset_globals): + """Test onReceive with sendtext + The entire point of this test is to make sure the interface.close() call + is made in onReceive(). + """ + sys.argv = ['', '--sendtext', 'hello'] + Globals.getInstance().set_args(sys.argv) + + # Note: 'TEXT_MESSAGE_APP' value is 1 + packet = { + 'to': 4294967295, + 'decoded': { + 'portnum': 1, + 'payload': "hello" + }, + 'id': 334776977, + 'hop_limit': 3, + 'want_ack': True + } + + iface = MagicMock(autospec=SerialInterface) + iface.myInfo.my_node_num = 4294967295 + + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with caplog.at_level(logging.DEBUG): + main() + onReceive(packet, iface) + assert re.search(r'in onReceive', caplog.text, re.MULTILINE) + mo.assert_called() + + +@pytest.mark.unit +def test_main_onReceive_with_reply(caplog, capsys, reset_globals): + """Test onReceive with a reply + To capture: on one device run '--sendtext aaa --reply' and on another + device run '--sendtext bbb --reply', then back to the first device and + run '--sendtext aaa2 --reply'. You should now see a "Sending reply" message. + """ + sys.argv = ['', '--sendtext', 'hello', '--reply'] + Globals.getInstance().set_args(sys.argv) + + # Note: 'TEXT_MESSAGE_APP' value is 1 + + send_packet = { + 'to': 4294967295, + 'decoded': { + 'portnum': 1, + 'payload': "hello" + }, + 'id': 334776977, + 'hop_limit': 3, + 'want_ack': True + } + + reply_packet = { + 'from': 682968668, + 'to': 4294967295, + 'decoded': { + 'portnum': 'TEXT_MESSAGE_APP', + 'payload': b'bbb', + 'text': 'bbb' + }, + 'id': 1709936182, + 'rxTime': 1640381999, + 'rxSnr': 6.0, + 'hopLimit': 3, + 'raw': 'faked', + 'fromId': '!28b5465c', + 'toId': '^all' + } + + iface = MagicMock(autospec=SerialInterface) + iface.myInfo.my_node_num = 4294967295 + + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with caplog.at_level(logging.DEBUG): + main() + onReceive(send_packet, iface) + onReceive(reply_packet, iface) + assert re.search(r'in onReceive', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'got msg ', out, re.MULTILINE) + assert err == '' + mo.assert_called() @pytest.mark.unit @@ -1371,6 +1470,7 @@ def test_main_gpio_rd(caplog, capsys, reset_globals): channel.settings.psk = b'\x01' packet = { + 'from': 682968668, 'to': 682968612, 'channel': 1, From 7ef6465b5fc8cf779f45fc3b7fd3a51f1f3ebdfe Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Fri, 24 Dec 2021 14:42:39 -0800 Subject: [PATCH 2/7] add some tests for getPref() --- meshtastic/tests/test_main.py | 49 ++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index d90c931..b94dcd6 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -9,7 +9,8 @@ import logging from unittest.mock import patch, MagicMock import pytest -from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config +from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref +#from ..radioconfig_pb2 import UserPreferences import meshtastic.radioconfig_pb2 from ..serial_interface import SerialInterface from ..tcp_interface import TCPInterface @@ -1504,3 +1505,49 @@ def test_main_gpio_rd(caplog, capsys, reset_globals): assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE) assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE) assert err == '' + + +@pytest.mark.unit +def test_main_getPref_valid_field(capsys, reset_globals): + """Test getPref() with a valid field""" + prefs = MagicMock() + prefs.DESCRIPTOR.fields_by_name.get.return_value = 'ls_secs' + prefs.wifi_ssid = 'foo' + prefs.ls_secs = 300 + prefs.fixed_position = False + + getPref(prefs, 'ls_secs') + out, err = capsys.readouterr() + assert re.search(r'ls_secs: 300', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_main_getPref_invalid_field(capsys, reset_globals): + """Test getPref() with an invalid field""" + + class Field: + """Simple class for testing.""" + + def __init__(self, name): + """constructor""" + self.name = name + + prefs = MagicMock() + prefs.DESCRIPTOR.fields_by_name.get.return_value = None + + # Note: This is a subset of the real fields + ls_secs_field = Field('ls_secs') + is_router = Field('is_router') + fixed_position = Field('fixed_position') + + fields = [ ls_secs_field, is_router, fixed_position ] + prefs.DESCRIPTOR.fields = fields + + getPref(prefs, 'foo') + + out, err = capsys.readouterr() + assert re.search(r'does not have an attribute called foo', out, re.MULTILINE) + # ensure they are sorted + assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE) + assert err == '' From e1f0c7f1543870f39f85a7692e84a38f0683a100 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Fri, 24 Dec 2021 14:56:15 -0800 Subject: [PATCH 3/7] add tests for setPref() --- meshtastic/tests/test_main.py | 55 ++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index b94dcd6..28ef303 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -9,7 +9,7 @@ import logging from unittest.mock import patch, MagicMock import pytest -from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref +from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref, setPref #from ..radioconfig_pb2 import UserPreferences import meshtastic.radioconfig_pb2 from ..serial_interface import SerialInterface @@ -1551,3 +1551,56 @@ def test_main_getPref_invalid_field(capsys, reset_globals): # ensure they are sorted assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE) assert err == '' + + +@pytest.mark.unit +def test_main_setPref_valid_field(capsys, reset_globals): + """Test setPref() with a valid field""" + + class Field: + """Simple class for testing.""" + + def __init__(self, name, enum_type): + """constructor""" + self.name = name + self.enum_type = enum_type + + ls_secs_field = Field('ls_secs', 'int') + prefs = MagicMock() + prefs.DESCRIPTOR.fields_by_name.get.return_value = ls_secs_field + + setPref(prefs, 'ls_secs', '300') + out, err = capsys.readouterr() + assert re.search(r'Set ls_secs to 300', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_main_setPref_invalid_field(capsys, reset_globals): + """Test setPref() with a invalid field""" + + + class Field: + """Simple class for testing.""" + + def __init__(self, name): + """constructor""" + self.name = name + + prefs = MagicMock() + prefs.DESCRIPTOR.fields_by_name.get.return_value = None + + # Note: This is a subset of the real fields + ls_secs_field = Field('ls_secs') + is_router = Field('is_router') + fixed_position = Field('fixed_position') + + fields = [ ls_secs_field, is_router, fixed_position ] + prefs.DESCRIPTOR.fields = fields + + setPref(prefs, 'foo', '300') + out, err = capsys.readouterr() + assert re.search(r'does not have an attribute called foo', out, re.MULTILINE) + # ensure they are sorted + assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE) + assert err == '' From 9fea479af46594786675e57937bfcfc77a72d252 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Fri, 24 Dec 2021 15:08:56 -0800 Subject: [PATCH 4/7] do not add the test.py to the code coverage --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index 055fbe5..dc58f07 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,2 @@ [run] -omit = meshtastic/*_pb2.py,meshtastic/tests/*.py +omit = meshtastic/*_pb2.py,meshtastic/tests/*.py,meshtastic/test.py From a243fab9af2d879cc37653aa7af31d43ed9df684 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Sat, 25 Dec 2021 11:39:04 -0800 Subject: [PATCH 5/7] figured out how to unit test __reader() --- meshtastic/stream_interface.py | 24 +++++++------ meshtastic/tcp_interface.py | 27 ++++++++++----- meshtastic/tests/conftest.py | 1 + meshtastic/tests/test_stream_interface.py | 41 +++++++++++++++++++++-- 4 files changed, 73 insertions(+), 20 deletions(-) diff --git a/meshtastic/stream_interface.py b/meshtastic/stream_interface.py index 3d4ee76..a1658f2 100644 --- a/meshtastic/stream_interface.py +++ b/meshtastic/stream_interface.py @@ -24,7 +24,6 @@ class StreamInterface(MeshInterface): """Constructor, opens a connection to self.stream Keyword Arguments: - devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None}) debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None}) @@ -33,15 +32,16 @@ class StreamInterface(MeshInterface): Exception: [description] """ - if not hasattr(self, 'stream'): + if not hasattr(self, 'stream') and not noProto: raise Exception( "StreamInterface is now abstract (to update existing code create SerialInterface instead)") self._rxBuf = bytes() # empty self._wantExit = False + self.stream = None + # FIXME, figure out why daemon=True causes reader thread to exit too early - self._rxThread = threading.Thread( - target=self.__reader, args=(), daemon=True) + self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True) MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto) @@ -93,7 +93,10 @@ class StreamInterface(MeshInterface): def _readBytes(self, length): """Read an array of bytes from our stream""" - return self.stream.read(length) + if self.stream: + return self.stream.read(length) + else: + return None def _sendToRadioImpl(self, toRadio): """Send a ToRadio protobuf to the device""" @@ -116,14 +119,15 @@ class StreamInterface(MeshInterface): def __reader(self): """The reader thread that reads bytes from our stream""" + logging.debug('in __reader()') empty = bytes() try: while not self._wantExit: - # logging.debug("reading character") + logging.debug("reading character") b = self._readBytes(1) - # logging.debug("In reader loop") - # logging.debug(f"read returned {b}") + logging.debug("In reader loop") + logging.debug(f"read returned {b}") if len(b) > 0: c = b[0] ptr = len(self._rxBuf) @@ -144,12 +148,12 @@ class StreamInterface(MeshInterface): if c != START2: self._rxBuf = empty # failed to find start2 elif ptr >= HEADER_LEN - 1: # we've at least got a header - # big endian length follos header + # big endian length follows header packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3] if ptr == HEADER_LEN - 1: # we _just_ finished reading the header, validate length if packetlen > MAX_TO_FROM_RADIO_SIZE: - self._rxBuf = empty # length ws out out bounds, restart + self._rxBuf = empty # length was out out bounds, restart if len(self._rxBuf) != 0 and ptr + 1 >= packetlen + HEADER_LEN: try: diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index a892448..25e5ab8 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -17,18 +17,29 @@ class TCPInterface(StreamInterface): hostname {string} -- Hostname/IP address of the device to connect to """ - logging.debug(f"Connecting to {hostname}") - - server_address = (hostname, portNumber) - sock = socket.create_connection(server_address) - # Instead of wrapping as a stream, we use the native socket API # self.stream = sock.makefile('rw') self.stream = None - self.socket = sock - StreamInterface.__init__( - self, debugOut=debugOut, noProto=noProto, connectNow=connectNow) + self.hostname = hostname + self.portNumber = portNumber + + if connectNow: + logging.debug(f"Connecting to {hostname}") + server_address = (hostname, portNumber) + sock = socket.create_connection(server_address) + self.socket = sock + else: + self.socket = None + + StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, + connectNow=connectNow) + + def myConnect(self): + """Connect to socket""" + server_address = (self.hostname, self.portNumber) + sock = socket.create_connection(server_address) + self.socket = sock def close(self): """Close a connection to the device""" diff --git a/meshtastic/tests/conftest.py b/meshtastic/tests/conftest.py index 12c566c..56df27c 100644 --- a/meshtastic/tests/conftest.py +++ b/meshtastic/tests/conftest.py @@ -8,6 +8,7 @@ import pytest from meshtastic.__main__ import Globals from ..mesh_interface import MeshInterface + @pytest.fixture def reset_globals(): """Fixture to reset globals.""" diff --git a/meshtastic/tests/test_stream_interface.py b/meshtastic/tests/test_stream_interface.py index a3e400c..57eb16f 100644 --- a/meshtastic/tests/test_stream_interface.py +++ b/meshtastic/tests/test_stream_interface.py @@ -1,6 +1,9 @@ """Meshtastic unit tests for stream_interface.py""" +import logging +import re +from unittest.mock import MagicMock import pytest from ..stream_interface import StreamInterface @@ -8,7 +11,41 @@ from ..stream_interface import StreamInterface @pytest.mark.unit def test_StreamInterface(): - """Test that we cannot instantiate a StreamInterface""" + """Test that we cannot instantiate a StreamInterface based on noProto""" with pytest.raises(Exception) as pytest_wrapped_e: - StreamInterface(noProto=True) + StreamInterface() assert pytest_wrapped_e.type == Exception + + +@pytest.mark.unit +def test_StreamInterface_with_noProto(caplog, reset_globals): + """Test that we can instantiate a StreamInterface based on nonProto + and we can read/write bytes from a mocked stream + """ + stream = MagicMock() + test_data = b'hello' + stream.read.return_value = test_data + with caplog.at_level(logging.DEBUG): + iface = StreamInterface(noProto=True) + iface.stream = stream + iface._writeBytes(test_data) + data = iface._readBytes(len(test_data)) + assert data == test_data + + +@pytest.mark.unit +def test_sendToRadioImpl(caplog, reset_globals): + """Test _sendToRadioImpl()""" + test_data = b'hello' + stream = MagicMock() + stream.read.return_value = test_data + toRadio = MagicMock() + toRadio.SerializeToString.return_value = test_data + with caplog.at_level(logging.DEBUG): + iface = StreamInterface(noProto=True, connectNow=False) + iface.stream = stream + iface.connect() + iface._sendToRadioImpl(toRadio) + assert re.search(r'Sending: ', caplog.text, re.MULTILINE) + assert re.search(r'reading character', caplog.text, re.MULTILINE) + assert re.search(r'In reader loop', caplog.text, re.MULTILINE) From 988540c77df422f8060959c23f10a133b1abe197 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Sat, 25 Dec 2021 13:09:23 -0800 Subject: [PATCH 6/7] move the slow unit tests into own group --- Makefile | 4 ++-- meshtastic/tests/test_stream_interface.py | 6 ++++-- pytest.ini | 1 + 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 0e3428b..4e3177a 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ -# unit test +# only run the fast unit tests test: - pytest + pytest -m unit # local install install: diff --git a/meshtastic/tests/test_stream_interface.py b/meshtastic/tests/test_stream_interface.py index 57eb16f..0995390 100644 --- a/meshtastic/tests/test_stream_interface.py +++ b/meshtastic/tests/test_stream_interface.py @@ -17,7 +17,8 @@ def test_StreamInterface(): assert pytest_wrapped_e.type == Exception -@pytest.mark.unit +# Note: This takes a bit, so moving from unit to slow +@pytest.mark.unitslow def test_StreamInterface_with_noProto(caplog, reset_globals): """Test that we can instantiate a StreamInterface based on nonProto and we can read/write bytes from a mocked stream @@ -33,7 +34,8 @@ def test_StreamInterface_with_noProto(caplog, reset_globals): assert data == test_data -@pytest.mark.unit +# Note: This takes a bit, so moving from unit to slow +@pytest.mark.unitslow def test_sendToRadioImpl(caplog, reset_globals): """Test _sendToRadioImpl()""" test_data = b'hello' diff --git a/pytest.ini b/pytest.ini index bb6fbde..31b01db 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,6 +4,7 @@ addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not ex markers = unit: marks tests as unit tests + unitslow: marks slow unit tests int: marks tests as integration tests smoke1: runs smoke tests on a single device connected via USB smoke2: runs smoke tests on a two devices connected via USB From 580c081303f87f7470359c2a50a24407228f7456 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Sat, 25 Dec 2021 16:19:13 -0800 Subject: [PATCH 7/7] captured packets and simulate what server would send --- meshtastic/stream_interface.py | 7 ++--- meshtastic/tests/test_stream_interface.py | 32 +++++++++++++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/meshtastic/stream_interface.py b/meshtastic/stream_interface.py index a1658f2..a3c20b5 100644 --- a/meshtastic/stream_interface.py +++ b/meshtastic/stream_interface.py @@ -38,8 +38,6 @@ class StreamInterface(MeshInterface): self._rxBuf = bytes() # empty self._wantExit = False - self.stream = None - # FIXME, figure out why daemon=True causes reader thread to exit too early self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True) @@ -105,6 +103,7 @@ class StreamInterface(MeshInterface): bufLen = len(b) # We convert into a string, because the TCP code doesn't work with byte arrays header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff]) + logging.debug(f'sending header:{header} b:{b}') self._writeBytes(header + b) def close(self): @@ -127,9 +126,10 @@ class StreamInterface(MeshInterface): logging.debug("reading character") b = self._readBytes(1) logging.debug("In reader loop") - logging.debug(f"read returned {b}") + #logging.debug(f"read returned {b}") if len(b) > 0: c = b[0] + #logging.debug(f'c:{c}') ptr = len(self._rxBuf) # Assume we want to append this byte, fixme use bytearray instead @@ -148,6 +148,7 @@ class StreamInterface(MeshInterface): if c != START2: self._rxBuf = empty # failed to find start2 elif ptr >= HEADER_LEN - 1: # we've at least got a header + #logging.debug('at least we received a header') # big endian length follows header packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3] diff --git a/meshtastic/tests/test_stream_interface.py b/meshtastic/tests/test_stream_interface.py index 0995390..a0e1550 100644 --- a/meshtastic/tests/test_stream_interface.py +++ b/meshtastic/tests/test_stream_interface.py @@ -27,7 +27,7 @@ def test_StreamInterface_with_noProto(caplog, reset_globals): test_data = b'hello' stream.read.return_value = test_data with caplog.at_level(logging.DEBUG): - iface = StreamInterface(noProto=True) + iface = StreamInterface(noProto=True, connectNow=False) iface.stream = stream iface._writeBytes(test_data) data = iface._readBytes(len(test_data)) @@ -35,12 +35,39 @@ def test_StreamInterface_with_noProto(caplog, reset_globals): # Note: This takes a bit, so moving from unit to slow +# Tip: If you want to see the print output, run with '-s' flag: +# pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl @pytest.mark.unitslow def test_sendToRadioImpl(caplog, reset_globals): """Test _sendToRadioImpl()""" + +# def add_header(b): +# """Add header stuffs for radio""" +# bufLen = len(b) +# header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff]) +# return header + b + + # captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio") + raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01' + raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a' + # pylint: disable=C0301 + raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@' + raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e' + # pylint: disable=C0301 + raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F' + # pylint: disable=C0301 + raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F' + # pylint: disable=C0301 + raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F' + raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F' + raw_blank = b'' + test_data = b'hello' stream = MagicMock() - stream.read.return_value = test_data + #stream.read.return_value = add_header(test_data) + stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete, + raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2, + raw_blank, raw_blank] toRadio = MagicMock() toRadio.SerializeToString.return_value = test_data with caplog.at_level(logging.DEBUG): @@ -51,3 +78,4 @@ def test_sendToRadioImpl(caplog, reset_globals): assert re.search(r'Sending: ', caplog.text, re.MULTILINE) assert re.search(r'reading character', caplog.text, re.MULTILINE) assert re.search(r'In reader loop', caplog.text, re.MULTILINE) + print(caplog.text)