diff --git a/meshtastic/serial_interface.py b/meshtastic/serial_interface.py index 7475ca2..758ee2c 100644 --- a/meshtastic/serial_interface.py +++ b/meshtastic/serial_interface.py @@ -56,14 +56,17 @@ class SerialInterface(StreamInterface): def connect(self) -> None: logger.debug(f"Connecting to {self.devPath}") + dev_path = self.devPath + if dev_path is None: + raise RuntimeError("Serial device path is not set") if sys.platform != "win32": - with open(self.devPath, encoding="utf8") as f: + with open(dev_path, encoding="utf8") as f: self._set_hupcl_with_termios(f) time.sleep(0.1) self.stream = serial.Serial( - self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0 + dev_path, 115200, exclusive=True, timeout=0.5, write_timeout=0 ) self.stream.flush() # type: ignore[attr-defined] time.sleep(0.1) diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 048614e..b664665 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -7,6 +7,7 @@ import platform import re import sys import tempfile +from types import SimpleNamespace from unittest.mock import mock_open, MagicMock, patch import pytest @@ -1722,11 +1723,9 @@ def test_main_onReceive_with_sendtext(caplog, capsys): @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -def test_main_onReceive_with_text(caplog, capsys): - """Test onReceive with text""" - args = MagicMock() - args.sendtext.return_value = "foo" - mt_config.args = args +def test_main_onReceive_with_text_replies_on_target_channel(caplog, capsys): + """Test onReceive replies when channel matches --ch-index (default 0).""" + mt_config.args = SimpleNamespace(reply=True, ch_index=0, sendtext=None) # Note: 'TEXT_MESSAGE_APP' value is 1 # Note: Some of this is faked below. @@ -1752,6 +1751,83 @@ def test_main_onReceive_with_text(caplog, capsys): assert re.search(r"in onReceive", caplog.text, re.MULTILINE) out, err = capsys.readouterr() assert re.search(r"Sending reply", out, re.MULTILINE) + iface.sendText.assert_called_once_with( + "got msg 'faked' with rxSnr: 6.0 and hopLimit: 3", channelIndex=0 + ) + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_onReceive_with_text_ignores_non_target_channel(caplog, capsys): + """Test onReceive does not reply when packet channel differs from --ch-index.""" + mt_config.args = SimpleNamespace(reply=True, ch_index=1, sendtext=None) + + packet = { + "to": 4294967295, + "decoded": {"portnum": 1, "payload": "hello", "text": "faked"}, + "id": 334776977, + "hop_limit": 3, + "want_ack": True, + "rxSnr": 6.0, + "hopLimit": 3, + "raw": "faked", + "fromId": "!28b5465c", + "toId": "^all", + "channel": 0, + } + + iface = MagicMock(autospec=SerialInterface) + iface.myInfo.my_node_num = 4294967295 + + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface): + with caplog.at_level(logging.DEBUG): + onReceive(packet, iface) + assert re.search(r"in onReceive", caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search( + r"Ignored message on channel 0 \(waiting for channel 1\)", out, re.MULTILINE + ) + iface.sendText.assert_not_called() + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_onReceive_with_text_replies_on_explicit_matching_channel(caplog, capsys): + """Test onReceive replies when explicit packet channel matches --ch-index.""" + mt_config.args = SimpleNamespace(reply=True, ch_index=2, sendtext=None) + + packet = { + "to": 4294967295, + "decoded": {"portnum": 1, "payload": "hello", "text": "faked"}, + "id": 334776977, + "hop_limit": 3, + "want_ack": True, + "rxSnr": 6.0, + "hopLimit": 3, + "raw": "faked", + "fromId": "!28b5465c", + "toId": "^all", + "channel": 2, + } + + iface = MagicMock(autospec=SerialInterface) + iface.myInfo.my_node_num = 4294967295 + + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface): + with caplog.at_level(logging.DEBUG): + onReceive(packet, iface) + assert re.search(r"in onReceive", caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search( + r"Received channel 2\. Sending reply: got msg 'faked' with rxSnr: 6.0 and hopLimit: 3", + out, + re.MULTILINE, + ) + iface.sendText.assert_called_once_with( + "got msg 'faked' with rxSnr: 6.0 and hopLimit: 3", channelIndex=2 + ) assert err == ""