add mesh_interface unit tests for handlePacketFromRadio and getNode

This commit is contained in:
Mike Kinney
2021-12-16 17:02:40 -08:00
parent f7753b2ce8
commit 2a0c9f0669
3 changed files with 80 additions and 22 deletions

View File

@@ -29,11 +29,11 @@ def onReceive(packet, interface):
d = packet.get('decoded')
# Exit once we receive a reply
if args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
interface.close() # after running command then exit
# Reply to every received message with some stats
if args.reply:
if args and args.reply:
msg = d.get('text')
if msg:
#shortName = packet['decoded']['shortName']

View File

@@ -16,9 +16,9 @@ from pubsub import pub
from google.protobuf.json_format import MessageToJson
import meshtastic.node
from . import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout, our_exit
from .node import Node
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
@@ -46,7 +46,7 @@ class MeshInterface:
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.localNode = Node(self, -1) # We fixup nodenum later
self.localNode = meshtastic.node.Node(self, -1) # We fixup nodenum later
self.myInfo = None # We don't have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we've encountered a fatal exception it will be kept here
@@ -151,7 +151,8 @@ class MeshInterface:
if nodeId == LOCAL_ADDR:
return self.localNode
else:
n = Node(self, nodeId)
n = meshtastic.node.Node(self, nodeId)
logging.debug("About to requestConfig")
n.requestConfig()
if not n.waitForConfig():
our_exit("Error: Timed out waiting for node config")
@@ -292,6 +293,7 @@ class MeshInterface:
toRadio = mesh_pb2.ToRadio()
nodeNum = 0
if destinationId is None:
our_exit("Warning: destinationId must not be None")
elif isinstance(destinationId, int):
@@ -304,10 +306,13 @@ class MeshInterface:
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
else:
node = self.nodes.get(destinationId)
if not node:
our_exit(f"Warning: NodeId {destinationId} not found in DB")
nodeNum = node['num']
if self.nodes:
node = self.nodes.get(destinationId)
if not node:
our_exit(f"Warning: NodeId {destinationId} not found in DB")
nodeNum = node['num']
else:
logging.warning("Warning: There were no self.nodes.")
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
@@ -600,9 +605,10 @@ class MeshInterface:
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not
# set it will not be populated at all to make API usage easier, set
# it to prevent confusion
if not "portnum" in decoded:
decoded["portnum"] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
if "portnum" not in decoded:
new_portnum = portnums_pb2.PortNum.Name(portnums_pb2.PortNum.UNKNOWN_APP)
decoded["portnum"] = new_portnum
logging.warning(f"portnum was not in decoded. Setting to:{new_portnum}")
portnum = decoded["portnum"]

View File

@@ -1,16 +1,19 @@
"""Meshtastic unit tests for mesh_interface.py"""
import re
import logging
from unittest.mock import patch, MagicMock
import pytest
from ..mesh_interface import MeshInterface
from ..node import Node
from .. import mesh_pb2
#from ..mesh_pb2 import MeshPacket
from ..__init__ import LOCAL_ADDR
@pytest.mark.unit
def test_MeshInterface(capsys):
def test_MeshInterface(capsys, reset_globals):
"""Test that we can instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
iface.showInfo()
@@ -28,8 +31,8 @@ def test_MeshInterface(capsys):
@pytest.mark.unit
def test_handlePacketFromRadio_no_from(capsys):
"""Test _handlePacketFromRadio no 'from'"""
def test_handlePacketFromRadio_no_from(capsys, reset_globals):
"""Test _handlePacketFromRadio with no 'from' in the mesh packet."""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
iface._handlePacketFromRadio(meshPacket)
@@ -39,13 +42,62 @@ def test_handlePacketFromRadio_no_from(capsys):
@pytest.mark.unit
def test_MeshPacket_set_from(capsys):
"""Test setting 'from' MeshPacket """
def test_handlePacketFromRadio_with_a_portnum(caplog, reset_globals):
"""Test _handlePacketFromRadio with a portnum
Since we have an attribute called 'from', we cannot simply 'set' it.
Had to implement a hack just to be able to test some code.
"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b''
meshPacket.decoded.portnum = 1
iface._handlePacketFromRadio(meshPacket, hack=True)
out, err = capsys.readouterr()
assert re.search(r'', out, re.MULTILINE)
assert err == ''
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_handlePacketFromRadio_no_portnum(caplog, reset_globals):
"""Test _handlePacketFromRadio without a portnum"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b''
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_getNode_with_local(reset_globals):
"""Test getNode"""
iface = MeshInterface(noProto=True)
anode = iface.getNode(LOCAL_ADDR)
assert anode == iface.localNode
@pytest.mark.unit
def test_getNode_not_local(reset_globals, caplog):
"""Test getNode not local"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.node.Node', return_value=anode):
another_node = iface.getNode('bar2')
assert another_node != iface.localNode
assert re.search(r'About to requestConfig', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_getNode_not_local_timeout(reset_globals, capsys):
"""Test getNode not local, simulate timeout"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
anode.waitForConfig.return_value = False
with patch('meshtastic.node.Node', return_value=anode):
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface.getNode('bar2')
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.match(r'Error: Timed out waiting for node config', out)
assert err == ''