From 302f52469a38a311c64e1dbeaef49fbb49fea208 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Tue, 21 Dec 2021 14:47:05 -0800 Subject: [PATCH 01/10] add unit tests for toRadio, sendData() and sendPosition(); found and fixed Exception without raise --- meshtastic/mesh_interface.py | 6 ++- meshtastic/tests/test_mesh_interface.py | 68 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index fee9166..ce74979 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -227,8 +227,9 @@ class MeshInterface: data = data.SerializeToString() logging.debug(f"len(data): {len(data)}") + logging.debug(f"mesh_pb2.Constants.DATA_PAYLOAD_LEN: {mesh_pb2.Constants.DATA_PAYLOAD_LEN}") if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN: - Exception("Data payload too big") + raise Exception("Data payload too big") if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers our_exit("Warning: A non-zero port number must be specified") @@ -261,12 +262,15 @@ class MeshInterface: p = mesh_pb2.Position() if latitude != 0.0: p.latitude_i = int(latitude / 1e-7) + logging.debug(f'p.latitude_i:{p.latitude_i}') if longitude != 0.0: p.longitude_i = int(longitude / 1e-7) + logging.debug(f'p.longitude_i:{p.longitude_i}') if altitude != 0: p.altitude = int(altitude) + logging.debug(f'p.altitude:{p.altitude}') if timeSec == 0: timeSec = time.time() # returns unix timestamp in seconds diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 9429911..98a6b9a 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -216,3 +216,71 @@ def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplo with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes) + + +@pytest.mark.unit +def test_MeshInterface_sendToRadioImpl(caplog, reset_globals): + """Test _sendToRadioImp()""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface._sendToRadioImpl('foo') + assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_MeshInterface_sendToRadio_no_proto(caplog, reset_globals): + """Test sendToRadio()""" + iface = MeshInterface() + with caplog.at_level(logging.DEBUG): + iface._sendToRadioImpl('foo') + assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_sendData_too_long(caplog, reset_globals): + """Test when data payload is too big""" + iface = MeshInterface(noProto=True) + some_large_text = b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + with caplog.at_level(logging.DEBUG): + with pytest.raises(Exception) as pytest_wrapped_e: + iface.sendData(some_large_text) + assert re.search('Data payload too big', caplog.text, re.MULTILINE) + assert pytest_wrapped_e.type == Exception + iface.close() + + +@pytest.mark.unit +def test_sendData_unknown_app(capsys, reset_globals): + """Test sendData when unknown app""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface.sendData(b'hello', portNum=0) + out, err = capsys.readouterr() + assert re.search(r'Warning: A non-zero port number', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPosition_with_a_position(caplog, reset_globals): + """Test sendPosition when lat/long/alt""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201) + assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE) + assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE) + assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE) From 2919930473773cbb60d146ce19fc41bf2aad46c2 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Tue, 21 Dec 2021 15:21:30 -0800 Subject: [PATCH 02/10] unit tests for sendPacket() --- README.md | 4 +- meshtastic/mesh_interface.py | 12 +++-- meshtastic/tests/test_mesh_interface.py | 61 ++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 003c2be..37b26ed 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ An example using Python 3 code to send a message to the mesh: ``` import meshtastic -interface = meshtastic.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +interface = meshtastic.serial_interface.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 interface.sendText("hello mesh") # or sendData to send binary data, see documentations for other options. interface.close() ``` @@ -103,7 +103,7 @@ You can even set the channel preshared key to a particular AES128 or AES256 sequ meshtastic --ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --info ``` -Use "--ch-set psk none" to turn off encryption. +Use "--ch-set psk none" to turn off encryption. Use "--ch-set psk random" will assign a new (high quality) random AES256 key to the primary channel (similar to what the Android app does when making new channels). diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index ce74979..844e729 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -311,7 +311,10 @@ class MeshInterface: elif destinationId == BROADCAST_ADDR: nodeNum = BROADCAST_NUM elif destinationId == LOCAL_ADDR: - nodeNum = self.myInfo.my_node_num + if self.myInfo: + nodeNum = self.myInfo.my_node_num + else: + our_exit("Warning: No myInfo found.") # A simple hex style nodeid - we can parse this without needing the DB elif destinationId.startswith("!"): nodeNum = int(destinationId[1:], 16) @@ -334,7 +337,7 @@ class MeshInterface: meshPacket.id = self._generatePacketId() toRadio.packet.CopyFrom(meshPacket) - #logging.debug(f"Sending packet: {stripnl(meshPacket)}") + logging.debug(f"Sending packet: {stripnl(meshPacket)}") self._sendToRadio(toRadio) return meshPacket @@ -374,8 +377,9 @@ class MeshInterface: def _waitConnected(self): """Block until the initial node db download is complete, or timeout and raise an exception""" - if not self.isConnected.wait(15.0): # timeout after x seconds - raise Exception("Timed out waiting for connection completion") + if not self.noProto: + if not self.isConnected.wait(15.0): # timeout after x seconds + raise Exception("Timed out waiting for connection completion") # If we failed while connecting, raise the connection to the client if self.failure: diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 98a6b9a..2917c4e 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -9,7 +9,7 @@ import pytest from ..mesh_interface import MeshInterface from ..node import Node from .. import mesh_pb2 -from ..__init__ import LOCAL_ADDR +from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR @pytest.mark.unit @@ -284,3 +284,62 @@ def test_sendPosition_with_a_position(caplog, reset_globals): assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE) assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE) assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_no_destination(capsys, reset_globals): + """Test _sendPacket()""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface._sendPacket(b'', destinationId=None) + out, err = capsys.readouterr() + assert re.search(r'Warning: destinationId must not be None', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_int(caplog, reset_globals): + """Test _sendPacket() with int as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=123) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals): + """Test _sendPacket() with BROADCAST_ADDR as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + out, err = capsys.readouterr() + assert re.search(r'Warning: No myInfo', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo""" + iface = MeshInterface(noProto=True) + myInfo = MagicMock() + iface.myInfo = myInfo + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) From 8e578c3b246b082b12791a9f505a55d4c93bc8d5 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Tue, 21 Dec 2021 16:27:43 -0800 Subject: [PATCH 03/10] add unit test for getMyNodeInfo() --- meshtastic/tests/test_mesh_interface.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 2917c4e..5888fd0 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -343,3 +343,17 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_glo meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_getMyNodeInfo(reset_globals): + """Test getMyNodeInfo()""" + iface = MeshInterface(noProto=True) + anode = iface.getNode(LOCAL_ADDR) + iface.nodesByNum = {1: anode } + assert iface.nodesByNum.get(1) == anode + myInfo = MagicMock() + iface.myInfo = myInfo + iface.myInfo.my_node_num = 1 + myinfo = iface.getMyNodeInfo() + assert myinfo == anode From 8bf1dd6dcb992dea655e892e3ce5ec8c63d1e595 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Tue, 21 Dec 2021 22:13:02 -0800 Subject: [PATCH 04/10] add unit tests for showChannels(), deleteChannel(), getChannelByName(), getDisabledChannel(), getAdminChannelIndex(), turnOffEncryptionOnPrimaryChannel(), and writeConfig() --- meshtastic/node.py | 2 + meshtastic/tests/test_node.py | 275 ++++++++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+) diff --git a/meshtastic/node.py b/meshtastic/node.py index 4b682b3..92ce433 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -28,7 +28,9 @@ class Node: """Show human readable description of our channels.""" print("Channels:") if self.channels: + logging.debug(f'self.channels:{self.channels}') for c in self.channels: + #print('c.settings.psk:', c.settings.psk) cStr = stripnl(MessageToJson(c.settings)) # only show if there is no psk (meaning disabled channel) if c.settings.psk: diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 528bead..ff1b288 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -9,6 +9,7 @@ import pytest from ..node import Node from ..serial_interface import SerialInterface from ..admin_pb2 import AdminMessage +from ..channel_pb2 import Channel @pytest.mark.unit @@ -139,3 +140,277 @@ def test_setURL_valid_URL_but_no_settings(caplog): anode.setURL(url) assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_showChannels(capsys): + """Test showChannels""" + anode = Node('foo', 'bar') + + # primary channel + # role: 0=Disabled, 1=Primary, 2=Secondary + # modem_config: 0-5 + # role: 0=Disabled, 1=Primary, 2=Secondary + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + anode.showChannels() + out, err = capsys.readouterr() + assert re.search(r'Channels:', out, re.MULTILINE) + # primary channel + assert re.search(r'Primary channel URL', out, re.MULTILINE) + assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE) + assert re.search(r'"modemConfig": "Bw125Cr48Sf4096"', out, re.MULTILINE) + assert re.search(r'"psk": "AQ=="', out, re.MULTILINE) + # secondary channel + assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE) + assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_deleteChannel_try_to_delete_primary_channel(capsys): + """Try to delete primary channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + # no secondary channels + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode.deleteChannel(0) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_getChannelByName(capsys): + """Get a channel by the name.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getChannelByName('admin') + assert ch.index == 2 + + +@pytest.mark.unit +def test_getChannelByName_invalid_name(capsys): + """Get a channel by the name but one that is not present.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getChannelByName('testing') + assert ch is None + + +@pytest.mark.unit +def test_getDisabledChannel(capsys): + """Get the first disabled channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testingA' + + channel3 = Channel(index=3, role=2) + channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel3.settings.name = 'testingB' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getDisabledChannel() + assert ch.index == 4 + + +@pytest.mark.unit +def test_getDisabledChannel_where_all_channels_are_used(capsys): + """Get the first disabled channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel3 = Channel(index=3, role=2) + channel4 = Channel(index=4, role=2) + channel5 = Channel(index=5, role=2) + channel6 = Channel(index=6, role=2) + channel7 = Channel(index=7, role=2) + channel8 = Channel(index=8, role=2) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getDisabledChannel() + assert ch is None + + +@pytest.mark.unit +def test_getAdminChannelIndex(capsys): + """Get the 'admin' channel index.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + i = anode._getAdminChannelIndex() + assert i == 2 + + +@pytest.mark.unit +def test_getAdminChannelIndex_when_no_admin_named_channel(capsys): + """Get the 'admin' channel when there is not one.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + i = anode._getAdminChannelIndex() + assert i == 0 + + +# TODO: should we check if we need to turn it off? +@pytest.mark.unit +def test_turnOffEncryptionOnPrimaryChannel(capsys): + """Turn off encryption when there is a psk.""" + #iface = MagicMock(autospec=SerialInterface) + anode = Node('foo', 'bar', noProto=True) + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b " + channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++' + + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + anode.turnOffEncryptionOnPrimaryChannel() + out, err = capsys.readouterr() + assert re.search(r'Writing modified channels to device', out) + assert err == '' + + +@pytest.mark.unit +def test_writeConfig_with_no_radioConfig(capsys): + """Test writeConfig with no radioConfig.""" + #iface = MagicMock(autospec=SerialInterface) + anode = Node('foo', 'bar', noProto=True) + + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode.writeConfig() + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'Error: No RadioConfig has been read', out) + assert err == '' From 002a7caf386236dc5568bd05401296cd79f129b7 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 09:10:39 -0800 Subject: [PATCH 05/10] add more unit tests to deleteChannel() and writeConfig() --- meshtastic/node.py | 3 +- meshtastic/tests/test_node.py | 180 +++++++++++++++++++++++++++++++++- 2 files changed, 180 insertions(+), 3 deletions(-) diff --git a/meshtastic/node.py b/meshtastic/node.py index 92ce433..9ad88b0 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -110,7 +110,7 @@ class Node: # *moving* the admin channel index as we are writing if (self.iface.localNode == self) and index >= adminIndex: # We've now passed the old location for admin index - # (and writen it), so we can start finding it by name again + # (and written it), so we can start finding it by name again adminIndex = 0 def getChannelByName(self, name): @@ -237,6 +237,7 @@ class Node: print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') if errorFound is False: self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response + logging.debug(f'self.radioConfig:{self.radioConfig}') logging.debug("Received radio config, now fetching channels...") self._timeout.reset() # We made foreward progress self._requestChannel(0) # now start fetching channels diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index ff1b288..b99e032 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -10,6 +10,7 @@ from ..node import Node from ..serial_interface import SerialInterface from ..admin_pb2 import AdminMessage from ..channel_pb2 import Channel +from ..radioconfig_pb2 import RadioConfig @pytest.mark.unit @@ -213,6 +214,171 @@ def test_deleteChannel_try_to_delete_primary_channel(capsys): assert err == '' +@pytest.mark.unit +def test_deleteChannel_secondary(): + """Try to delete a secondary channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'testing' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(1) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == '' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + +@pytest.mark.unit +def test_deleteChannel_secondary_with_admin_channel_after_testing(): + """Try to delete a secondary channel where there is an admin channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=2) + channel3.settings.name = 'admin' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + assert mo.localNode == anode + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'testing' + assert channels[2].settings.name == 'admin' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(1) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + +@pytest.mark.unit +def test_deleteChannel_secondary_with_admin_channel_before_testing(): + """Try to delete a secondary channel where there is an admin channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=2) + channel3.settings.name = 'testing' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == 'testing' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(2) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + @pytest.mark.unit def test_getChannelByName(capsys): """Get a channel by the name.""" @@ -376,7 +542,6 @@ def test_getAdminChannelIndex_when_no_admin_named_channel(capsys): @pytest.mark.unit def test_turnOffEncryptionOnPrimaryChannel(capsys): """Turn off encryption when there is a psk.""" - #iface = MagicMock(autospec=SerialInterface) anode = Node('foo', 'bar', noProto=True) channel1 = Channel(index=1, role=1) @@ -404,7 +569,6 @@ def test_turnOffEncryptionOnPrimaryChannel(capsys): @pytest.mark.unit def test_writeConfig_with_no_radioConfig(capsys): """Test writeConfig with no radioConfig.""" - #iface = MagicMock(autospec=SerialInterface) anode = Node('foo', 'bar', noProto=True) with pytest.raises(SystemExit) as pytest_wrapped_e: @@ -414,3 +578,15 @@ def test_writeConfig_with_no_radioConfig(capsys): out, err = capsys.readouterr() assert re.search(r'Error: No RadioConfig has been read', out) assert err == '' + + +@pytest.mark.unit +def test_writeConfig(caplog): + """Test writeConfig""" + anode = Node('foo', 'bar', noProto=True) + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + with caplog.at_level(logging.DEBUG): + anode.writeConfig() + assert re.search(r'Wrote config', caplog.text, re.MULTILINE) From 2eeffdcd498018110ad7786f54277b14bce0353f Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 09:21:47 -0800 Subject: [PATCH 06/10] add unit tests for _requestChannel() --- meshtastic/tests/test_node.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index b99e032..b48bced 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -17,6 +17,8 @@ from ..radioconfig_pb2 import RadioConfig def test_node(capsys): """Test that we can instantiate a Node""" anode = Node('foo', 'bar') + radioConfig = RadioConfig() + anode.radioConfig = radioConfig anode.showChannels() anode.showInfo() out, err = capsys.readouterr() @@ -590,3 +592,34 @@ def test_writeConfig(caplog): with caplog.at_level(logging.DEBUG): anode.writeConfig() assert re.search(r'Wrote config', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_requestChannel_not_localNode(caplog): + """Test _requestChannel()""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode._requestChannel(0) + assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_requestChannel_localNode(caplog): + """Test _requestChannel()""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode._requestChannel(0) + assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE) + assert not re.search(r'from remote node', caplog.text, re.MULTILINE) From 586a3eb463ec81c0271ad384f03b586646de7fcd Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 11:11:17 -0800 Subject: [PATCH 07/10] wip extracting onResponse() closures to class methods so they can be tested --- meshtastic/node.py | 91 +++++++++++++----------- meshtastic/tests/test_node.py | 130 ++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 42 deletions(-) diff --git a/meshtastic/node.py b/meshtastic/node.py index 9ad88b0..3158d57 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -8,6 +8,8 @@ from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2 from .util import pskToString, stripnl, Timeout, our_exit, fromPSK + + class Node: """A model of a (local or remote) node in the mesh @@ -222,26 +224,29 @@ class Node: self.writeChannel(ch.index) i = i + 1 + + def onResponseRequestSettings(self, p): + """Handle the response packet for requesting settings _requestSettings()""" + logging.debug(f'onResponseRequestSetting() p:{p}') + errorFound = False + if 'routing' in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response + logging.debug(f'self.radioConfig:{self.radioConfig}') + logging.debug("Received radio config, now fetching channels...") + self._timeout.reset() # We made foreward progress + self._requestChannel(0) # now start fetching channels + + def _requestSettings(self): """Done with initial config messages, now send regular MeshPackets to ask for settings.""" p = admin_pb2.AdminMessage() p.get_radio_request = True - def onResponse(p): - """A closure to handle the response packet""" - errorFound = False - if 'routing' in p["decoded"]: - if p["decoded"]["routing"]["errorReason"] != "NONE": - errorFound = True - print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') - if errorFound is False: - self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response - logging.debug(f'self.radioConfig:{self.radioConfig}') - logging.debug("Received radio config, now fetching channels...") - self._timeout.reset() # We made foreward progress - self._requestChannel(0) # now start fetching channels - # Show progress message for super slow operations if self != self.iface.localNode: print("Requesting preferences from remote node.") @@ -252,7 +257,7 @@ class Node: print(" 4. All devices have been rebooted after all of the above. (optional, but recommended)") print("Note: This could take a while (it requests remote channel configs, then writes config)") - return self._sendAdmin(p, wantResponse=True, onResponse=onResponse) + return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings) def exitSimulator(self): """Tell a simulator node to exit (this message @@ -293,6 +298,34 @@ class Node: self.channels.append(ch) index += 1 + + def onResponseRequestChannel(self, p): + """Handle the response packet for requesting a channel _requestChannel()""" + logging.debug(f'onResponseRequestChannel() p:{p}') + c = p["decoded"]["admin"]["raw"].get_channel_response + self.partialChannels.append(c) + self._timeout.reset() # We made foreward progress + logging.debug(f"Received channel {stripnl(c)}") + index = c.index + + # for stress testing, we can always download all channels + fastChannelDownload = True + + # Once we see a response that has NO settings, assume + # we are at the end of channels and stop fetching + quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload + + if quitEarly or index >= self.iface.myInfo.max_channels - 1: + logging.debug("Finished downloading channels") + + self.channels = self.partialChannels + self._fixupChannels() + + # FIXME, the following should only be called after we have settings and channels + self.iface._connected() # Tell everyone else we are ready to go + else: + self._requestChannel(index + 1) + def _requestChannel(self, channelNum: int): """Done with initial config messages, now send regular MeshPackets to ask for settings""" @@ -306,33 +339,7 @@ class Node: else: logging.debug(f"Requesting channel {channelNum}") - def onResponse(p): - """A closure to handle the response packet for requesting a channel""" - c = p["decoded"]["admin"]["raw"].get_channel_response - self.partialChannels.append(c) - self._timeout.reset() # We made foreward progress - logging.debug(f"Received channel {stripnl(c)}") - index = c.index - - # for stress testing, we can always download all channels - fastChannelDownload = True - - # Once we see a response that has NO settings, assume - # we are at the end of channels and stop fetching - quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload - - if quitEarly or index >= self.iface.myInfo.max_channels - 1: - logging.debug("Finished downloading channels") - - self.channels = self.partialChannels - self._fixupChannels() - - # FIXME, the following should only be called after we have settings and channels - self.iface._connected() # Tell everyone else we are ready to go - else: - self._requestChannel(index + 1) - - return self._sendAdmin(p, wantResponse=True, onResponse=onResponse) + return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestChannel) # pylint: disable=R1710 diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index b48bced..680ddda 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -623,3 +623,133 @@ def test_requestChannel_localNode(caplog): anode._requestChannel(0) assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE) assert not re.search(r'from remote node', caplog.text, re.MULTILINE) + + +# TODO: +#@pytest.mark.unit +#def test_onResponseRequestChannel(caplog): +# """Test onResponseRequestChannel()""" +# anode = Node('foo', 'bar') +# radioConfig = RadioConfig() +# anode.radioConfig = radioConfig + + +@pytest.mark.unit +def test_onResponseRequestSetting(caplog): + """Test onResponseRequestSetting()""" + # Note: Split out the get_radio_response to a MagicMock + # so it could be "returned" (not really sure how to do that + # in a python dict. + amsg = MagicMock(autospec=AdminMessage) + amsg.get_radio_response = """{ + preferences { + phone_timeout_secs: 900 + ls_secs: 300 + position_broadcast_smart: true + position_flags: 35 + } +}""" + # TODO: not sure if this tmpraw is formatted correctly + tmpraw = """from: 2475227164 +to: 2475227164 +decoded { + portnum: ADMIN_APP + payload: "*\016\n\0140\204\007P\254\002\210\001\001\260\t#" + request_id: 3145147848 +} +id: 365963704 +rx_time: 1640195197 +hop_limit: 3 +priority: RELIABLE""" + + packet = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#', + 'requestId': 3145147848, + 'admin': { + 'getRadioResponse': { + 'preferences': { + 'phoneTimeoutSecs': 900, + 'lsSecs': 300, + 'positionBroadcastSmart': True, + 'positionFlags': 35 + } + }, + 'raw': amsg + }, + 'id': 365963704, + 'rxTime': 1640195197, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': tmpraw, + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + } + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestSettings(packet) + assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_onResponseRequestSetting_with_error(capsys): + """Test onResponseRequestSetting() with an error""" + packet = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#', + 'requestId': 3145147848, + 'routing': { + 'errorReason': 'some made up error', + }, + 'admin': { + 'getRadioResponse': { + 'preferences': { + 'phoneTimeoutSecs': 900, + 'lsSecs': 300, + 'positionBroadcastSmart': True, + 'positionFlags': 35 + } + }, + }, + 'id': 365963704, + 'rxTime': 1640195197, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + } + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + anode.onResponseRequestSettings(packet) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' From dbb5af1f350a3607e634fbc8822667aea3c5f8ae Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 12:11:27 -0800 Subject: [PATCH 08/10] added unit test for test_onResponseRequestChannel() --- Makefile | 2 + meshtastic/tests/test_node.py | 118 ++++++++++++++++++++++++++++------ 2 files changed, 99 insertions(+), 21 deletions(-) diff --git a/Makefile b/Makefile index aa8f164..dc80927 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ install: lint: pylint meshtastic +# run the coverage report and open results in a browser cov: pytest --cov-report html --cov=meshtastic # on mac, this will open the coverage report in a browser @@ -19,4 +20,5 @@ cov: examples: FORCE pytest -mexamples +# Makefile hack to get the examples to always run FORCE: ; diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 680ddda..e9b2ff8 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -625,13 +625,102 @@ def test_requestChannel_localNode(caplog): assert not re.search(r'from remote node', caplog.text, re.MULTILINE) -# TODO: -#@pytest.mark.unit -#def test_onResponseRequestChannel(caplog): -# """Test onResponseRequestChannel()""" -# anode = Node('foo', 'bar') -# radioConfig = RadioConfig() -# anode.radioConfig = radioConfig +@pytest.mark.unit +def test_onResponseRequestChannel(caplog): + """Test onResponseRequestChannel()""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + msg1 = MagicMock(autospec=AdminMessage) + msg1.get_channel_response = channel1 + + msg2 = MagicMock(autospec=AdminMessage) + channel2 = Channel(index=2, role=0) # disabled + msg2.get_channel_response = channel2 + + # default primary channel + packet1 = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01', + 'requestId': 2615094405, + 'admin': { + 'getChannelResponse': { + 'settings': { + 'modemConfig': 'Bw125Cr48Sf4096', + 'psk': 'AQ==' + }, + 'role': 'PRIMARY' + }, + 'raw': msg1, + } + }, + 'id': 1692918436, + 'hopLimit': 3, + 'priority': + 'RELIABLE', + 'raw': 'fake', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + + # no other channels + packet2 = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b':\x04\x08\x02\x12\x00', + 'requestId': 743049663, + 'admin': { + 'getChannelResponse': { + 'index': 2, + 'settings': {} + }, + 'raw': msg2, + } + }, + 'id': 1692918456, + 'rxTime': 1640202239, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.requestConfig() + anode.onResponseRequestChannel(packet1) + assert re.search(r'Received channel', caplog.text, re.MULTILINE) + anode.onResponseRequestChannel(packet2) + assert re.search(r'Received channel', caplog.text, re.MULTILINE) + assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE) + assert len(anode.channels) == 8 + assert anode.channels[0].settings.modem_config == 3 + assert anode.channels[1].settings.name == '' + assert anode.channels[2].settings.name == '' + assert anode.channels[3].settings.name == '' + assert anode.channels[4].settings.name == '' + assert anode.channels[5].settings.name == '' + assert anode.channels[6].settings.name == '' + assert anode.channels[7].settings.name == '' @pytest.mark.unit @@ -649,19 +738,6 @@ def test_onResponseRequestSetting(caplog): position_flags: 35 } }""" - # TODO: not sure if this tmpraw is formatted correctly - tmpraw = """from: 2475227164 -to: 2475227164 -decoded { - portnum: ADMIN_APP - payload: "*\016\n\0140\204\007P\254\002\210\001\001\260\t#" - request_id: 3145147848 -} -id: 365963704 -rx_time: 1640195197 -hop_limit: 3 -priority: RELIABLE""" - packet = { 'from': 2475227164, 'to': 2475227164, @@ -684,7 +760,7 @@ priority: RELIABLE""" 'rxTime': 1640195197, 'hopLimit': 3, 'priority': 'RELIABLE', - 'raw': tmpraw, + 'raw': 'faked', 'fromId': '!9388f81c', 'toId': '!9388f81c' } From 4428ccfe596f977c9ba02bc5bbe68485f6b67b3b Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 16:15:16 -0800 Subject: [PATCH 09/10] add unit tests for getMyUser(), getLongName(), getShortName(), and sendPacket() --- meshtastic/mesh_interface.py | 2 + meshtastic/tests/conftest.py | 45 +++++++++ meshtastic/tests/test_mesh_interface.py | 129 ++++++++++++++++++++++++ 3 files changed, 176 insertions(+) diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 844e729..92581c5 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -103,6 +103,7 @@ class MeshInterface: rows = [] if self.nodes: + logging.debug(f'self.nodes:{self.nodes}') for node in self.nodes.values(): if not includeSelf and node['num'] == self.localNode.nodeNum: continue @@ -351,6 +352,7 @@ class MeshInterface: """Get info about my node.""" if self.myInfo is None: return None + logging.debug(f'self.nodesByNum:{self.nodesByNum}') return self.nodesByNum.get(self.myInfo.my_node_num) def getMyUser(self): diff --git a/meshtastic/tests/conftest.py b/meshtastic/tests/conftest.py index 3bcaecb..38b2554 100644 --- a/meshtastic/tests/conftest.py +++ b/meshtastic/tests/conftest.py @@ -3,8 +3,10 @@ import argparse import pytest +from unittest.mock import patch, MagicMock from meshtastic.__main__ import Globals +from ..mesh_interface import MeshInterface @pytest.fixture def reset_globals(): @@ -13,3 +15,46 @@ def reset_globals(): parser = argparse.ArgumentParser() Globals.getInstance().reset() Globals.getInstance().set_parser(parser) + + +@pytest.fixture +def iface_with_nodes(): + """Fixture to setup some nodes.""" + nodesById = { + '!9388f81c': { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': {}, + 'lastHeard': 1640204888 + } + } + + nodesByNum = { + 2475227164: { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': { + 'time': 1640206266 + }, + 'lastHeard': 1640206266 + } + } + iface = MeshInterface(noProto=True) + iface.nodes = nodesById + iface.nodesByNum = nodesByNum + myInfo = MagicMock() + iface.myInfo = myInfo + iface.myInfo.my_node_num = 2475227164 + return iface diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 5888fd0..2bb865b 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -16,6 +16,29 @@ from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR def test_MeshInterface(capsys, reset_globals): """Test that we can instantiate a MeshInterface""" iface = MeshInterface(noProto=True) + anode = Node('foo', 'bar') + + nodes = { + '!9388f81c': { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': {}, + 'lastHeard': 1640204888 + } + } + + iface.nodesByNum = {1: anode } + iface.nodes = nodes + + myInfo = MagicMock() + iface.myInfo = myInfo + iface.showInfo() iface.localNode.showInfo() iface.showNodes() @@ -30,6 +53,62 @@ def test_MeshInterface(capsys, reset_globals): assert err == '' +@pytest.mark.unit +def test_getMyUser_and_LongName_and_ShortName(reset_globals): + """Test getMyUser(), getLongName(), and getShortName(). + Note: These should be separate tests, but feeling lazy. + Could move these nodes out to a fixture and + create smaller tests. + """ + + nodesById = { + '!9388f81c': { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': {}, + 'lastHeard': 1640204888 + } + } + + nodesByNum = { + 2475227164: { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': { + 'time': 1640206266 + }, + 'lastHeard': 1640206266 + } + } + + iface = MeshInterface(noProto=True) + iface.nodes = nodesById + iface.nodesByNum = nodesByNum + myInfo = MagicMock() + iface.myInfo = myInfo + iface.myInfo.my_node_num = 2475227164 + myuser = iface.getMyUser() + print(f'myuser:{myuser}') + assert myuser is not None + assert myuser["id"] == '!9388f81c' + mylongname = iface.getLongName() + assert mylongname == 'Unknown f81c' + myshortname = iface.getShortName() + assert myshortname == '?1C' + + @pytest.mark.unit def test_handlePacketFromRadio_no_from(capsys, reset_globals): """Test _handlePacketFromRadio with no 'from' in the mesh packet.""" @@ -309,6 +388,16 @@ def test_sendPacket_with_destination_as_int(caplog, reset_globals): assert re.search(r'Sending packet', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals): + """Test _sendPacket() with int as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId='!1234') + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + @pytest.mark.unit def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals): """Test _sendPacket() with BROADCAST_ADDR as a destination""" @@ -345,6 +434,31 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_glo assert re.search(r'Sending packet', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_sendPacket_with_destination_is_blank_with_nodes(capsys, reset_globals, iface_with_nodes): + """Test _sendPacket() with '' as a destination with myInfo""" + iface = iface_with_nodes + meshPacket = mesh_pb2.MeshPacket() + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface._sendPacket(meshPacket, destinationId='') + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.match(r'Warning: NodeId not found in DB', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_sendPacket_with_destination_is_blank_without_nodes(caplog, reset_globals, iface_with_nodes): + """Test _sendPacket() with '' as a destination with myInfo""" + iface = iface_with_nodes + iface.nodes = None + meshPacket = mesh_pb2.MeshPacket() + with caplog.at_level(logging.WARNING): + iface._sendPacket(meshPacket, destinationId='') + assert re.search(r'Warning: There were no self.nodes.', caplog.text, re.MULTILINE) + + @pytest.mark.unit def test_getMyNodeInfo(reset_globals): """Test getMyNodeInfo()""" @@ -357,3 +471,18 @@ def test_getMyNodeInfo(reset_globals): iface.myInfo.my_node_num = 1 myinfo = iface.getMyNodeInfo() assert myinfo == anode + + +@pytest.mark.unit +def test_generatePacketId(capsys, reset_globals): + """Test _generatePacketId() when no currentPacketId (not connected)""" + iface = MeshInterface(noProto=True) + # not sure when this condition would ever happen... but we can simulate it + iface.currentPacketId = None + assert iface.currentPacketId is None + with pytest.raises(Exception) as pytest_wrapped_e: + iface._generatePacketId() + out, err = capsys.readouterr() + assert re.search(r'Not connected yet, can not generate packet', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == Exception From 0f0a9786920d7817be7d21da3759013af62d6858 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Wed, 22 Dec 2021 16:20:09 -0800 Subject: [PATCH 10/10] minor refactor now that there is a fixture --- meshtastic/tests/conftest.py | 2 +- meshtastic/tests/test_mesh_interface.py | 60 +++++++------------------ 2 files changed, 18 insertions(+), 44 deletions(-) diff --git a/meshtastic/tests/conftest.py b/meshtastic/tests/conftest.py index 38b2554..12c566c 100644 --- a/meshtastic/tests/conftest.py +++ b/meshtastic/tests/conftest.py @@ -2,8 +2,8 @@ import argparse +from unittest.mock import MagicMock import pytest -from unittest.mock import patch, MagicMock from meshtastic.__main__ import Globals from ..mesh_interface import MeshInterface diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 2bb865b..2f56ebb 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -54,57 +54,31 @@ def test_MeshInterface(capsys, reset_globals): @pytest.mark.unit -def test_getMyUser_and_LongName_and_ShortName(reset_globals): - """Test getMyUser(), getLongName(), and getShortName(). - Note: These should be separate tests, but feeling lazy. - Could move these nodes out to a fixture and - create smaller tests. - """ +def test_getMyUser(reset_globals, iface_with_nodes): + """Test getMyUser()""" + iface = iface_with_nodes - nodesById = { - '!9388f81c': { - 'num': 2475227164, - 'user': { - 'id': '!9388f81c', - 'longName': 'Unknown f81c', - 'shortName': '?1C', - 'macaddr': 'RBeTiPgc', - 'hwModel': 'TBEAM' - }, - 'position': {}, - 'lastHeard': 1640204888 - } - } - - nodesByNum = { - 2475227164: { - 'num': 2475227164, - 'user': { - 'id': '!9388f81c', - 'longName': 'Unknown f81c', - 'shortName': '?1C', - 'macaddr': 'RBeTiPgc', - 'hwModel': 'TBEAM' - }, - 'position': { - 'time': 1640206266 - }, - 'lastHeard': 1640206266 - } - } - - iface = MeshInterface(noProto=True) - iface.nodes = nodesById - iface.nodesByNum = nodesByNum - myInfo = MagicMock() - iface.myInfo = myInfo iface.myInfo.my_node_num = 2475227164 myuser = iface.getMyUser() print(f'myuser:{myuser}') assert myuser is not None assert myuser["id"] == '!9388f81c' + + +@pytest.mark.unit +def test_getLongName(reset_globals, iface_with_nodes): + """Test getLongName()""" + iface = iface_with_nodes + iface.myInfo.my_node_num = 2475227164 mylongname = iface.getLongName() assert mylongname == 'Unknown f81c' + + +@pytest.mark.unit +def test_getShortName(reset_globals, iface_with_nodes): + """Test getShortName().""" + iface = iface_with_nodes + iface.myInfo.my_node_num = 2475227164 myshortname = iface.getShortName() assert myshortname == '?1C'