mirror of
https://github.com/meshtastic/python.git
synced 2026-06-02 12:45:00 -04:00
@@ -1414,13 +1414,59 @@ def common():
|
||||
print(f"Found: name='{x.name}' address='{x.address}'")
|
||||
meshtastic.util.our_exit("BLE scan finished", 0)
|
||||
elif args.ble:
|
||||
client = BLEInterface(
|
||||
args.ble if args.ble != "any" else None,
|
||||
debugOut=logfile,
|
||||
noProto=args.noproto,
|
||||
noNodes=args.no_nodes,
|
||||
timeout=args.timeout,
|
||||
)
|
||||
try:
|
||||
client = BLEInterface(
|
||||
args.ble if args.ble != "any" else None,
|
||||
debugOut=logfile,
|
||||
noProto=args.noproto,
|
||||
noNodes=args.no_nodes,
|
||||
timeout=args.timeout,
|
||||
)
|
||||
except BLEInterface.BLEError as e:
|
||||
if e.kind == BLEInterface.BLEError.DEVICE_NOT_FOUND:
|
||||
meshtastic.util.our_exit(
|
||||
"BLE device not found.\n\n"
|
||||
"Possible causes:\n"
|
||||
" - Bluetooth is disabled on the Meshtastic device\n"
|
||||
" - Device is in deep sleep mode\n"
|
||||
" - Device is out of range\n\n"
|
||||
"Try:\n"
|
||||
" - Press the reset button on your device\n"
|
||||
" - Run 'meshtastic --ble-scan' to see available devices",
|
||||
1,
|
||||
)
|
||||
elif e.kind == BLEInterface.BLEError.MULTIPLE_DEVICES:
|
||||
meshtastic.util.our_exit(
|
||||
"Multiple Meshtastic BLE devices found.\n\n"
|
||||
"Please specify which device to connect to:\n"
|
||||
" - Run 'meshtastic --ble-scan' to list devices\n"
|
||||
" - Use 'meshtastic --ble <name_or_address>' to connect",
|
||||
1,
|
||||
)
|
||||
elif e.kind == BLEInterface.BLEError.WRITE_ERROR:
|
||||
meshtastic.util.our_exit(
|
||||
"Failed to write to BLE device.\n\n"
|
||||
"Possible causes:\n"
|
||||
" - Device requires pairing PIN (check device screen)\n"
|
||||
" - On Linux: user not in 'bluetooth' group\n"
|
||||
" - Connection was interrupted\n\n"
|
||||
"Try:\n"
|
||||
" - Restart Bluetooth on your computer\n"
|
||||
" - Reset the Meshtastic device",
|
||||
1,
|
||||
)
|
||||
elif e.kind == BLEInterface.BLEError.READ_ERROR:
|
||||
meshtastic.util.our_exit(
|
||||
"Failed to read from BLE device.\n\n"
|
||||
"The device may have disconnected unexpectedly.\n\n"
|
||||
"Try:\n"
|
||||
" - Move closer to the device\n"
|
||||
" - Reset the Meshtastic device\n"
|
||||
" - Restart Bluetooth on your computer",
|
||||
1,
|
||||
)
|
||||
else:
|
||||
meshtastic.util.our_exit(f"BLE error: {e}", 1)
|
||||
elif args.host:
|
||||
try:
|
||||
if ":" in args.host:
|
||||
@@ -1476,6 +1522,23 @@ def common():
|
||||
message += " Please close any applications or webpages that may be using the device and try again.\n"
|
||||
message += f"\nOriginal error: {ex}"
|
||||
meshtastic.util.our_exit(message)
|
||||
except MeshInterface.MeshInterfaceError as ex:
|
||||
msg = str(ex)
|
||||
if "Timed out" in msg:
|
||||
meshtastic.util.our_exit(
|
||||
"Connection timed out.\n\n"
|
||||
"Possible causes:\n"
|
||||
" - Device is rebooting\n"
|
||||
" - Device firmware is updating\n"
|
||||
" - Serial connection was interrupted\n\n"
|
||||
"Try:\n"
|
||||
" - Wait a few seconds and try again\n"
|
||||
" - Check if device is fully booted (LED patterns)\n"
|
||||
" - Reconnect the USB cable",
|
||||
1,
|
||||
)
|
||||
else:
|
||||
meshtastic.util.our_exit(f"Connection error: {ex}", 1)
|
||||
if client.devPath is None:
|
||||
try:
|
||||
client = meshtastic.tcp_interface.TCPInterface(
|
||||
|
||||
@@ -33,6 +33,16 @@ class BLEInterface(MeshInterface):
|
||||
class BLEError(Exception):
|
||||
"""An exception class for BLE errors."""
|
||||
|
||||
DEVICE_NOT_FOUND = "device_not_found"
|
||||
MULTIPLE_DEVICES = "multiple_devices"
|
||||
READ_ERROR = "read_error"
|
||||
WRITE_ERROR = "write_error"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
def __init__(self, message: str, kind: str = UNKNOWN):
|
||||
super().__init__(message)
|
||||
self.kind = kind
|
||||
|
||||
def __init__( # pylint: disable=R0917
|
||||
self,
|
||||
address: Optional[str],
|
||||
@@ -157,11 +167,13 @@ class BLEInterface(MeshInterface):
|
||||
|
||||
if len(addressed_devices) == 0:
|
||||
raise BLEInterface.BLEError(
|
||||
f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
|
||||
f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.",
|
||||
BLEInterface.BLEError.DEVICE_NOT_FOUND,
|
||||
)
|
||||
if len(addressed_devices) > 1:
|
||||
raise BLEInterface.BLEError(
|
||||
f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
|
||||
f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.",
|
||||
BLEInterface.BLEError.MULTIPLE_DEVICES,
|
||||
)
|
||||
return addressed_devices[0]
|
||||
|
||||
@@ -204,7 +216,10 @@ class BLEInterface(MeshInterface):
|
||||
logger.debug(f"Device disconnected, shutting down {e}")
|
||||
self._want_receive = False
|
||||
else:
|
||||
raise BLEInterface.BLEError("Error reading BLE") from e
|
||||
raise BLEInterface.BLEError(
|
||||
"Error reading BLE",
|
||||
BLEInterface.BLEError.READ_ERROR,
|
||||
) from e
|
||||
if not b:
|
||||
if retries < 5:
|
||||
time.sleep(0.1)
|
||||
@@ -227,7 +242,8 @@ class BLEInterface(MeshInterface):
|
||||
# search Bleak src for org.bluez.Error.InProgress
|
||||
except Exception as e:
|
||||
raise BLEInterface.BLEError(
|
||||
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
|
||||
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)",
|
||||
BLEInterface.BLEError.WRITE_ERROR,
|
||||
) from e
|
||||
# Allow to propagate and then make sure we read
|
||||
time.sleep(0.01)
|
||||
|
||||
69
meshtastic/tests/test_ble_interface.py
Normal file
69
meshtastic/tests/test_ble_interface.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Meshtastic unit tests for ble_interface.py"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from ..ble_interface import BLEInterface
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ble_error_default_kind_unknown():
|
||||
"""BLEError defaults to UNKNOWN kind."""
|
||||
error = BLEInterface.BLEError("test")
|
||||
assert error.kind == BLEInterface.BLEError.UNKNOWN
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ble_find_device_not_found_sets_kind():
|
||||
"""find_device emits DEVICE_NOT_FOUND for no scan results."""
|
||||
iface = object.__new__(BLEInterface)
|
||||
with patch("meshtastic.ble_interface.BLEInterface.scan", return_value=[]):
|
||||
with pytest.raises(BLEInterface.BLEError) as excinfo:
|
||||
iface.find_device("missing")
|
||||
assert excinfo.value.kind == BLEInterface.BLEError.DEVICE_NOT_FOUND
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ble_find_device_multiple_sets_kind():
|
||||
"""find_device emits MULTIPLE_DEVICES for ambiguous matches."""
|
||||
iface = object.__new__(BLEInterface)
|
||||
first = MagicMock()
|
||||
first.name = "dup"
|
||||
first.address = "AA:AA:AA:AA:AA:01"
|
||||
second = MagicMock()
|
||||
second.name = "dup"
|
||||
second.address = "AA:AA:AA:AA:AA:02"
|
||||
with patch(
|
||||
"meshtastic.ble_interface.BLEInterface.scan", return_value=[first, second]
|
||||
):
|
||||
with pytest.raises(BLEInterface.BLEError) as excinfo:
|
||||
iface.find_device("dup")
|
||||
assert excinfo.value.kind == BLEInterface.BLEError.MULTIPLE_DEVICES
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ble_send_to_radio_wraps_write_errors_with_kind():
|
||||
"""_sendToRadioImpl wraps write failures with WRITE_ERROR."""
|
||||
iface = object.__new__(BLEInterface)
|
||||
iface.client = MagicMock()
|
||||
iface.client.write_gatt_char.side_effect = RuntimeError("boom")
|
||||
to_radio = MagicMock()
|
||||
to_radio.SerializeToString.return_value = b"\x01"
|
||||
with pytest.raises(BLEInterface.BLEError) as excinfo:
|
||||
iface._sendToRadioImpl(to_radio)
|
||||
assert excinfo.value.kind == BLEInterface.BLEError.WRITE_ERROR
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ble_receive_wraps_unexpected_bleak_error_with_kind():
|
||||
"""_receiveFromRadioImpl wraps unexpected BleakError with READ_ERROR."""
|
||||
iface = object.__new__(BLEInterface)
|
||||
iface.should_read = True
|
||||
iface._want_receive = True
|
||||
iface.client = MagicMock()
|
||||
iface.client.read_gatt_char.side_effect = BleakError("some other BLE failure")
|
||||
with pytest.raises(BLEInterface.BLEError) as excinfo:
|
||||
iface._receiveFromRadioImpl()
|
||||
assert excinfo.value.kind == BLEInterface.BLEError.READ_ERROR
|
||||
@@ -11,6 +11,7 @@ from types import SimpleNamespace
|
||||
from unittest.mock import mock_open, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import meshtastic.__main__ as mt_main
|
||||
|
||||
from meshtastic.__main__ import (
|
||||
export_config,
|
||||
@@ -27,6 +28,7 @@ from meshtastic import mt_config
|
||||
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
|
||||
|
||||
# from ..ble_interface import BLEInterface
|
||||
from ..mesh_interface import MeshInterface
|
||||
from ..node import Node
|
||||
|
||||
# from ..radioconfig_pb2 import UserPreferences
|
||||
@@ -261,6 +263,113 @@ def test_main_info_with_permission_error(patched_getlogin, capsys, caplog):
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_ble_device_not_found_message(capsys):
|
||||
"""Test BLE device-not-found help text."""
|
||||
sys.argv = ["", "--info", "--ble", "any"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init:
|
||||
mock_ble_init.side_effect = mt_main.BLEInterface.BLEError(
|
||||
"missing",
|
||||
mt_main.BLEInterface.BLEError.DEVICE_NOT_FOUND,
|
||||
)
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
main()
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert excinfo.value.code == 1
|
||||
assert re.search(r"BLE device not found", out, re.MULTILINE)
|
||||
assert re.search(r"--ble-scan", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_ble_multiple_devices_message(capsys):
|
||||
"""Test BLE multiple-devices help text."""
|
||||
sys.argv = ["", "--info", "--ble", "any"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init:
|
||||
mock_ble_init.side_effect = mt_main.BLEInterface.BLEError(
|
||||
"multiple",
|
||||
mt_main.BLEInterface.BLEError.MULTIPLE_DEVICES,
|
||||
)
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
main()
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert excinfo.value.code == 1
|
||||
assert re.search(r"Multiple Meshtastic BLE devices found", out, re.MULTILINE)
|
||||
assert re.search(r"meshtastic --ble <name_or_address>", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_ble_write_error_message(capsys):
|
||||
"""Test BLE write-error help text."""
|
||||
sys.argv = ["", "--info", "--ble", "any"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init:
|
||||
mock_ble_init.side_effect = mt_main.BLEInterface.BLEError(
|
||||
"write fail",
|
||||
mt_main.BLEInterface.BLEError.WRITE_ERROR,
|
||||
)
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
main()
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert excinfo.value.code == 1
|
||||
assert re.search(r"Failed to write to BLE device", out, re.MULTILINE)
|
||||
assert re.search(r"user not in 'bluetooth' group", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_ble_read_error_message(capsys):
|
||||
"""Test BLE read-error help text."""
|
||||
sys.argv = ["", "--info", "--ble", "any"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with patch("meshtastic.__main__.BLEInterface.__init__") as mock_ble_init:
|
||||
mock_ble_init.side_effect = mt_main.BLEInterface.BLEError(
|
||||
"read fail",
|
||||
mt_main.BLEInterface.BLEError.READ_ERROR,
|
||||
)
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
main()
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert excinfo.value.code == 1
|
||||
assert re.search(r"Failed to read from BLE device", out, re.MULTILINE)
|
||||
assert re.search(r"Move closer to the device", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_serial_timeout_message(capsys):
|
||||
"""Test serial timeout help text."""
|
||||
sys.argv = ["", "--info"]
|
||||
mt_config.args = sys.argv
|
||||
|
||||
with patch("meshtastic.serial_interface.SerialInterface") as mock_serial:
|
||||
mock_serial.side_effect = MeshInterface.MeshInterfaceError("Timed out waiting")
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
main()
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
assert excinfo.value.code == 1
|
||||
assert re.search(r"Connection timed out", out, re.MULTILINE)
|
||||
assert re.search(r"Device is rebooting", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_info_with_tcp_interface(capsys):
|
||||
|
||||
Reference in New Issue
Block a user