new Parameter --debuglib for only meshtastic debug, more termios fixes for windows tests

(cherry picked from commit 4fc4d41d3d29998bb7b697bf412be5c1449ea950)
This commit is contained in:
shukari
2025-08-19 11:06:03 +02:00
parent cbf7b9befe
commit 52eb112b95
5 changed files with 51 additions and 58 deletions

View File

@@ -1274,6 +1274,10 @@ def common():
format="%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s",
)
# set all meshtastic loggers to DEBUG
if not (args.debug or args.listen) and args.debuglib:
logging.getLogger('meshtastic').setLevel(logging.DEBUG)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
@@ -2043,6 +2047,10 @@ def initParser():
"--debug", help="Show API library debug log messages", action="store_true"
)
group.add_argument(
"--debuglib", help="Show only API library debug log messages", action="store_true"
)
group.add_argument(
"--test",
help="Run stress test against all connected Meshtastic devices",

View File

@@ -2,7 +2,7 @@
"""
# pylint: disable=R0917
import logging
import platform
import sys
import time
from typing import List, Optional
@@ -14,10 +14,6 @@ from meshtastic.stream_interface import StreamInterface
logger = logging.getLogger(__name__)
if platform.system() != "Windows":
import termios
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
@@ -48,15 +44,7 @@ class SerialInterface(StreamInterface):
logger.debug(f"Connecting to {self.devPath}")
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
# see https://github.com/pyserial/pyserial/issues/124
if platform.system() != "Windows":
with open(self.devPath, encoding="utf8") as f:
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
self._set_hupcl_with_termios()
self.stream = serial.Serial(
self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0
@@ -68,6 +56,22 @@ class SerialInterface(StreamInterface):
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes
)
def _set_hupcl_with_termios(self):
"""first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
see https://github.com/pyserial/pyserial/issues/124
"""
if sys.platform == "win32":
return
with open(self.devPath, encoding="utf8") as f:
import termios # pylint: disable=C0415,E0401
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
def __repr__(self):
rep = f"SerialInterface(devPath={self.devPath!r}"
if hasattr(self, 'debugOut') and self.debugOut is not None:

View File

@@ -257,8 +257,9 @@ class LogSet:
if not dir_name:
app_dir = root_dir()
dir_name = Path(app_dir, datetime.now().strftime('%Y%m%d-%H%M%S'))
dir_name.mkdir(exist_ok=True)
app_time_dir = Path(app_dir, datetime.now().strftime('%Y%m%d-%H%M%S'))
app_time_dir.mkdir(exist_ok=True)
dir_name = str(app_time_dir)
# Also make a 'latest' directory that always points to the most recent logs
latest_dir = Path(app_dir, "latest")

View File

@@ -6,7 +6,6 @@ import os
import platform
import re
import sys
import types
from unittest.mock import mock_open, MagicMock, patch
import pytest
@@ -36,13 +35,6 @@ from ..tcp_interface import TCPInterface
# from ..remote_hardware import onGPIOreceive
# from ..config_pb2 import Config
# create a fake termios for Wwindows, otherwise errors will occur
if sys.platform == "win32":
fake_termios = types.ModuleType("termios")
fake_termios.tcgetattr = lambda fd: None
fake_termios.tcsetattr = lambda fd, when, settings: None
sys.modules["termios"] = fake_termios
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_init_parser_no_args(capsys):
@@ -765,12 +757,11 @@ def test_main_sendtext_with_invalid_channel_nine(caplog, capsys):
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_sendtext_with_dest(mock_findPorts, mock_serial, mocked_open, mock_get, mock_set, capsys, caplog, iface_with_nodes):
def test_main_sendtext_with_dest(mock_findPorts, mock_serial, mocked_open, mock_hupcl, capsys, caplog, iface_with_nodes):
"""Test --sendtext with --dest"""
sys.argv = ["", "--sendtext", "hello", "--dest", "foo"]
mt_config.args = sys.argv
@@ -964,12 +955,11 @@ def test_main_seturl(capsys):
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --set with valid field"""
sys.argv = ["", "--set", "network.wifi_ssid", "foo"]
mt_config.args = sys.argv
@@ -989,12 +979,10 @@ def test_main_set_valid(mocked_findports, mocked_serial, mocked_open, mocked_get
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, capsys):
"""Test --set with valid field"""
sys.argv = ["", "--set", "network.wifi_psk", "123456789"]
mt_config.args = sys.argv
@@ -1014,12 +1002,11 @@ def test_main_set_valid_wifi_psk(mocked_findports, mocked_serial, mocked_open, m
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --set with an invalid value (psk must be 8 or more characters)"""
sys.argv = ["", "--set", "network.wifi_psk", "1234567"]
mt_config.args = sys.argv
@@ -1042,12 +1029,11 @@ def test_main_set_invalid_wifi_psk(mocked_findports, mocked_serial, mocked_open,
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --set with valid field"""
sys.argv = ["", "--set", "network.wifi_ssid", "foo"]
mt_config.args = sys.argv
@@ -1068,12 +1054,11 @@ def test_main_set_valid_camel_case(mocked_findports, mocked_serial, mocked_open,
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --set with invalid field"""
sys.argv = ["", "--set", "foo", "foo"]
mt_config.args = sys.argv
@@ -1094,12 +1079,11 @@ def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, moc
# TODO: write some negative --configure tests
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --configure with valid file"""
sys.argv = ["", "--configure", "example_config.yaml"]
mt_config.args = sys.argv
@@ -1127,12 +1111,11 @@ def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_main_configure_with_camel_case_keys(mocked_findports, mocked_serial, mocked_open, mocked_get, mocked_set, capsys):
def test_main_configure_with_camel_case_keys(mocked_findports, mocked_serial, mocked_open, mocked_hupcl, capsys):
"""Test --configure with valid file"""
sys.argv = ["", "--configure", "exampleConfig.yaml"]
mt_config.args = sys.argv
@@ -2729,17 +2712,16 @@ def test_tunnel_subnet_arg_with_no_devices(mock_platform_system, caplog, capsys)
assert err == ""
@pytest.mark.skipif(sys.platform == "win32", reason="Linux is forced in test and no termios")
@pytest.mark.skipif(sys.platform == "win32", reason="on windows is no fcntl module")
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
@patch("platform.system")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_tunnel_tunnel_arg(
mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_platform_system, caplog, iface_with_nodes, capsys
mocked_findPorts, mocked_serial, mocked_open, mock_hupcl, mock_platform_system, caplog, iface_with_nodes, capsys
):
"""Test tunnel with tunnel arg (act like we are on a linux system)"""

View File

@@ -13,13 +13,12 @@ from ..protobuf import config_pb2
@pytest.mark.unit
@patch("time.sleep")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("meshtastic.serial_interface.SerialInterface._set_hupcl_with_termios")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch("serial.Serial")
@patch("meshtastic.util.findPorts", return_value=["/dev/ttyUSBfake"])
def test_SerialInterface_single_port(
mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys
mocked_findPorts, mocked_serial, mocked_open, mock_hupcl, mock_sleep, capsys
):
"""Test that we can instantiate a SerialInterface with a single port"""
iface = SerialInterface(noProto=True)
@@ -29,12 +28,11 @@ def test_SerialInterface_single_port(
iface.close()
mocked_findPorts.assert_called()
mocked_serial.assert_called()
mock_hupcl.assert_called()
# doesn't get called in SerialInterface.__init__ on windows
# doesn't get called in SerialInterface._set_hupcl_with_termios on windows
if platform.system() != "Windows":
mocked_open.assert_called()
mock_get.assert_called()
mock_set.assert_called()
mock_sleep.assert_called()
out, err = capsys.readouterr()