diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 3025bde..92b5b6e 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1274,6 +1274,10 @@ def common(): format="%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s", ) + # set all meshtastic loggers to DEBUG + if not (args.debug or args.listen) and args.debuglib: + logging.getLogger('meshtastic').setLevel(logging.DEBUG) + if len(sys.argv) == 1: parser.print_help(sys.stderr) meshtastic.util.our_exit("", 1) @@ -2043,6 +2047,10 @@ def initParser(): "--debug", help="Show API library debug log messages", action="store_true" ) + group.add_argument( + "--debuglib", help="Show only API library debug log messages", action="store_true" + ) + group.add_argument( "--test", help="Run stress test against all connected Meshtastic devices", diff --git a/meshtastic/serial_interface.py b/meshtastic/serial_interface.py index 9f99a17..b91e9b8 100644 --- a/meshtastic/serial_interface.py +++ b/meshtastic/serial_interface.py @@ -2,7 +2,7 @@ """ # pylint: disable=R0917 import logging -import platform +import sys import time from typing import List, Optional @@ -14,10 +14,6 @@ from meshtastic.stream_interface import StreamInterface logger = logging.getLogger(__name__) -if platform.system() != "Windows": - import termios - - class SerialInterface(StreamInterface): """Interface class for meshtastic devices over a serial link""" @@ -48,15 +44,7 @@ class SerialInterface(StreamInterface): logger.debug(f"Connecting to {self.devPath}") - # first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR - # see https://github.com/pyserial/pyserial/issues/124 - if platform.system() != "Windows": - with open(self.devPath, encoding="utf8") as f: - attrs = termios.tcgetattr(f) - attrs[2] = attrs[2] & ~termios.HUPCL - termios.tcsetattr(f, termios.TCSAFLUSH, attrs) - f.close() - time.sleep(0.1) + self._set_hupcl_with_termios() self.stream = serial.Serial( self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0 @@ -68,6 +56,22 @@ class SerialInterface(StreamInterface): self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes ) + def _set_hupcl_with_termios(self): + """first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR + see https://github.com/pyserial/pyserial/issues/124 + """ + if sys.platform == "win32": + return + + with open(self.devPath, encoding="utf8") as f: + import termios # pylint: disable=C0415,E0401 + attrs = termios.tcgetattr(f) + attrs[2] = attrs[2] & ~termios.HUPCL + termios.tcsetattr(f, termios.TCSAFLUSH, attrs) + f.close() + + time.sleep(0.1) + def __repr__(self): rep = f"SerialInterface(devPath={self.devPath!r}" if hasattr(self, 'debugOut') and self.debugOut is not None: diff --git a/meshtastic/slog/slog.py b/meshtastic/slog/slog.py index f91f066..e29c9d8 100644 --- a/meshtastic/slog/slog.py +++ b/meshtastic/slog/slog.py @@ -257,8 +257,9 @@ class LogSet: if not dir_name: app_dir = root_dir() - dir_name = Path(app_dir, datetime.now().strftime('%Y%m%d-%H%M%S')) - dir_name.mkdir(exist_ok=True) + app_time_dir = Path(app_dir, datetime.now().strftime('%Y%m%d-%H%M%S')) + app_time_dir.mkdir(exist_ok=True) + dir_name = str(app_time_dir) # Also make a 'latest' directory that always points to the most recent logs latest_dir = Path(app_dir, "latest") diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 2a7048c..75ec5c1 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -6,7 +6,6 @@ import os import platform import re import sys -import types from unittest.mock import mock_open, MagicMock, patch import pytest @@ -36,13 +35,6 @@ from ..tcp_interface import TCPInterface # from ..remote_hardware import onGPIOreceive # from ..config_pb2 import Config -# create a fake termios for Wwindows, otherwise errors will occur -if sys.platform == "win32": - fake_termios = types.ModuleType("termios") - fake_termios.tcgetattr = lambda fd: None - fake_termios.tcsetattr = lambda fd, when, settings: None - sys.modules["termios"] = fake_termios - @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_main_init_parser_no_args(capsys): @@ -765,12 +757,11 @@ def test_main_sendtext_with_invalid_channel_nine(caplog, capsys): @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_sendtext_with_dest(mock_findPorts, mock_serial, mocked_open, mock_get, mock_set, capsys, caplog, iface_with_nodes): +def test_main_sendtext_with_dest(mock_findPorts, mock_serial, mocked_open, mock_hupcl, capsys, caplog, iface_with_nodes): """Test --sendtext with --dest""" sys.argv = ["", "--sendtext", "hello", "--dest", "foo"] mt_config.args = sys.argv @@ -964,12 +955,11 @@ def test_main_seturl(capsys): @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --set with valid field""" sys.argv = ["", "--set", "network.wifi_ssid", "foo"] mt_config.args = sys.argv @@ -989,12 +979,10 @@ def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_get @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, capsys): """Test --set with valid field""" sys.argv = ["", "--set", "network.wifi_psk", "123456789"] mt_config.args = sys.argv @@ -1014,12 +1002,11 @@ def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, m @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --set with an invalid value (psk must be 8 or more characters)""" sys.argv = ["", "--set", "network.wifi_psk", "1234567"] mt_config.args = sys.argv @@ -1042,12 +1029,11 @@ def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open, @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --set with valid field""" sys.argv = ["", "--set", "network.wifi_ssid", "foo"] mt_config.args = sys.argv @@ -1068,12 +1054,11 @@ def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open, @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --set with invalid field""" sys.argv = ["", "--set", "foo", "foo"] mt_config.args = sys.argv @@ -1094,12 +1079,11 @@ def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, moc # TODO: write some negative --configure tests @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --configure with valid file""" sys.argv = ["", "--configure", "example_config.yaml"] mt_config.args = sys.argv @@ -1127,12 +1111,11 @@ def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_ @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) -def test_main_configure_with_camel_case_keys(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys): +def test_main_configure_with_camel_case_keys(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys): """Test --configure with valid file""" sys.argv = ["", "--configure", "exampleConfig.yaml"] mt_config.args = sys.argv @@ -2729,17 +2712,16 @@ def test_tunnel_subnet_arg_with_no_devices(mock_platform_system, caplog, capsys) assert err == "" -@pytest.mark.skipif(sys.platform == "win32", reason="Linux is forced in test and no termios") +@pytest.mark.skipif(sys.platform == "win32", reason="on windows is no fcntl module") @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") @patch("platform.system") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) def test_tunnel_tunnel_arg( - mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_platform_system, caplog, iface_with_nodes, capsys + mocked_findPorts, mocked_serial, mocked_open, mock_hupcl, mock_platform_system, caplog, iface_with_nodes, capsys ): """Test tunnel with tunnel arg (act like we are on a linux system)""" diff --git a/meshtastic/tests/test_serial_interface.py b/meshtastic/tests/test_serial_interface.py index 29683d7..b902234 100644 --- a/meshtastic/tests/test_serial_interface.py +++ b/meshtastic/tests/test_serial_interface.py @@ -13,13 +13,12 @@ from ..protobuf import config_pb2 @pytest.mark.unit @patch("time.sleep") -@patch("termios.tcsetattr") -@patch("termios.tcgetattr") +@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios") @patch("builtins.open", new_callable=mock_open, read_data="data") @patch("serial.Serial") @patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"]) def test_SerialInterface_single_port( - mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys + mocked_findPorts, mocked_serial, mocked_open, mock_hupcl, mock_sleep, capsys ): """Test that we can instantiate a SerialInterface with a single port""" iface = SerialInterface(noProto=True) @@ -29,12 +28,11 @@ def test_SerialInterface_single_port( iface.close() mocked_findPorts.assert_called() mocked_serial.assert_called() + mock_hupcl.assert_called() - # doesn't get called in SerialInterface.__init__ on windows + # doesn't get called in SerialInterface._set_hupcl_with_termios on windows if platform.system() != "Windows": mocked_open.assert_called() - mock_get.assert_called() - mock_set.assert_called() mock_sleep.assert_called() out, err = capsys.readouterr()