Merge pull request #167 from mkinney/work_on_unit_tests

Work on unit tests
This commit is contained in:
mkinney
2021-12-17 08:03:10 -08:00
committed by GitHub
6 changed files with 330 additions and 33 deletions

View File

@@ -155,10 +155,11 @@ def _onNodeInfoReceive(iface, asDict):
def _receiveInfoUpdate(iface, asDict):
iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict
iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime")
iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr")
iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit")
if "from" in asDict:
iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict
iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime")
iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr")
iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit")
"""Well known message payloads can register decoders for automatic protobuf parsing"""

View File

@@ -29,11 +29,11 @@ def onReceive(packet, interface):
d = packet.get('decoded')
# Exit once we receive a reply
if args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
interface.close() # after running command then exit
# Reply to every received message with some stats
if args.reply:
if args and args.reply:
msg = d.get('text')
if msg:
#shortName = packet['decoded']['shortName']

View File

@@ -1,4 +1,4 @@
""" Mesh Interface class
"""Mesh Interface class
"""
import sys
import random
@@ -16,9 +16,9 @@ from pubsub import pub
from google.protobuf.json_format import MessageToJson
import meshtastic.node
from . import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout, our_exit
from .node import Node
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
@@ -46,7 +46,7 @@ class MeshInterface:
self.nodes = None # FIXME
self.isConnected = threading.Event()
self.noProto = noProto
self.localNode = Node(self, -1) # We fixup nodenum later
self.localNode = meshtastic.node.Node(self, -1) # We fixup nodenum later
self.myInfo = None # We don't have device info yet
self.responseHandlers = {} # A map from request ID to the handler
self.failure = None # If we've encountered a fatal exception it will be kept here
@@ -151,7 +151,8 @@ class MeshInterface:
if nodeId == LOCAL_ADDR:
return self.localNode
else:
n = Node(self, nodeId)
n = meshtastic.node.Node(self, nodeId)
logging.debug("About to requestConfig")
n.requestConfig()
if not n.waitForConfig():
our_exit("Error: Timed out waiting for node config")
@@ -222,6 +223,7 @@ class MeshInterface:
logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
data = data.SerializeToString()
logging.debug(f"len(data): {len(data)}")
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
Exception("Data payload too big")
@@ -266,6 +268,7 @@ class MeshInterface:
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
p.time = int(timeSec)
logging.debug(f'p.time:{p.time}')
return self.sendData(p, destinationId,
portNum=portnums_pb2.PortNum.POSITION_APP,
@@ -291,6 +294,7 @@ class MeshInterface:
toRadio = mesh_pb2.ToRadio()
nodeNum = 0
if destinationId is None:
our_exit("Warning: destinationId must not be None")
elif isinstance(destinationId, int):
@@ -303,10 +307,13 @@ class MeshInterface:
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
else:
node = self.nodes.get(destinationId)
if not node:
our_exit(f"Warning: NodeId {destinationId} not found in DB")
nodeNum = node['num']
if self.nodes:
node = self.nodes.get(destinationId)
if not node:
our_exit(f"Warning: NodeId {destinationId} not found in DB")
nodeNum = node['num']
else:
logging.warning("Warning: There were no self.nodes.")
meshPacket.to = nodeNum
meshPacket.want_ack = wantAck
@@ -449,8 +456,9 @@ class MeshInterface:
Called by subclasses."""
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
#logging.debug(f"fromRadioBytes: {fromRadioBytes}")
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
#logging.debug(f"Received from radio: {fromRadio}")
logging.debug(f"Received from radio: {fromRadio}")
if fromRadio.HasField("my_info"):
self.myInfo = fromRadio.my_info
self.localNode.nodeNum = self.myInfo.my_node_num
@@ -543,9 +551,16 @@ class MeshInterface:
self.nodesByNum[nodeNum] = n
return n
def _handlePacketFromRadio(self, meshPacket):
def _handlePacketFromRadio(self, meshPacket, hack=False):
"""Handle a MeshPacket that just arrived from the radio
hack - well, since we used 'from', which is a python keyword,
as an attribute to MeshPacket in protobufs,
there really is no way to do something like this:
meshPacket = mesh_pb2.MeshPacket()
meshPacket.from = 123
If hack is True, we can unit test this code.
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
@@ -561,10 +576,10 @@ class MeshInterface:
asDict["raw"] = meshPacket
# from might be missing if the nodenum was zero.
if "from" not in asDict:
if not hack and "from" not in asDict:
asDict["from"] = 0
logging.error(
f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
logging.error(f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
print(f"Error: Device returned a packet we sent, ignoring: {stripnl(asDict)}")
return
if "to" not in asDict:
asDict["to"] = 0
@@ -592,9 +607,10 @@ class MeshInterface:
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not
# set it will not be populated at all to make API usage easier, set
# it to prevent confusion
if not "portnum" in decoded:
decoded["portnum"] = portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.UNKNOWN_APP)
if "portnum" not in decoded:
new_portnum = portnums_pb2.PortNum.Name(portnums_pb2.PortNum.UNKNOWN_APP)
decoded["portnum"] = new_portnum
logging.warning(f"portnum was not in decoded. Setting to:{new_portnum}")
portnum = decoded["portnum"]

View File

@@ -14,7 +14,7 @@ class Node:
Includes methods for radioConfig and channels
"""
def __init__(self, iface, nodeNum):
def __init__(self, iface, nodeNum, noProto=False):
"""Constructor"""
self.iface = iface
self.nodeNum = nodeNum
@@ -22,6 +22,7 @@ class Node:
self.channels = None
self._timeout = Timeout(maxSecs=60)
self.partialChannels = None
self.noProto = noProto
def showChannels(self):
"""Show human readable description of our channels."""
@@ -162,6 +163,11 @@ class Node:
if team is not None:
p.set_owner.team = team
# Note: These debug lines are used in unit tests
logging.debug(f'p.set_owner.long_name:{p.set_owner.long_name}:')
logging.debug(f'p.set_owner.short_name:{p.set_owner.short_name}:')
logging.debug(f'p.set_owner.is_licensed:{p.set_owner.is_licensed}')
logging.debug(f'p.set_owner.team:{p.set_owner.team}')
return self._sendAdmin(p)
def getURL(self, includeAll: bool = True):
@@ -197,6 +203,7 @@ class Node:
channelSet = apponly_pb2.ChannelSet()
channelSet.ParseFromString(decodedURL)
if len(channelSet.settings) == 0:
our_exit("Warning: There were no settings.")
@@ -207,6 +214,7 @@ class Node:
ch.index = i
ch.settings.CopyFrom(chs)
self.channels[ch.index] = ch
logging.debug(f'Channel i:{i} ch:{ch}')
self.writeChannel(ch.index)
i = i + 1
@@ -240,6 +248,7 @@ class Node:
is ignored for other nodes)"""
p = admin_pb2.AdminMessage()
p.exit_simulator = True
logging.debug('in exitSimulator()')
return self._sendAdmin(p)
@@ -307,22 +316,26 @@ class Node:
self._fixupChannels()
# FIXME, the following should only be called after we have settings and channels
self.iface._connected() # Tell everone else we are ready to go
self.iface._connected() # Tell everyone else we are ready to go
else:
self._requestChannel(index + 1)
return self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
# pylint: disable=R1710
def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False,
onResponse=None, adminIndex=0):
"""Send an admin message to the specified node (or the local node if destNodeNum is zero)"""
if adminIndex == 0: # unless a special channel index was used, we want to use the admin index
adminIndex = self.iface.localNode._getAdminChannelIndex()
if self.noProto:
logging.warning(f"Not sending packet because protocol use is disabled by noProto")
else:
if adminIndex == 0: # unless a special channel index was used, we want to use the admin index
adminIndex = self.iface.localNode._getAdminChannelIndex()
return self.iface.sendData(p, self.nodeNum,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=adminIndex)
return self.iface.sendData(p, self.nodeNum,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=True,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=adminIndex)

View File

@@ -1,19 +1,25 @@
"""Meshtastic unit tests for mesh_interface.py"""
import re
import logging
from unittest.mock import patch, MagicMock
import pytest
from ..mesh_interface import MeshInterface
from ..node import Node
from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR
@pytest.mark.unit
def test_MeshInterface(capsys):
def test_MeshInterface(capsys, reset_globals):
"""Test that we can instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
iface.showInfo()
iface.localNode.showInfo()
iface.showNodes()
iface.sendText('hello')
iface.close()
out, err = capsys.readouterr()
assert re.search(r'Owner: None \(None\)', out, re.MULTILINE)
@@ -22,3 +28,157 @@ def test_MeshInterface(capsys):
assert re.search(r'Channels', out, re.MULTILINE)
assert re.search(r'Primary channel URL', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_handlePacketFromRadio_no_from(capsys, reset_globals):
"""Test _handlePacketFromRadio with no 'from' in the mesh packet."""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
iface._handlePacketFromRadio(meshPacket)
out, err = capsys.readouterr()
assert re.search(r'Device returned a packet we sent, ignoring', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_handlePacketFromRadio_with_a_portnum(caplog, reset_globals):
"""Test _handlePacketFromRadio with a portnum
Since we have an attribute called 'from', we cannot simply 'set' it.
Had to implement a hack just to be able to test some code.
"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b''
meshPacket.decoded.portnum = 1
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_handlePacketFromRadio_no_portnum(caplog, reset_globals):
"""Test _handlePacketFromRadio without a portnum"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b''
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert re.search(r'Not populating fromId', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_getNode_with_local(reset_globals):
"""Test getNode"""
iface = MeshInterface(noProto=True)
anode = iface.getNode(LOCAL_ADDR)
assert anode == iface.localNode
@pytest.mark.unit
def test_getNode_not_local(reset_globals, caplog):
"""Test getNode not local"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.node.Node', return_value=anode):
another_node = iface.getNode('bar2')
assert another_node != iface.localNode
assert re.search(r'About to requestConfig', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_getNode_not_local_timeout(reset_globals, capsys):
"""Test getNode not local, simulate timeout"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
anode.waitForConfig.return_value = False
with patch('meshtastic.node.Node', return_value=anode):
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface.getNode('bar2')
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.match(r'Error: Timed out waiting for node config', out)
assert err == ''
@pytest.mark.unit
def test_sendPosition(reset_globals, caplog):
"""Test sendPosition"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface.sendPosition()
iface.close()
assert re.search(r'p.time:', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_handleFromRadio_empty_payload(reset_globals, caplog):
"""Test _handleFromRadio"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._handleFromRadio(b'')
iface.close()
assert re.search(r'Unexpected FromRadio payload', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_handleFromRadio_with_my_info(reset_globals, caplog):
"""Test _handleFromRadio with my_info"""
# Note: I captured the '--debug --info' for the bytes below.
# It "translates" to this:
# my_info {
# my_node_num: 682584012
# num_bands: 13
# firmware_version: "1.2.49.5354c49"
# reboot_count: 13
# bitrate: 17.088470458984375
# message_timeout_msec: 300000
# min_app_version: 20200
# max_channels: 8
# }
from_radio_bytes = b'\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._handleFromRadio(from_radio_bytes)
iface.close()
assert re.search(r'Received myinfo', caplog.text, re.MULTILINE)
assert re.search(r'num_bands: 13', caplog.text, re.MULTILINE)
assert re.search(r'max_channels: 8', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_handleFromRadio_with_node_info(reset_globals, caplog, capsys):
"""Test _handleFromRadio with node_info"""
# Note: I captured the '--debug --info' for the bytes below.
# It "translates" to this:
# node_info {
# num: 682584012
# user {
# id: "!28af67cc"
# long_name: "Unknown 67cc"
# short_name: "?CC"
# macaddr: "$o(\257g\314"
# hw_model: HELTEC_V2_1
# }
# position {
# }
# }
from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._startConfig()
iface._handleFromRadio(from_radio_bytes)
assert re.search(r'Received nodeinfo', caplog.text, re.MULTILINE)
assert re.search(r'682584012', caplog.text, re.MULTILINE)
assert re.search(r'HELTEC_V2_1', caplog.text, re.MULTILINE)
# validate some of showNodes() output
iface.showNodes()
out, err = capsys.readouterr()
assert re.search(r' 1 ', out, re.MULTILINE)
assert re.search(r'│ Unknown 67cc │ ', out, re.MULTILINE)
assert re.search(r'│ !28af67cc │ N/A │ N/A │ N/A', out, re.MULTILINE)
assert err == ''
iface.close()

View File

@@ -1,6 +1,7 @@
"""Meshtastic unit tests for node.py"""
import re
import logging
from unittest.mock import patch, MagicMock
import pytest
@@ -32,3 +33,109 @@ def test_node_reqquestConfig():
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
anode.requestConfig()
@pytest.mark.unit
def test_setOwner_and_team(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Test123', short_name='123', team=1)
assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.team:1', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setOwner_no_short_name(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Test123')
assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_is_short(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Tnt')
assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_has_words(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='A B C', is_licensed=True)
assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_exitSimulator(caplog):
"""Test exitSimulator"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.exitSimulator()
assert re.search(r'in exitSimulator', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_reboot(caplog):
"""Test reboot"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.reboot()
assert re.search(r'Telling node to reboot', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setURL_empty_url():
"""Test reboot"""
anode = Node('foo', 'bar', noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
anode.setURL('')
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_setURL_valid_URL(caplog):
"""Test setURL"""
iface = MagicMock(autospec=SerialInterface)
url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
with caplog.at_level(logging.DEBUG):
anode = Node(iface, 'bar', noProto=True)
anode.radioConfig = 'baz'
channels = ['zoo']
anode.channels = channels
anode.setURL(url)
assert re.search(r'Channel i:0', caplog.text, re.MULTILINE)
assert re.search(r'modem_config: Bw125Cr48Sf4096', caplog.text, re.MULTILINE)
assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE)
assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(caplog):
"""Test setURL"""
iface = MagicMock(autospec=SerialInterface)
url = "https://www.meshtastic.org/d/#"
with pytest.raises(SystemExit) as pytest_wrapped_e:
anode = Node(iface, 'bar', noProto=True)
anode.radioConfig = 'baz'
anode.setURL(url)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1