diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 89390e6..8b866de 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1414,13 +1414,59 @@ def common(): print(f"Found: name='{x.name}' address='{x.address}'") meshtastic.util.our_exit("BLE scan finished", 0) elif args.ble: - client = BLEInterface( - args.ble if args.ble != "any" else None, - debugOut=logfile, - noProto=args.noproto, - noNodes=args.no_nodes, - timeout=args.timeout, - ) + try: + client = BLEInterface( + args.ble if args.ble != "any" else None, + debugOut=logfile, + noProto=args.noproto, + noNodes=args.no_nodes, + timeout=args.timeout, + ) + except BLEInterface.BLEError as e: + if e.kind == BLEInterface.BLEError.DEVICE_NOT_FOUND: + meshtastic.util.our_exit( + "BLE device not found.\n\n" + "Possible causes:\n" + " - Bluetooth is disabled on the Meshtastic device\n" + " - Device is in deep sleep mode\n" + " - Device is out of range\n\n" + "Try:\n" + " - Press the reset button on your device\n" + " - Run 'meshtastic --ble-scan' to see available devices", + 1, + ) + elif e.kind == BLEInterface.BLEError.MULTIPLE_DEVICES: + meshtastic.util.our_exit( + "Multiple Meshtastic BLE devices found.\n\n" + "Please specify which device to connect to:\n" + " - Run 'meshtastic --ble-scan' to list devices\n" + " - Use 'meshtastic --ble ' to connect", + 1, + ) + elif e.kind == BLEInterface.BLEError.WRITE_ERROR: + meshtastic.util.our_exit( + "Failed to write to BLE device.\n\n" + "Possible causes:\n" + " - Device requires pairing PIN (check device screen)\n" + " - On Linux: user not in 'bluetooth' group\n" + " - Connection was interrupted\n\n" + "Try:\n" + " - Restart Bluetooth on your computer\n" + " - Reset the Meshtastic device", + 1, + ) + elif e.kind == BLEInterface.BLEError.READ_ERROR: + meshtastic.util.our_exit( + "Failed to read from BLE device.\n\n" + "The device may have disconnected unexpectedly.\n\n" + "Try:\n" + " - Move closer to the device\n" + " - Reset the Meshtastic device\n" + " - Restart Bluetooth on your computer", + 1, + ) + else: + meshtastic.util.our_exit(f"BLE error: {e}", 1) elif args.host: try: if ":" in args.host: @@ -1476,6 +1522,23 @@ def common(): message += " Please close any applications or webpages that may be using the device and try again.\n" message += f"\nOriginal error: {ex}" meshtastic.util.our_exit(message) + except MeshInterface.MeshInterfaceError as ex: + msg = str(ex) + if "Timed out" in msg: + meshtastic.util.our_exit( + "Connection timed out.\n\n" + "Possible causes:\n" + " - Device is rebooting\n" + " - Device firmware is updating\n" + " - Serial connection was interrupted\n\n" + "Try:\n" + " - Wait a few seconds and try again\n" + " - Check if device is fully booted (LED patterns)\n" + " - Reconnect the USB cable", + 1, + ) + else: + meshtastic.util.our_exit(f"Connection error: {ex}", 1) if client.devPath is None: try: client = meshtastic.tcp_interface.TCPInterface( diff --git a/meshtastic/ble_interface.py b/meshtastic/ble_interface.py index 0237672..64a6361 100644 --- a/meshtastic/ble_interface.py +++ b/meshtastic/ble_interface.py @@ -33,6 +33,16 @@ class BLEInterface(MeshInterface): class BLEError(Exception): """An exception class for BLE errors.""" + DEVICE_NOT_FOUND = "device_not_found" + MULTIPLE_DEVICES = "multiple_devices" + READ_ERROR = "read_error" + WRITE_ERROR = "write_error" + UNKNOWN = "unknown" + + def __init__(self, message: str, kind: str = UNKNOWN): + super().__init__(message) + self.kind = kind + def __init__( # pylint: disable=R0917 self, address: Optional[str], @@ -157,11 +167,13 @@ class BLEInterface(MeshInterface): if len(addressed_devices) == 0: raise BLEInterface.BLEError( - f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it." + f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.", + BLEInterface.BLEError.DEVICE_NOT_FOUND, ) if len(addressed_devices) > 1: raise BLEInterface.BLEError( - f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found." + f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.", + BLEInterface.BLEError.MULTIPLE_DEVICES, ) return addressed_devices[0] @@ -204,7 +216,10 @@ class BLEInterface(MeshInterface): logger.debug(f"Device disconnected, shutting down {e}") self._want_receive = False else: - raise BLEInterface.BLEError("Error reading BLE") from e + raise BLEInterface.BLEError( + "Error reading BLE", + BLEInterface.BLEError.READ_ERROR, + ) from e if not b: if retries < 5: time.sleep(0.1) @@ -227,7 +242,8 @@ class BLEInterface(MeshInterface): # search Bleak src for org.bluez.Error.InProgress except Exception as e: raise BLEInterface.BLEError( - "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)" + "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)", + BLEInterface.BLEError.WRITE_ERROR, ) from e # Allow to propagate and then make sure we read time.sleep(0.01) diff --git a/meshtastic/tests/test_ble_interface.py b/meshtastic/tests/test_ble_interface.py new file mode 100644 index 0000000..0b725e6 --- /dev/null +++ b/meshtastic/tests/test_ble_interface.py @@ -0,0 +1,69 @@ +"""Meshtastic unit tests for ble_interface.py""" + +from unittest.mock import MagicMock, patch + +import pytest +from bleak.exc import BleakError + +from ..ble_interface import BLEInterface + + +@pytest.mark.unit +def test_ble_error_default_kind_unknown(): + """BLEError defaults to UNKNOWN kind.""" + error = BLEInterface.BLEError("test") + assert error.kind == BLEInterface.BLEError.UNKNOWN + + +@pytest.mark.unit +def test_ble_find_device_not_found_sets_kind(): + """find_device emits DEVICE_NOT_FOUND for no scan results.""" + iface = object.__new__(BLEInterface) + with patch("meshtastic.ble_interface.BLEInterface.scan", return_value=[]): + with pytest.raises(BLEInterface.BLEError) as excinfo: + iface.find_device("missing") + assert excinfo.value.kind == BLEInterface.BLEError.DEVICE_NOT_FOUND + + +@pytest.mark.unit +def test_ble_find_device_multiple_sets_kind(): + """find_device emits MULTIPLE_DEVICES for ambiguous matches.""" + iface = object.__new__(BLEInterface) + first = MagicMock() + first.name = "dup" + first.address = "AA:AA:AA:AA:AA:01" + second = MagicMock() + second.name = "dup" + second.address = "AA:AA:AA:AA:AA:02" + with patch( + "meshtastic.ble_interface.BLEInterface.scan", return_value=[first, second] + ): + with pytest.raises(BLEInterface.BLEError) as excinfo: + iface.find_device("dup") + assert excinfo.value.kind == BLEInterface.BLEError.MULTIPLE_DEVICES + + +@pytest.mark.unit +def test_ble_send_to_radio_wraps_write_errors_with_kind(): + """_sendToRadioImpl wraps write failures with WRITE_ERROR.""" + iface = object.__new__(BLEInterface) + iface.client = MagicMock() + iface.client.write_gatt_char.side_effect = RuntimeError("boom") + to_radio = MagicMock() + to_radio.SerializeToString.return_value = b"\x01" + with pytest.raises(BLEInterface.BLEError) as excinfo: + iface._sendToRadioImpl(to_radio) + assert excinfo.value.kind == BLEInterface.BLEError.WRITE_ERROR + + +@pytest.mark.unit +def test_ble_receive_wraps_unexpected_bleak_error_with_kind(): + """_receiveFromRadioImpl wraps unexpected BleakError with READ_ERROR.""" + iface = object.__new__(BLEInterface) + iface.should_read = True + iface._want_receive = True + iface.client = MagicMock() + iface.client.read_gatt_char.side_effect = BleakError("some other BLE failure") + with pytest.raises(BLEInterface.BLEError) as excinfo: + iface._receiveFromRadioImpl() + assert excinfo.value.kind == BLEInterface.BLEError.READ_ERROR diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index b664665..60f3520 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -11,6 +11,7 @@ from types import SimpleNamespace from unittest.mock import mock_open, MagicMock, patch import pytest +import meshtastic.__main__ as mt_main from meshtastic.__main__ import ( export_config, @@ -27,6 +28,7 @@ from meshtastic import mt_config from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 # from ..ble_interface import BLEInterface +from ..mesh_interface import MeshInterface from ..node import Node # from ..radioconfig_pb2 import UserPreferences @@ -261,6 +263,113 @@ def test_main_info_with_permission_error(patched_getlogin, capsys, caplog): assert err == "" +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_ble_device_not_found_message(capsys): + """Test BLE device-not-found help text.""" + sys.argv = ["", "--info", "--ble", "any"] + mt_config.args = sys.argv + + with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init: + mock_ble_init.side_effect = mt_main.BLEInterface.BLEError( + "missing", + mt_main.BLEInterface.BLEError.DEVICE_NOT_FOUND, + ) + with pytest.raises(SystemExit) as excinfo: + main() + + out, err = capsys.readouterr() + assert excinfo.value.code == 1 + assert re.search(r"BLE device not found", out, re.MULTILINE) + assert re.search(r"--ble-scan", out, re.MULTILINE) + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_ble_multiple_devices_message(capsys): + """Test BLE multiple-devices help text.""" + sys.argv = ["", "--info", "--ble", "any"] + mt_config.args = sys.argv + + with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init: + mock_ble_init.side_effect = mt_main.BLEInterface.BLEError( + "multiple", + mt_main.BLEInterface.BLEError.MULTIPLE_DEVICES, + ) + with pytest.raises(SystemExit) as excinfo: + main() + + out, err = capsys.readouterr() + assert excinfo.value.code == 1 + assert re.search(r"Multiple Meshtastic BLE devices found", out, re.MULTILINE) + assert re.search(r"meshtastic --ble ", out, re.MULTILINE) + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_ble_write_error_message(capsys): + """Test BLE write-error help text.""" + sys.argv = ["", "--info", "--ble", "any"] + mt_config.args = sys.argv + + with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init: + mock_ble_init.side_effect = mt_main.BLEInterface.BLEError( + "write fail", + mt_main.BLEInterface.BLEError.WRITE_ERROR, + ) + with pytest.raises(SystemExit) as excinfo: + main() + + out, err = capsys.readouterr() + assert excinfo.value.code == 1 + assert re.search(r"Failed to write to BLE device", out, re.MULTILINE) + assert re.search(r"user not in 'bluetooth' group", out, re.MULTILINE) + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_ble_read_error_message(capsys): + """Test BLE read-error help text.""" + sys.argv = ["", "--info", "--ble", "any"] + mt_config.args = sys.argv + + with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init: + mock_ble_init.side_effect = mt_main.BLEInterface.BLEError( + "read fail", + mt_main.BLEInterface.BLEError.READ_ERROR, + ) + with pytest.raises(SystemExit) as excinfo: + main() + + out, err = capsys.readouterr() + assert excinfo.value.code == 1 + assert re.search(r"Failed to read from BLE device", out, re.MULTILINE) + assert re.search(r"Move closer to the device", out, re.MULTILINE) + assert err == "" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_main_serial_timeout_message(capsys): + """Test serial timeout help text.""" + sys.argv = ["", "--info"] + mt_config.args = sys.argv + + with patch("meshtastic.serial_interface.SerialInterface") as mock_serial: + mock_serial.side_effect = MeshInterface.MeshInterfaceError("Timed out waiting") + with pytest.raises(SystemExit) as excinfo: + main() + + out, err = capsys.readouterr() + assert excinfo.value.code == 1 + assert re.search(r"Connection timed out", out, re.MULTILINE) + assert re.search(r"Device is rebooting", out, re.MULTILINE) + assert err == "" + + @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_main_info_with_tcp_interface(capsys):