mirror of
https://github.com/meshtastic/python.git
synced 2025-12-30 19:37:52 -05:00
Merge pull request #204 from mkinney/add_more_unit_tests
get last two lines covered in node
This commit is contained in:
@@ -68,8 +68,7 @@ class MeshInterface:
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if exc_type is not None and exc_value is not None:
|
||||
logging.error(
|
||||
f'An exception of type {exc_type} with value {exc_value} has occurred')
|
||||
logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
|
||||
if traceback is not None:
|
||||
logging.error(f'Traceback: {traceback}')
|
||||
self.close()
|
||||
@@ -394,11 +393,11 @@ class MeshInterface:
|
||||
return user.get('shortName', None)
|
||||
return None
|
||||
|
||||
def _waitConnected(self):
|
||||
def _waitConnected(self, timeout=15.0):
|
||||
"""Block until the initial node db download is complete, or timeout
|
||||
and raise an exception"""
|
||||
if not self.noProto:
|
||||
if not self.isConnected.wait(15.0): # timeout after x seconds
|
||||
if not self.isConnected.wait(timeout): # timeout after x seconds
|
||||
raise Exception("Timed out waiting for connection completion")
|
||||
|
||||
# If we failed while connecting, raise the connection to the client
|
||||
@@ -416,8 +415,7 @@ class MeshInterface:
|
||||
def _disconnected(self):
|
||||
"""Called by subclasses to tell clients this interface has disconnected"""
|
||||
self.isConnected.clear()
|
||||
publishingThread.queueWork(lambda: pub.sendMessage(
|
||||
"meshtastic.connection.lost", interface=self))
|
||||
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.lost", interface=self))
|
||||
|
||||
def _startHeartbeat(self):
|
||||
"""We need to send a heartbeat message to the device every X seconds"""
|
||||
@@ -443,8 +441,7 @@ class MeshInterface:
|
||||
if not self.isConnected.is_set():
|
||||
self.isConnected.set()
|
||||
self._startHeartbeat()
|
||||
publishingThread.queueWork(lambda: pub.sendMessage(
|
||||
"meshtastic.connection.established", interface=self))
|
||||
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.established", interface=self))
|
||||
|
||||
def _startConfig(self):
|
||||
"""Start device packets flowing"""
|
||||
|
||||
@@ -10,6 +10,8 @@ from ..mesh_interface import MeshInterface
|
||||
from ..node import Node
|
||||
from .. import mesh_pb2
|
||||
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
|
||||
from ..radioconfig_pb2 import RadioConfig
|
||||
from ..util import Timeout
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -164,6 +166,22 @@ def test_sendPosition(reset_globals, caplog):
|
||||
assert re.search(r'p.time:', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_close_with_heartbeatTimer(reset_globals, caplog):
|
||||
"""Test close() with heartbeatTimer"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = Node('foo', 'bar')
|
||||
radioConfig = RadioConfig()
|
||||
radioConfig.preferences.phone_timeout_secs = 10
|
||||
anode.radioConfig = radioConfig
|
||||
iface.localNode = anode
|
||||
assert iface.heartbeatTimer is None
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface._startHeartbeat()
|
||||
assert iface.heartbeatTimer is not None
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_empty_payload(reset_globals, caplog):
|
||||
"""Test _handleFromRadio"""
|
||||
@@ -543,3 +561,70 @@ def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes):
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
tmp = iface._getOrCreateByNum(2475227164)
|
||||
assert tmp['num'] == 2475227164
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_enter():
|
||||
"""Test __enter__()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
assert iface == iface.__enter__()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_exit_with_exception(caplog):
|
||||
"""Test __exit__()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.ERROR):
|
||||
iface.__exit__('foo', 'bar', 'baz')
|
||||
assert re.search(r'An exception of type foo with value bar has occurred', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Traceback: baz', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test that we hit that continue statement"""
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = iface_with_nodes
|
||||
iface.localNode.nodeNum = 2475227164
|
||||
iface.showNodes()
|
||||
iface.showNodes(includeSelf=False)
|
||||
capsys.readouterr()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitForConfig(caplog, capsys):
|
||||
"""Test waitForConfig()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
# override how long to wait
|
||||
iface._timeout = Timeout(0.01)
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
iface.waitForConfig()
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Exception: Timed out waiting for interface config', err, re.MULTILINE)
|
||||
assert out == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitConnected_raises_an_exception(caplog, capsys):
|
||||
"""Test waitConnected()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
iface.failure = "warn about something"
|
||||
iface._waitConnected(0.01)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'warn about something', err, re.MULTILINE)
|
||||
assert out == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitConnected_isConnected_timeout(caplog, capsys):
|
||||
"""Test waitConnected()"""
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
iface = MeshInterface()
|
||||
iface._waitConnected(0.01)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'warn about something', err, re.MULTILINE)
|
||||
assert out == ''
|
||||
|
||||
@@ -11,6 +11,7 @@ from ..serial_interface import SerialInterface
|
||||
from ..admin_pb2 import AdminMessage
|
||||
from ..channel_pb2 import Channel
|
||||
from ..radioconfig_pb2 import RadioConfig
|
||||
from ..util import Timeout
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -90,6 +91,16 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog):
|
||||
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_setOwner_long_name_no_short(caplog):
|
||||
"""Test setOwner"""
|
||||
anode = Node('foo', 'bar', noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.setOwner(long_name ='Aabo', is_licensed=True)
|
||||
assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_exitSimulator(caplog):
|
||||
"""Test exitSimulator"""
|
||||
@@ -869,3 +880,14 @@ def test_onResponseRequestSetting_with_error(capsys):
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Error on response', out)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitForConfig():
|
||||
"""Test waitForConfig()"""
|
||||
anode = Node('foo', 'bar')
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
anode._timeout = Timeout(0.01)
|
||||
result = anode.waitForConfig()
|
||||
assert not result
|
||||
|
||||
Reference in New Issue
Block a user