From 6b0521003cc1bd97a65cd0436474f1af2e657964 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 11:51:25 -0800 Subject: [PATCH 1/6] wip on adding unit tests to mesh_interface --- meshtastic/__init__.py | 9 +++++---- meshtastic/mesh_interface.py | 18 ++++++++++++----- meshtastic/tests/test_mesh_interface.py | 27 +++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/meshtastic/__init__.py b/meshtastic/__init__.py index 112a88d..a73e6bf 100644 --- a/meshtastic/__init__.py +++ b/meshtastic/__init__.py @@ -155,10 +155,11 @@ def _onNodeInfoReceive(iface, asDict): def _receiveInfoUpdate(iface, asDict): - iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict - iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime") - iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr") - iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit") + if "from" in asDict: + iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict + iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime") + iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr") + iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit") """Well known message payloads can register decoders for automatic protobuf parsing""" diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index b5a3954..18f904a 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -1,4 +1,4 @@ -""" Mesh Interface class +"""Mesh Interface class """ import sys import random @@ -222,6 +222,7 @@ class MeshInterface: logging.debug(f"Serializing protobuf as data: {stripnl(data)}") data = data.SerializeToString() + logging.debug(f"len(data): {len(data)}") if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN: Exception("Data payload too big") @@ -543,9 +544,16 @@ class MeshInterface: self.nodesByNum[nodeNum] = n return n - def _handlePacketFromRadio(self, meshPacket): + def _handlePacketFromRadio(self, meshPacket, hack=False): """Handle a MeshPacket that just arrived from the radio + hack - well, since we used 'from', which is a python keyword, + as an attribute to MeshPacket in protobufs, + there really is no way to do something like this: + meshPacket = mesh_pb2.MeshPacket() + meshPacket.from = 123 + If hack is True, we can unit test this code. + Will publish one of the following events: - meshtastic.receive.text(packet = MeshPacket dictionary) - meshtastic.receive.position(packet = MeshPacket dictionary) @@ -561,10 +569,10 @@ class MeshInterface: asDict["raw"] = meshPacket # from might be missing if the nodenum was zero. - if "from" not in asDict: + if not hack and "from" not in asDict: asDict["from"] = 0 - logging.error( - f"Device returned a packet we sent, ignoring: {stripnl(asDict)}") + logging.error(f"Device returned a packet we sent, ignoring: {stripnl(asDict)}") + print(f"Error: Device returned a packet we sent, ignoring: {stripnl(asDict)}") return if "to" not in asDict: asDict["to"] = 0 diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 63dac21..31224b8 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -5,6 +5,8 @@ import re import pytest from ..mesh_interface import MeshInterface +from .. import mesh_pb2 +#from ..mesh_pb2 import MeshPacket @pytest.mark.unit @@ -14,6 +16,7 @@ def test_MeshInterface(capsys): iface.showInfo() iface.localNode.showInfo() iface.showNodes() + iface.sendText('hello') iface.close() out, err = capsys.readouterr() assert re.search(r'Owner: None \(None\)', out, re.MULTILINE) @@ -22,3 +25,27 @@ def test_MeshInterface(capsys): assert re.search(r'Channels', out, re.MULTILINE) assert re.search(r'Primary channel URL', out, re.MULTILINE) assert err == '' + + +@pytest.mark.unit +def test_handlePacketFromRadio_no_from(capsys): + """Test _handlePacketFromRadio no 'from'""" + iface = MeshInterface(noProto=True) + meshPacket = mesh_pb2.MeshPacket() + iface._handlePacketFromRadio(meshPacket) + out, err = capsys.readouterr() + assert re.search(r'Device returned a packet we sent, ignoring', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_MeshPacket_set_from(capsys): + """Test setting 'from' MeshPacket """ + iface = MeshInterface(noProto=True) + meshPacket = mesh_pb2.MeshPacket() + meshPacket.decoded.payload = b'' + meshPacket.decoded.portnum = 1 + iface._handlePacketFromRadio(meshPacket, hack=True) + out, err = capsys.readouterr() + assert re.search(r'', out, re.MULTILINE) + assert err == '' From 2a0c9f06695d4f65de4faaab6a20ecbc86278211 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 17:02:40 -0800 Subject: [PATCH 2/6] add mesh_interface unit tests for handlePacketFromRadio and getNode --- meshtastic/__main__.py | 4 +- meshtastic/mesh_interface.py | 26 +++++---- meshtastic/tests/test_mesh_interface.py | 72 +++++++++++++++++++++---- 3 files changed, 80 insertions(+), 22 deletions(-) diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 08a96bb..22ec5d0 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -29,11 +29,11 @@ def onReceive(packet, interface): d = packet.get('decoded') # Exit once we receive a reply - if args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP: + if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP: interface.close() # after running command then exit # Reply to every received message with some stats - if args.reply: + if args and args.reply: msg = d.get('text') if msg: #shortName = packet['decoded']['shortName'] diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 18f904a..4456d5d 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -16,9 +16,9 @@ from pubsub import pub from google.protobuf.json_format import MessageToJson +import meshtastic.node from . import portnums_pb2, mesh_pb2 from .util import stripnl, Timeout, our_exit -from .node import Node from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols @@ -46,7 +46,7 @@ class MeshInterface: self.nodes = None # FIXME self.isConnected = threading.Event() self.noProto = noProto - self.localNode = Node(self, -1) # We fixup nodenum later + self.localNode = meshtastic.node.Node(self, -1) # We fixup nodenum later self.myInfo = None # We don't have device info yet self.responseHandlers = {} # A map from request ID to the handler self.failure = None # If we've encountered a fatal exception it will be kept here @@ -151,7 +151,8 @@ class MeshInterface: if nodeId == LOCAL_ADDR: return self.localNode else: - n = Node(self, nodeId) + n = meshtastic.node.Node(self, nodeId) + logging.debug("About to requestConfig") n.requestConfig() if not n.waitForConfig(): our_exit("Error: Timed out waiting for node config") @@ -292,6 +293,7 @@ class MeshInterface: toRadio = mesh_pb2.ToRadio() + nodeNum = 0 if destinationId is None: our_exit("Warning: destinationId must not be None") elif isinstance(destinationId, int): @@ -304,10 +306,13 @@ class MeshInterface: elif destinationId.startswith("!"): nodeNum = int(destinationId[1:], 16) else: - node = self.nodes.get(destinationId) - if not node: - our_exit(f"Warning: NodeId {destinationId} not found in DB") - nodeNum = node['num'] + if self.nodes: + node = self.nodes.get(destinationId) + if not node: + our_exit(f"Warning: NodeId {destinationId} not found in DB") + nodeNum = node['num'] + else: + logging.warning("Warning: There were no self.nodes.") meshPacket.to = nodeNum meshPacket.want_ack = wantAck @@ -600,9 +605,10 @@ class MeshInterface: # UNKNOWN_APP is the default protobuf portnum value, and therefore if not # set it will not be populated at all to make API usage easier, set # it to prevent confusion - if not "portnum" in decoded: - decoded["portnum"] = portnums_pb2.PortNum.Name( - portnums_pb2.PortNum.UNKNOWN_APP) + if "portnum" not in decoded: + new_portnum = portnums_pb2.PortNum.Name(portnums_pb2.PortNum.UNKNOWN_APP) + decoded["portnum"] = new_portnum + logging.warning(f"portnum was not in decoded. Setting to:{new_portnum}") portnum = decoded["portnum"] diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 31224b8..3a0babc 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -1,16 +1,19 @@ """Meshtastic unit tests for mesh_interface.py""" import re +import logging +from unittest.mock import patch, MagicMock import pytest from ..mesh_interface import MeshInterface +from ..node import Node from .. import mesh_pb2 -#from ..mesh_pb2 import MeshPacket +from ..__init__ import LOCAL_ADDR @pytest.mark.unit -def test_MeshInterface(capsys): +def test_MeshInterface(capsys, reset_globals): """Test that we can instantiate a MeshInterface""" iface = MeshInterface(noProto=True) iface.showInfo() @@ -28,8 +31,8 @@ def test_MeshInterface(capsys): @pytest.mark.unit -def test_handlePacketFromRadio_no_from(capsys): - """Test _handlePacketFromRadio no 'from'""" +def test_handlePacketFromRadio_no_from(capsys, reset_globals): + """Test _handlePacketFromRadio with no 'from' in the mesh packet.""" iface = MeshInterface(noProto=True) meshPacket = mesh_pb2.MeshPacket() iface._handlePacketFromRadio(meshPacket) @@ -39,13 +42,62 @@ def test_handlePacketFromRadio_no_from(capsys): @pytest.mark.unit -def test_MeshPacket_set_from(capsys): - """Test setting 'from' MeshPacket """ +def test_handlePacketFromRadio_with_a_portnum(caplog, reset_globals): + """Test _handlePacketFromRadio with a portnum + Since we have an attribute called 'from', we cannot simply 'set' it. + Had to implement a hack just to be able to test some code. + """ iface = MeshInterface(noProto=True) meshPacket = mesh_pb2.MeshPacket() meshPacket.decoded.payload = b'' meshPacket.decoded.portnum = 1 - iface._handlePacketFromRadio(meshPacket, hack=True) - out, err = capsys.readouterr() - assert re.search(r'', out, re.MULTILINE) - assert err == '' + with caplog.at_level(logging.WARNING): + iface._handlePacketFromRadio(meshPacket, hack=True) + assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_handlePacketFromRadio_no_portnum(caplog, reset_globals): + """Test _handlePacketFromRadio without a portnum""" + iface = MeshInterface(noProto=True) + meshPacket = mesh_pb2.MeshPacket() + meshPacket.decoded.payload = b'' + with caplog.at_level(logging.WARNING): + iface._handlePacketFromRadio(meshPacket, hack=True) + assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_getNode_with_local(reset_globals): + """Test getNode""" + iface = MeshInterface(noProto=True) + anode = iface.getNode(LOCAL_ADDR) + assert anode == iface.localNode + + +@pytest.mark.unit +def test_getNode_not_local(reset_globals, caplog): + """Test getNode not local""" + iface = MeshInterface(noProto=True) + anode = MagicMock(autospec=Node) + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.node.Node', return_value=anode): + another_node = iface.getNode('bar2') + assert another_node != iface.localNode + assert re.search(r'About to requestConfig', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_getNode_not_local_timeout(reset_globals, capsys): + """Test getNode not local, simulate timeout""" + iface = MeshInterface(noProto=True) + anode = MagicMock(autospec=Node) + anode.waitForConfig.return_value = False + with patch('meshtastic.node.Node', return_value=anode): + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface.getNode('bar2') + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.match(r'Error: Timed out waiting for node config', out) + assert err == '' From e43d9f531846220d2cfcd5030a9b72c4a0151142 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 19:12:55 -0800 Subject: [PATCH 3/6] add unit tests for _handleFromRadio() and sendPosition() --- meshtastic/mesh_interface.py | 4 +- meshtastic/tests/test_mesh_interface.py | 74 +++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 4456d5d..e4ba510 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -268,6 +268,7 @@ class MeshInterface: if timeSec == 0: timeSec = time.time() # returns unix timestamp in seconds p.time = int(timeSec) + logging.debug(f'p.time:{p.time}') return self.sendData(p, destinationId, portNum=portnums_pb2.PortNum.POSITION_APP, @@ -455,8 +456,9 @@ class MeshInterface: Called by subclasses.""" fromRadio = mesh_pb2.FromRadio() fromRadio.ParseFromString(fromRadioBytes) + #logging.debug(f"fromRadioBytes: {fromRadioBytes}") asDict = google.protobuf.json_format.MessageToDict(fromRadio) - #logging.debug(f"Received from radio: {fromRadio}") + logging.debug(f"Received from radio: {fromRadio}") if fromRadio.HasField("my_info"): self.myInfo = fromRadio.my_info self.localNode.nodeNum = self.myInfo.my_node_num diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 3a0babc..e3357ad 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -101,3 +101,77 @@ def test_getNode_not_local_timeout(reset_globals, capsys): out, err = capsys.readouterr() assert re.match(r'Error: Timed out waiting for node config', out) assert err == '' + + +@pytest.mark.unit +def test_sendPosition(reset_globals, caplog): + """Test sendPosition""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface.sendPosition() + iface.close() + assert re.search(r'p.time:', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_handleFromRadio_empty_payload(reset_globals, caplog): + """Test _handleFromRadio""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface._handleFromRadio(b'') + iface.close() + assert re.search(r'Unexpected FromRadio payload', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_handleFromRadio_with_my_info(reset_globals, caplog): + """Test _handleFromRadio with my_info""" + # Note: I captured the '--debug --info' for the bytes below. + # It "translates" to this: + # my_info { + # my_node_num: 682584012 + # num_bands: 13 + # firmware_version: "1.2.49.5354c49" + # reboot_count: 13 + # bitrate: 17.088470458984375 + # message_timeout_msec: 300000 + # min_app_version: 20200 + # max_channels: 8 + # } + from_radio_bytes = b'\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01' + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface._handleFromRadio(from_radio_bytes) + iface.close() + assert re.search(r'Received myinfo', caplog.text, re.MULTILINE) + assert re.search(r'num_bands: 13', caplog.text, re.MULTILINE) + assert re.search(r'max_channels: 8', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_handleFromRadio_with_node_info(reset_globals, caplog): + """Test _handleFromRadio with node_info""" + # Note: I captured the '--debug --info' for the bytes below. + # It "translates" to this: + # node_info { + # num: 682584012 + # user { + # id: "!28af67cc" + # long_name: "Unknown 67cc" + # short_name: "?CC" + # macaddr: "$o(\257g\314" + # hw_model: HELTEC_V2_1 + # } + # position { + # } + # } + + from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00' + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface._startConfig() + iface._handleFromRadio(from_radio_bytes) + iface.close() + assert re.search(r'Received nodeinfo', caplog.text, re.MULTILINE) + assert re.search(r'682584012', caplog.text, re.MULTILINE) + assert re.search(r'HELTEC_V2_1', caplog.text, re.MULTILINE) From fc637ee6ae765d2913b345983aceacffbb2bfb1a Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 22:17:08 -0800 Subject: [PATCH 4/6] add to test for showNodes() --- meshtastic/tests/test_mesh_interface.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index e3357ad..27c2f4e 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -149,7 +149,7 @@ def test_handleFromRadio_with_my_info(reset_globals, caplog): @pytest.mark.unit -def test_handleFromRadio_with_node_info(reset_globals, caplog): +def test_handleFromRadio_with_node_info(reset_globals, caplog, capsys): """Test _handleFromRadio with node_info""" # Note: I captured the '--debug --info' for the bytes below. # It "translates" to this: @@ -171,7 +171,14 @@ def test_handleFromRadio_with_node_info(reset_globals, caplog): with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes) - iface.close() - assert re.search(r'Received nodeinfo', caplog.text, re.MULTILINE) - assert re.search(r'682584012', caplog.text, re.MULTILINE) - assert re.search(r'HELTEC_V2_1', caplog.text, re.MULTILINE) + assert re.search(r'Received nodeinfo', caplog.text, re.MULTILINE) + assert re.search(r'682584012', caplog.text, re.MULTILINE) + assert re.search(r'HELTEC_V2_1', caplog.text, re.MULTILINE) + # validate some of showNodes() output + iface.showNodes() + out, err = capsys.readouterr() + assert re.search(r' 1 ', out, re.MULTILINE) + assert re.search(r'│ Unknown 67cc │ ', out, re.MULTILINE) + assert re.search(r'│ !28af67cc │ N/A │ N/A │ N/A', out, re.MULTILINE) + assert err == '' + iface.close() From fd6deedd8d69eab40b89d20b8eecff7121f7782e Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 22:47:10 -0800 Subject: [PATCH 5/6] add unit tests for setOwner() --- meshtastic/node.py | 30 ++++++++++++++------- meshtastic/tests/test_node.py | 49 +++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/meshtastic/node.py b/meshtastic/node.py index 64b21fe..1e6e893 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -14,7 +14,7 @@ class Node: Includes methods for radioConfig and channels """ - def __init__(self, iface, nodeNum): + def __init__(self, iface, nodeNum, noProto=False): """Constructor""" self.iface = iface self.nodeNum = nodeNum @@ -22,6 +22,7 @@ class Node: self.channels = None self._timeout = Timeout(maxSecs=60) self.partialChannels = None + self.noProto = noProto def showChannels(self): """Show human readable description of our channels.""" @@ -162,6 +163,11 @@ class Node: if team is not None: p.set_owner.team = team + # Note: These debug lines are used in unit tests + logging.debug(f'p.set_owner.long_name:{p.set_owner.long_name}:') + logging.debug(f'p.set_owner.short_name:{p.set_owner.short_name}:') + logging.debug(f'p.set_owner.is_licensed:{p.set_owner.is_licensed}') + logging.debug(f'p.set_owner.team:{p.set_owner.team}') return self._sendAdmin(p) def getURL(self, includeAll: bool = True): @@ -307,22 +313,26 @@ class Node: self._fixupChannels() # FIXME, the following should only be called after we have settings and channels - self.iface._connected() # Tell everone else we are ready to go + self.iface._connected() # Tell everyone else we are ready to go else: self._requestChannel(index + 1) return self._sendAdmin(p, wantResponse=True, onResponse=onResponse) + # pylint: disable=R1710 def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False, onResponse=None, adminIndex=0): """Send an admin message to the specified node (or the local node if destNodeNum is zero)""" - if adminIndex == 0: # unless a special channel index was used, we want to use the admin index - adminIndex = self.iface.localNode._getAdminChannelIndex() + if self.noProto: + logging.warning(f"Not sending packet because protocol use is disabled by noProto") + else: + if adminIndex == 0: # unless a special channel index was used, we want to use the admin index + adminIndex = self.iface.localNode._getAdminChannelIndex() - return self.iface.sendData(p, self.nodeNum, - portNum=portnums_pb2.PortNum.ADMIN_APP, - wantAck=True, - wantResponse=wantResponse, - onResponse=onResponse, - channelIndex=adminIndex) + return self.iface.sendData(p, self.nodeNum, + portNum=portnums_pb2.PortNum.ADMIN_APP, + wantAck=True, + wantResponse=wantResponse, + onResponse=onResponse, + channelIndex=adminIndex) diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 3787e76..8d07f32 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -1,6 +1,7 @@ """Meshtastic unit tests for node.py""" import re +import logging from unittest.mock import patch, MagicMock import pytest @@ -32,3 +33,51 @@ def test_node_reqquestConfig(): with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): anode = Node(mo, 'bar') anode.requestConfig() + + +@pytest.mark.unit +def test_setOwner_and_team(caplog): + """Test setOwner""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.setOwner(long_name ='Test123', short_name='123', team=1) + assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.team:1', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_setOwner_no_short_name(caplog): + """Test setOwner""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.setOwner(long_name ='Test123') + assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_setOwner_no_short_name_and_long_name_is_short(caplog): + """Test setOwner""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.setOwner(long_name ='Tnt') + assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_setOwner_no_short_name_and_long_name_has_words(caplog): + """Test setOwner""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.setOwner(long_name ='A B C', is_licensed=True) + assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) From 95f7f560ba25bee95137975b8d4400018b08f4d6 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Thu, 16 Dec 2021 23:23:43 -0800 Subject: [PATCH 6/6] add unit tests for reboot(), exitSimulator(), and setURL() --- meshtastic/node.py | 3 ++ meshtastic/tests/test_node.py | 58 +++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/meshtastic/node.py b/meshtastic/node.py index 1e6e893..837c06a 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -203,6 +203,7 @@ class Node: channelSet = apponly_pb2.ChannelSet() channelSet.ParseFromString(decodedURL) + if len(channelSet.settings) == 0: our_exit("Warning: There were no settings.") @@ -213,6 +214,7 @@ class Node: ch.index = i ch.settings.CopyFrom(chs) self.channels[ch.index] = ch + logging.debug(f'Channel i:{i} ch:{ch}') self.writeChannel(ch.index) i = i + 1 @@ -246,6 +248,7 @@ class Node: is ignored for other nodes)""" p = admin_pb2.AdminMessage() p.exit_simulator = True + logging.debug('in exitSimulator()') return self._sendAdmin(p) diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 8d07f32..528bead 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -81,3 +81,61 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog): assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE) assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE) assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_exitSimulator(caplog): + """Test exitSimulator""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.exitSimulator() + assert re.search(r'in exitSimulator', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_reboot(caplog): + """Test reboot""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.reboot() + assert re.search(r'Telling node to reboot', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_setURL_empty_url(): + """Test reboot""" + anode = Node('foo', 'bar', noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode.setURL('') + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_setURL_valid_URL(caplog): + """Test setURL""" + iface = MagicMock(autospec=SerialInterface) + url = "https://www.meshtastic.org/d/#CgUYAyIBAQ" + with caplog.at_level(logging.DEBUG): + anode = Node(iface, 'bar', noProto=True) + anode.radioConfig = 'baz' + channels = ['zoo'] + anode.channels = channels + anode.setURL(url) + assert re.search(r'Channel i:0', caplog.text, re.MULTILINE) + assert re.search(r'modem_config: Bw125Cr48Sf4096', caplog.text, re.MULTILINE) + assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE) + assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_setURL_valid_URL_but_no_settings(caplog): + """Test setURL""" + iface = MagicMock(autospec=SerialInterface) + url = "https://www.meshtastic.org/d/#" + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode = Node(iface, 'bar', noProto=True) + anode.radioConfig = 'baz' + anode.setURL(url) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1