unit tests for sendPacket()

This commit is contained in:
Mike Kinney
2021-12-21 15:21:30 -08:00
parent 302f52469a
commit 2919930473
3 changed files with 70 additions and 7 deletions

View File

@@ -28,7 +28,7 @@ An example using Python 3 code to send a message to the mesh:
```
import meshtastic
interface = meshtastic.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.serial_interface.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface.sendText("hello mesh") # or sendData to send binary data, see documentations for other options.
interface.close()
```
@@ -103,7 +103,7 @@ You can even set the channel preshared key to a particular AES128 or AES256 sequ
meshtastic --ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --info
```
Use "--ch-set psk none" to turn off encryption.
Use "--ch-set psk none" to turn off encryption.
Use "--ch-set psk random" will assign a new (high quality) random AES256 key to the primary channel (similar to what the Android app does when making new channels).

View File

@@ -311,7 +311,10 @@ class MeshInterface:
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
elif destinationId == LOCAL_ADDR:
nodeNum = self.myInfo.my_node_num
if self.myInfo:
nodeNum = self.myInfo.my_node_num
else:
our_exit("Warning: No myInfo found.")
# A simple hex style nodeid - we can parse this without needing the DB
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
@@ -334,7 +337,7 @@ class MeshInterface:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f"Sending packet: {stripnl(meshPacket)}")
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
return meshPacket
@@ -374,8 +377,9 @@ class MeshInterface:
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.isConnected.wait(15.0): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
if not self.noProto:
if not self.isConnected.wait(15.0): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
if self.failure:

View File

@@ -9,7 +9,7 @@ import pytest
from ..mesh_interface import MeshInterface
from ..node import Node
from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
@pytest.mark.unit
@@ -284,3 +284,62 @@ def test_sendPosition_with_a_position(caplog, reset_globals):
assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE)
assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE)
assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_no_destination(capsys, reset_globals):
"""Test _sendPacket()"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface._sendPacket(b'', destinationId=None)
out, err = capsys.readouterr()
assert re.search(r'Warning: destinationId must not be None', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_sendPacket_with_destination_as_int(caplog, reset_globals):
"""Test _sendPacket() with int as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=123)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
"""Test _sendPacket() with BROADCAST_ADDR as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals):
"""Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
out, err = capsys.readouterr()
assert re.search(r'Warning: No myInfo', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals):
"""Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
iface = MeshInterface(noProto=True)
myInfo = MagicMock()
iface.myInfo = myInfo
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)