wip on adding unit tests to mesh_interface

This commit is contained in:
Mike Kinney
2021-12-16 11:51:25 -08:00
parent 876a0a13dd
commit 6b0521003c
3 changed files with 45 additions and 9 deletions

View File

@@ -155,10 +155,11 @@ def _onNodeInfoReceive(iface, asDict):
def _receiveInfoUpdate(iface, asDict):
iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict
iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime")
iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr")
iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit")
if "from" in asDict:
iface._getOrCreateByNum(asDict["from"])["lastReceived"] = asDict
iface._getOrCreateByNum(asDict["from"])["lastHeard"] = asDict.get("rxTime")
iface._getOrCreateByNum(asDict["from"])["snr"] = asDict.get("rxSnr")
iface._getOrCreateByNum(asDict["from"])["hopLimit"] = asDict.get("hopLimit")
"""Well known message payloads can register decoders for automatic protobuf parsing"""

View File

@@ -1,4 +1,4 @@
""" Mesh Interface class
"""Mesh Interface class
"""
import sys
import random
@@ -222,6 +222,7 @@ class MeshInterface:
logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
data = data.SerializeToString()
logging.debug(f"len(data): {len(data)}")
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
Exception("Data payload too big")
@@ -543,9 +544,16 @@ class MeshInterface:
self.nodesByNum[nodeNum] = n
return n
def _handlePacketFromRadio(self, meshPacket):
def _handlePacketFromRadio(self, meshPacket, hack=False):
"""Handle a MeshPacket that just arrived from the radio
hack - well, since we used 'from', which is a python keyword,
as an attribute to MeshPacket in protobufs,
there really is no way to do something like this:
meshPacket = mesh_pb2.MeshPacket()
meshPacket.from = 123
If hack is True, we can unit test this code.
Will publish one of the following events:
- meshtastic.receive.text(packet = MeshPacket dictionary)
- meshtastic.receive.position(packet = MeshPacket dictionary)
@@ -561,10 +569,10 @@ class MeshInterface:
asDict["raw"] = meshPacket
# from might be missing if the nodenum was zero.
if "from" not in asDict:
if not hack and "from" not in asDict:
asDict["from"] = 0
logging.error(
f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
logging.error(f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
print(f"Error: Device returned a packet we sent, ignoring: {stripnl(asDict)}")
return
if "to" not in asDict:
asDict["to"] = 0

View File

@@ -5,6 +5,8 @@ import re
import pytest
from ..mesh_interface import MeshInterface
from .. import mesh_pb2
#from ..mesh_pb2 import MeshPacket
@pytest.mark.unit
@@ -14,6 +16,7 @@ def test_MeshInterface(capsys):
iface.showInfo()
iface.localNode.showInfo()
iface.showNodes()
iface.sendText('hello')
iface.close()
out, err = capsys.readouterr()
assert re.search(r'Owner: None \(None\)', out, re.MULTILINE)
@@ -22,3 +25,27 @@ def test_MeshInterface(capsys):
assert re.search(r'Channels', out, re.MULTILINE)
assert re.search(r'Primary channel URL', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_handlePacketFromRadio_no_from(capsys):
"""Test _handlePacketFromRadio no 'from'"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
iface._handlePacketFromRadio(meshPacket)
out, err = capsys.readouterr()
assert re.search(r'Device returned a packet we sent, ignoring', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_MeshPacket_set_from(capsys):
"""Test setting 'from' MeshPacket """
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b''
meshPacket.decoded.portnum = 1
iface._handlePacketFromRadio(meshPacket, hack=True)
out, err = capsys.readouterr()
assert re.search(r'', out, re.MULTILINE)
assert err == ''