mirror of
https://github.com/meshtastic/python.git
synced 2026-01-21 22:28:00 -05:00
Merge pull request #228 from mkinney/more_unit_tests
add more unit tests
This commit is contained in:
@@ -82,9 +82,6 @@ from meshtastic import (mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2,
|
||||
environmental_measurement_pb2, remote_hardware_pb2,
|
||||
channel_pb2, radioconfig_pb2, util)
|
||||
|
||||
if platform.system() == 'Linux':
|
||||
# pylint: disable=E0401
|
||||
import pygatt
|
||||
|
||||
# Note: To follow PEP224, comments should be after the module variable.
|
||||
|
||||
@@ -132,6 +129,7 @@ def _onTextReceive(iface, asDict):
|
||||
#
|
||||
# Usually btw this problem is caused by apps sending binary data but setting the payload type to
|
||||
# text.
|
||||
logging.debug(f'in _onTextReceive() asDict:{asDict}')
|
||||
try:
|
||||
asBytes = asDict["decoded"]["payload"]
|
||||
asDict["decoded"]["text"] = asBytes.decode("utf-8")
|
||||
@@ -142,22 +140,30 @@ def _onTextReceive(iface, asDict):
|
||||
|
||||
def _onPositionReceive(iface, asDict):
|
||||
"""Special auto parsing for received messages"""
|
||||
p = asDict["decoded"]["position"]
|
||||
iface._fixupPosition(p)
|
||||
# update node DB as needed
|
||||
iface._getOrCreateByNum(asDict["from"])["position"] = p
|
||||
logging.debug(f'in _onPositionReceive() asDict:{asDict}')
|
||||
if 'decoded' in asDict:
|
||||
if 'position' in asDict['decoded'] and 'from' in asDict:
|
||||
p = asDict["decoded"]["position"]
|
||||
logging.debug(f'p:{p}')
|
||||
p = iface._fixupPosition(p)
|
||||
logging.debug(f'after fixup p:{p}')
|
||||
# update node DB as needed
|
||||
iface._getOrCreateByNum(asDict["from"])["position"] = p
|
||||
|
||||
|
||||
def _onNodeInfoReceive(iface, asDict):
|
||||
"""Special auto parsing for received messages"""
|
||||
p = asDict["decoded"]["user"]
|
||||
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
|
||||
# update node DB as needed
|
||||
n = iface._getOrCreateByNum(asDict["from"])
|
||||
n["user"] = p
|
||||
# We now have a node ID, make sure it is uptodate in that table
|
||||
iface.nodes[p["id"]] = n
|
||||
_receiveInfoUpdate(iface, asDict)
|
||||
logging.debug(f'in _onNodeInfoReceive() asDict:{asDict}')
|
||||
if 'decoded' in asDict:
|
||||
if 'user' in asDict['decoded'] and 'from' in asDict:
|
||||
p = asDict["decoded"]["user"]
|
||||
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
|
||||
# update node DB as needed
|
||||
n = iface._getOrCreateByNum(asDict["from"])
|
||||
n["user"] = p
|
||||
# We now have a node ID, make sure it is uptodate in that table
|
||||
iface.nodes[p["id"]] = n
|
||||
_receiveInfoUpdate(iface, asDict)
|
||||
|
||||
|
||||
def _receiveInfoUpdate(iface, asDict):
|
||||
|
||||
@@ -62,8 +62,7 @@ class StreamInterface(MeshInterface):
|
||||
# because we want to ensure it is looking for START1)
|
||||
p = bytearray([START2] * 32)
|
||||
self._writeBytes(p)
|
||||
if not self.noProto:
|
||||
time.sleep(0.1) # wait 100ms to give device time to start running
|
||||
time.sleep(0.1) # wait 100ms to give device time to start running
|
||||
|
||||
self._rxThread.start()
|
||||
|
||||
@@ -90,8 +89,7 @@ class StreamInterface(MeshInterface):
|
||||
self.stream.write(b)
|
||||
self.stream.flush()
|
||||
# we sleep here to give the TBeam a chance to work
|
||||
if not self.noProto:
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.1)
|
||||
|
||||
def _readBytes(self, length):
|
||||
"""Read an array of bytes from our stream"""
|
||||
|
||||
@@ -33,6 +33,12 @@ class TCPInterface(StreamInterface):
|
||||
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto,
|
||||
connectNow=connectNow)
|
||||
|
||||
def _socket_shutdown(self):
|
||||
"""Shutdown the socket.
|
||||
Note: Broke out this line so the exception could be unit tested.
|
||||
"""
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
|
||||
def myConnect(self):
|
||||
"""Connect to socket"""
|
||||
server_address = (self.hostname, self.portNumber)
|
||||
@@ -48,7 +54,7 @@ class TCPInterface(StreamInterface):
|
||||
self._wantExit = True
|
||||
if not self.socket is None:
|
||||
try:
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
self._socket_shutdown()
|
||||
except:
|
||||
pass # Ignore errors in shutdown, because we might have a race with the server
|
||||
self.socket.close()
|
||||
|
||||
61
meshtastic/tests/test_init.py
Normal file
61
meshtastic/tests/test_init.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Meshtastic unit tests for __init__.py"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
import pytest
|
||||
|
||||
from meshtastic.__init__ import _onTextReceive, _onPositionReceive, _onNodeInfoReceive
|
||||
from ..serial_interface import SerialInterface
|
||||
from ..globals import Globals
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onTextReceive_with_exception(caplog):
|
||||
"""Test _onTextReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onTextReceive(iface, packet)
|
||||
assert re.search(r'in _onTextReceive', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Malformatted', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onPositionReceive(caplog):
|
||||
"""Test _onPositionReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {
|
||||
'from': 'foo',
|
||||
'decoded': {
|
||||
'position': {}
|
||||
}
|
||||
}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onPositionReceive(iface, packet)
|
||||
assert re.search(r'in _onPositionReceive', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onNodeInfoReceive(caplog, iface_with_nodes):
|
||||
"""Test _onNodeInfoReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
packet = {
|
||||
'from': 'foo',
|
||||
'decoded': {
|
||||
'user': {
|
||||
'id': 'bar',
|
||||
},
|
||||
}
|
||||
}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onNodeInfoReceive(iface, packet)
|
||||
assert re.search(r'in _onNodeInfoReceive', caplog.text, re.MULTILINE)
|
||||
@@ -27,6 +27,23 @@ def test_TCPInterface(capsys):
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_TCPInterface_exception():
|
||||
"""Test that we can instantiate a TCPInterface"""
|
||||
|
||||
def throw_an_exception():
|
||||
raise ValueError("Fake exception.")
|
||||
|
||||
with patch('meshtastic.tcp_interface.TCPInterface._socket_shutdown') as mock_shutdown:
|
||||
mock_shutdown.side_effect = throw_an_exception
|
||||
with patch('socket.socket') as mock_socket:
|
||||
iface = TCPInterface(hostname='localhost', noProto=True)
|
||||
iface.myConnect()
|
||||
iface.close()
|
||||
assert mock_socket.called
|
||||
assert mock_shutdown.called
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_TCPInterface_without_connecting():
|
||||
"""Test that we can instantiate a TCPInterface with connectNow as false"""
|
||||
|
||||
Reference in New Issue
Block a user