diff --git a/README.md b/README.md index 003c2be..37b26ed 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ An example using Python 3 code to send a message to the mesh: ``` import meshtastic -interface = meshtastic.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +interface = meshtastic.serial_interface.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 interface.sendText("hello mesh") # or sendData to send binary data, see documentations for other options. interface.close() ``` @@ -103,7 +103,7 @@ You can even set the channel preshared key to a particular AES128 or AES256 sequ meshtastic --ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --info ``` -Use "--ch-set psk none" to turn off encryption. +Use "--ch-set psk none" to turn off encryption. Use "--ch-set psk random" will assign a new (high quality) random AES256 key to the primary channel (similar to what the Android app does when making new channels). diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index ce74979..844e729 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -311,7 +311,10 @@ class MeshInterface: elif destinationId == BROADCAST_ADDR: nodeNum = BROADCAST_NUM elif destinationId == LOCAL_ADDR: - nodeNum = self.myInfo.my_node_num + if self.myInfo: + nodeNum = self.myInfo.my_node_num + else: + our_exit("Warning: No myInfo found.") # A simple hex style nodeid - we can parse this without needing the DB elif destinationId.startswith("!"): nodeNum = int(destinationId[1:], 16) @@ -334,7 +337,7 @@ class MeshInterface: meshPacket.id = self._generatePacketId() toRadio.packet.CopyFrom(meshPacket) - #logging.debug(f"Sending packet: {stripnl(meshPacket)}") + logging.debug(f"Sending packet: {stripnl(meshPacket)}") self._sendToRadio(toRadio) return meshPacket @@ -374,8 +377,9 @@ class MeshInterface: def _waitConnected(self): """Block until the initial node db download is complete, or timeout and raise an exception""" - if not self.isConnected.wait(15.0): # timeout after x seconds - raise Exception("Timed out waiting for connection completion") + if not self.noProto: + if not self.isConnected.wait(15.0): # timeout after x seconds + raise Exception("Timed out waiting for connection completion") # If we failed while connecting, raise the connection to the client if self.failure: diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 98a6b9a..2917c4e 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -9,7 +9,7 @@ import pytest from ..mesh_interface import MeshInterface from ..node import Node from .. import mesh_pb2 -from ..__init__ import LOCAL_ADDR +from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR @pytest.mark.unit @@ -284,3 +284,62 @@ def test_sendPosition_with_a_position(caplog, reset_globals): assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE) assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE) assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_no_destination(capsys, reset_globals): + """Test _sendPacket()""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface._sendPacket(b'', destinationId=None) + out, err = capsys.readouterr() + assert re.search(r'Warning: destinationId must not be None', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_int(caplog, reset_globals): + """Test _sendPacket() with int as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=123) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals): + """Test _sendPacket() with BROADCAST_ADDR as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + out, err = capsys.readouterr() + assert re.search(r'Warning: No myInfo', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo""" + iface = MeshInterface(noProto=True) + myInfo = MagicMock() + iface.myInfo = myInfo + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE)