mirror of
https://github.com/meshtastic/python.git
synced 2026-01-16 03:37:57 -05:00
Merge pull request #179 from mkinney/more_boring_unit_testing
add more unit tests for onReceive in main
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
[run]
|
||||
omit = meshtastic/*_pb2.py,meshtastic/tests/*.py
|
||||
omit = meshtastic/*_pb2.py,meshtastic/tests/*.py,meshtastic/test.py
|
||||
|
||||
4
Makefile
4
Makefile
@@ -1,6 +1,6 @@
|
||||
# unit test
|
||||
# only run the fast unit tests
|
||||
test:
|
||||
pytest
|
||||
pytest -m unit
|
||||
|
||||
# local install
|
||||
install:
|
||||
|
||||
@@ -27,7 +27,7 @@ def onReceive(packet, interface):
|
||||
args = our_globals.get_args()
|
||||
try:
|
||||
d = packet.get('decoded')
|
||||
logging.debug(f'd:{d}')
|
||||
logging.debug(f'in onReceive() d:{d}')
|
||||
|
||||
# Exit once we receive a reply
|
||||
if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP:
|
||||
@@ -37,12 +37,10 @@ def onReceive(packet, interface):
|
||||
if args and args.reply:
|
||||
msg = d.get('text')
|
||||
if msg:
|
||||
#shortName = packet['decoded']['shortName']
|
||||
rxSnr = packet['rxSnr']
|
||||
hopLimit = packet['hopLimit']
|
||||
print(f"message: {msg}")
|
||||
reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(
|
||||
msg, rxSnr, hopLimit)
|
||||
reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit)
|
||||
print("Sending reply: ", reply)
|
||||
interface.sendText(reply)
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ class StreamInterface(MeshInterface):
|
||||
"""Constructor, opens a connection to self.stream
|
||||
|
||||
Keyword Arguments:
|
||||
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
|
||||
debugOut {stream} -- If a stream is provided, any debug serial output from the
|
||||
device will be emitted to that stream. (default: {None})
|
||||
|
||||
@@ -33,15 +32,14 @@ class StreamInterface(MeshInterface):
|
||||
Exception: [description]
|
||||
"""
|
||||
|
||||
if not hasattr(self, 'stream'):
|
||||
if not hasattr(self, 'stream') and not noProto:
|
||||
raise Exception(
|
||||
"StreamInterface is now abstract (to update existing code create SerialInterface instead)")
|
||||
self._rxBuf = bytes() # empty
|
||||
self._wantExit = False
|
||||
|
||||
# FIXME, figure out why daemon=True causes reader thread to exit too early
|
||||
self._rxThread = threading.Thread(
|
||||
target=self.__reader, args=(), daemon=True)
|
||||
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
|
||||
|
||||
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
|
||||
|
||||
@@ -93,7 +91,10 @@ class StreamInterface(MeshInterface):
|
||||
|
||||
def _readBytes(self, length):
|
||||
"""Read an array of bytes from our stream"""
|
||||
return self.stream.read(length)
|
||||
if self.stream:
|
||||
return self.stream.read(length)
|
||||
else:
|
||||
return None
|
||||
|
||||
def _sendToRadioImpl(self, toRadio):
|
||||
"""Send a ToRadio protobuf to the device"""
|
||||
@@ -102,6 +103,7 @@ class StreamInterface(MeshInterface):
|
||||
bufLen = len(b)
|
||||
# We convert into a string, because the TCP code doesn't work with byte arrays
|
||||
header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
logging.debug(f'sending header:{header} b:{b}')
|
||||
self._writeBytes(header + b)
|
||||
|
||||
def close(self):
|
||||
@@ -116,16 +118,18 @@ class StreamInterface(MeshInterface):
|
||||
|
||||
def __reader(self):
|
||||
"""The reader thread that reads bytes from our stream"""
|
||||
logging.debug('in __reader()')
|
||||
empty = bytes()
|
||||
|
||||
try:
|
||||
while not self._wantExit:
|
||||
# logging.debug("reading character")
|
||||
logging.debug("reading character")
|
||||
b = self._readBytes(1)
|
||||
# logging.debug("In reader loop")
|
||||
# logging.debug(f"read returned {b}")
|
||||
logging.debug("In reader loop")
|
||||
#logging.debug(f"read returned {b}")
|
||||
if len(b) > 0:
|
||||
c = b[0]
|
||||
#logging.debug(f'c:{c}')
|
||||
ptr = len(self._rxBuf)
|
||||
|
||||
# Assume we want to append this byte, fixme use bytearray instead
|
||||
@@ -144,12 +148,13 @@ class StreamInterface(MeshInterface):
|
||||
if c != START2:
|
||||
self._rxBuf = empty # failed to find start2
|
||||
elif ptr >= HEADER_LEN - 1: # we've at least got a header
|
||||
# big endian length follos header
|
||||
#logging.debug('at least we received a header')
|
||||
# big endian length follows header
|
||||
packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3]
|
||||
|
||||
if ptr == HEADER_LEN - 1: # we _just_ finished reading the header, validate length
|
||||
if packetlen > MAX_TO_FROM_RADIO_SIZE:
|
||||
self._rxBuf = empty # length ws out out bounds, restart
|
||||
self._rxBuf = empty # length was out out bounds, restart
|
||||
|
||||
if len(self._rxBuf) != 0 and ptr + 1 >= packetlen + HEADER_LEN:
|
||||
try:
|
||||
|
||||
@@ -17,18 +17,29 @@ class TCPInterface(StreamInterface):
|
||||
hostname {string} -- Hostname/IP address of the device to connect to
|
||||
"""
|
||||
|
||||
logging.debug(f"Connecting to {hostname}")
|
||||
|
||||
server_address = (hostname, portNumber)
|
||||
sock = socket.create_connection(server_address)
|
||||
|
||||
# Instead of wrapping as a stream, we use the native socket API
|
||||
# self.stream = sock.makefile('rw')
|
||||
self.stream = None
|
||||
self.socket = sock
|
||||
|
||||
StreamInterface.__init__(
|
||||
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
|
||||
self.hostname = hostname
|
||||
self.portNumber = portNumber
|
||||
|
||||
if connectNow:
|
||||
logging.debug(f"Connecting to {hostname}")
|
||||
server_address = (hostname, portNumber)
|
||||
sock = socket.create_connection(server_address)
|
||||
self.socket = sock
|
||||
else:
|
||||
self.socket = None
|
||||
|
||||
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto,
|
||||
connectNow=connectNow)
|
||||
|
||||
def myConnect(self):
|
||||
"""Connect to socket"""
|
||||
server_address = (self.hostname, self.portNumber)
|
||||
sock = socket.create_connection(server_address)
|
||||
self.socket = sock
|
||||
|
||||
def close(self):
|
||||
"""Close a connection to the device"""
|
||||
|
||||
@@ -8,6 +8,7 @@ import pytest
|
||||
from meshtastic.__main__ import Globals
|
||||
from ..mesh_interface import MeshInterface
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def reset_globals():
|
||||
"""Fixture to reset globals."""
|
||||
|
||||
@@ -9,7 +9,8 @@ import logging
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pytest
|
||||
|
||||
from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config
|
||||
from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref, setPref
|
||||
#from ..radioconfig_pb2 import UserPreferences
|
||||
import meshtastic.radioconfig_pb2
|
||||
from ..serial_interface import SerialInterface
|
||||
from ..tcp_interface import TCPInterface
|
||||
@@ -1234,14 +1235,113 @@ def test_main_setchan(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_onReceive_empty(reset_globals):
|
||||
def test_main_onReceive_empty(caplog, reset_globals):
|
||||
"""Test onReceive"""
|
||||
sys.argv = ['']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {'decoded': 'foo'}
|
||||
onReceive(packet, iface)
|
||||
# TODO: how do we know we actually called it?
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
onReceive(packet, iface)
|
||||
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
# TODO: use this captured position app message (might want/need in the future)
|
||||
# packet = {
|
||||
# 'to': 4294967295,
|
||||
# 'decoded': {
|
||||
# 'portnum': 'POSITION_APP',
|
||||
# 'payload': "M69\306a"
|
||||
# },
|
||||
# 'id': 334776976,
|
||||
# 'hop_limit': 3
|
||||
# }
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_onReceive_with_sendtext(caplog, reset_globals):
|
||||
"""Test onReceive with sendtext
|
||||
The entire point of this test is to make sure the interface.close() call
|
||||
is made in onReceive().
|
||||
"""
|
||||
sys.argv = ['', '--sendtext', 'hello']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
# Note: 'TEXT_MESSAGE_APP' value is 1
|
||||
packet = {
|
||||
'to': 4294967295,
|
||||
'decoded': {
|
||||
'portnum': 1,
|
||||
'payload': "hello"
|
||||
},
|
||||
'id': 334776977,
|
||||
'hop_limit': 3,
|
||||
'want_ack': True
|
||||
}
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.myInfo.my_node_num = 4294967295
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
main()
|
||||
onReceive(packet, iface)
|
||||
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_onReceive_with_reply(caplog, capsys, reset_globals):
|
||||
"""Test onReceive with a reply
|
||||
To capture: on one device run '--sendtext aaa --reply' and on another
|
||||
device run '--sendtext bbb --reply', then back to the first device and
|
||||
run '--sendtext aaa2 --reply'. You should now see a "Sending reply" message.
|
||||
"""
|
||||
sys.argv = ['', '--sendtext', 'hello', '--reply']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
# Note: 'TEXT_MESSAGE_APP' value is 1
|
||||
|
||||
send_packet = {
|
||||
'to': 4294967295,
|
||||
'decoded': {
|
||||
'portnum': 1,
|
||||
'payload': "hello"
|
||||
},
|
||||
'id': 334776977,
|
||||
'hop_limit': 3,
|
||||
'want_ack': True
|
||||
}
|
||||
|
||||
reply_packet = {
|
||||
'from': 682968668,
|
||||
'to': 4294967295,
|
||||
'decoded': {
|
||||
'portnum': 'TEXT_MESSAGE_APP',
|
||||
'payload': b'bbb',
|
||||
'text': 'bbb'
|
||||
},
|
||||
'id': 1709936182,
|
||||
'rxTime': 1640381999,
|
||||
'rxSnr': 6.0,
|
||||
'hopLimit': 3,
|
||||
'raw': 'faked',
|
||||
'fromId': '!28b5465c',
|
||||
'toId': '^all'
|
||||
}
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.myInfo.my_node_num = 4294967295
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
main()
|
||||
onReceive(send_packet, iface)
|
||||
onReceive(reply_packet, iface)
|
||||
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'got msg ', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -1371,6 +1471,7 @@ def test_main_gpio_rd(caplog, capsys, reset_globals):
|
||||
channel.settings.psk = b'\x01'
|
||||
|
||||
packet = {
|
||||
|
||||
'from': 682968668,
|
||||
'to': 682968612,
|
||||
'channel': 1,
|
||||
@@ -1404,3 +1505,102 @@ def test_main_gpio_rd(caplog, capsys, reset_globals):
|
||||
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
|
||||
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_getPref_valid_field(capsys, reset_globals):
|
||||
"""Test getPref() with a valid field"""
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = 'ls_secs'
|
||||
prefs.wifi_ssid = 'foo'
|
||||
prefs.ls_secs = 300
|
||||
prefs.fixed_position = False
|
||||
|
||||
getPref(prefs, 'ls_secs')
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'ls_secs: 300', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_getPref_invalid_field(capsys, reset_globals):
|
||||
"""Test getPref() with an invalid field"""
|
||||
|
||||
class Field:
|
||||
"""Simple class for testing."""
|
||||
|
||||
def __init__(self, name):
|
||||
"""constructor"""
|
||||
self.name = name
|
||||
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = None
|
||||
|
||||
# Note: This is a subset of the real fields
|
||||
ls_secs_field = Field('ls_secs')
|
||||
is_router = Field('is_router')
|
||||
fixed_position = Field('fixed_position')
|
||||
|
||||
fields = [ ls_secs_field, is_router, fixed_position ]
|
||||
prefs.DESCRIPTOR.fields = fields
|
||||
|
||||
getPref(prefs, 'foo')
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'does not have an attribute called foo', out, re.MULTILINE)
|
||||
# ensure they are sorted
|
||||
assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_setPref_valid_field(capsys, reset_globals):
|
||||
"""Test setPref() with a valid field"""
|
||||
|
||||
class Field:
|
||||
"""Simple class for testing."""
|
||||
|
||||
def __init__(self, name, enum_type):
|
||||
"""constructor"""
|
||||
self.name = name
|
||||
self.enum_type = enum_type
|
||||
|
||||
ls_secs_field = Field('ls_secs', 'int')
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = ls_secs_field
|
||||
|
||||
setPref(prefs, 'ls_secs', '300')
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Set ls_secs to 300', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_setPref_invalid_field(capsys, reset_globals):
|
||||
"""Test setPref() with a invalid field"""
|
||||
|
||||
|
||||
class Field:
|
||||
"""Simple class for testing."""
|
||||
|
||||
def __init__(self, name):
|
||||
"""constructor"""
|
||||
self.name = name
|
||||
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = None
|
||||
|
||||
# Note: This is a subset of the real fields
|
||||
ls_secs_field = Field('ls_secs')
|
||||
is_router = Field('is_router')
|
||||
fixed_position = Field('fixed_position')
|
||||
|
||||
fields = [ ls_secs_field, is_router, fixed_position ]
|
||||
prefs.DESCRIPTOR.fields = fields
|
||||
|
||||
setPref(prefs, 'foo', '300')
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'does not have an attribute called foo', out, re.MULTILINE)
|
||||
# ensure they are sorted
|
||||
assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
"""Meshtastic unit tests for stream_interface.py"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
import pytest
|
||||
|
||||
from ..stream_interface import StreamInterface
|
||||
@@ -8,7 +11,71 @@ from ..stream_interface import StreamInterface
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_StreamInterface():
|
||||
"""Test that we cannot instantiate a StreamInterface"""
|
||||
"""Test that we cannot instantiate a StreamInterface based on noProto"""
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
StreamInterface(noProto=True)
|
||||
StreamInterface()
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
|
||||
|
||||
# Note: This takes a bit, so moving from unit to slow
|
||||
@pytest.mark.unitslow
|
||||
def test_StreamInterface_with_noProto(caplog, reset_globals):
|
||||
"""Test that we can instantiate a StreamInterface based on nonProto
|
||||
and we can read/write bytes from a mocked stream
|
||||
"""
|
||||
stream = MagicMock()
|
||||
test_data = b'hello'
|
||||
stream.read.return_value = test_data
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = StreamInterface(noProto=True, connectNow=False)
|
||||
iface.stream = stream
|
||||
iface._writeBytes(test_data)
|
||||
data = iface._readBytes(len(test_data))
|
||||
assert data == test_data
|
||||
|
||||
|
||||
# Note: This takes a bit, so moving from unit to slow
|
||||
# Tip: If you want to see the print output, run with '-s' flag:
|
||||
# pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
|
||||
@pytest.mark.unitslow
|
||||
def test_sendToRadioImpl(caplog, reset_globals):
|
||||
"""Test _sendToRadioImpl()"""
|
||||
|
||||
# def add_header(b):
|
||||
# """Add header stuffs for radio"""
|
||||
# bufLen = len(b)
|
||||
# header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
# return header + b
|
||||
|
||||
# captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio")
|
||||
raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
|
||||
raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a'
|
||||
# pylint: disable=C0301
|
||||
raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@'
|
||||
raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e'
|
||||
# pylint: disable=C0301
|
||||
raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_blank = b''
|
||||
|
||||
test_data = b'hello'
|
||||
stream = MagicMock()
|
||||
#stream.read.return_value = add_header(test_data)
|
||||
stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete,
|
||||
raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2,
|
||||
raw_blank, raw_blank]
|
||||
toRadio = MagicMock()
|
||||
toRadio.SerializeToString.return_value = test_data
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = StreamInterface(noProto=True, connectNow=False)
|
||||
iface.stream = stream
|
||||
iface.connect()
|
||||
iface._sendToRadioImpl(toRadio)
|
||||
assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'reading character', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'In reader loop', caplog.text, re.MULTILINE)
|
||||
print(caplog.text)
|
||||
|
||||
@@ -4,6 +4,7 @@ addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not ex
|
||||
|
||||
markers =
|
||||
unit: marks tests as unit tests
|
||||
unitslow: marks slow unit tests
|
||||
int: marks tests as integration tests
|
||||
smoke1: runs smoke tests on a single device connected via USB
|
||||
smoke2: runs smoke tests on a two devices connected via USB
|
||||
|
||||
Reference in New Issue
Block a user