diff --git a/.vscode/launch.json b/.vscode/launch.json index aca86dd..911eb4d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -76,6 +76,14 @@ "justMyCode": true, "args": ["--debug", "--info"] }, + { + "name": "meshtastic debug BLE", + "type": "python", + "request": "launch", + "module": "meshtastic", + "justMyCode": true, + "args": ["--debug", "--ble", "--info"] + }, { "name": "meshtastic debug set region", "type": "python", diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index fa71adc..fb12a0c 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1050,7 +1050,7 @@ def common(): meshtastic.util.our_exit("BLE scan finished", 0) return elif args.ble: - client = BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes) + client = BLEInterface(args.ble_dest, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes) elif args.host: try: client = meshtastic.tcp_interface.TCPInterface( @@ -1119,6 +1119,12 @@ def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParse group.add_argument( "--ble", help="The BLE device address or name to connect to", + action="store_true", + ) + + group.add_argument( + "--ble-dest", + help="The BLE device address or name to connect to", default=None, ) diff --git a/meshtastic/ble_interface.py b/meshtastic/ble_interface.py index 1c12758..4b8ddeb 100644 --- a/meshtastic/ble_interface.py +++ b/meshtastic/ble_interface.py @@ -7,7 +7,7 @@ import asyncio from threading import Thread, Event from typing import Optional -from bleak import BleakScanner, BleakClient +from bleak import BleakScanner, BleakClient, BLEDevice from meshtastic.mesh_interface import MeshInterface from meshtastic.util import our_exit @@ -35,9 +35,6 @@ class BLEInterface(MeshInterface): def __init__(self, address: Optional[str], noProto: bool = False, debugOut = None, noNodes: bool = False): self.state = BLEInterface.BLEState() - if not address: - return - self.should_read = False logging.debug("Threads starting") @@ -54,10 +51,9 @@ class BLEInterface(MeshInterface): self.client = self.connect(address) self.state.BLE = True logging.debug("BLE connected") - except BLEInterface.BLEError as e: + # except BLEInterface.BLEError as e: + finally: self.close() - our_exit(e.message, 1) - return logging.debug("Mesh init starting") MeshInterface.__init__(self, debugOut = debugOut, noProto = noProto, noNodes = noNodes) @@ -78,33 +74,35 @@ class BLEInterface(MeshInterface): self.should_read = True - def scan(self): - "Scan for available BLE devices" + def scan(self) -> list[BLEDevice]: + """Scan for available BLE devices.""" with BLEClient() as client: - return [ - (x[0], x[1]) for x in (client.discover( + response = client.discover( return_adv = True, - service_uuids = [ SERVICE_UUID ] - )).values() - ] + service_uuids=[SERVICE_UUID] + ) + + devices = response.values() + + # bleak sometimes returns devices we didn't ask for, so filter the response + # to only return true meshtastic devices + # d[0] is the device. d[1] is the advertisement data + devices = list(filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices)) + return list(map(lambda d: d[0], devices)) - def find_device(self, address): - "Find a device by address" - meshtastic_devices = self.scan() + def find_device(self, address: Optional[str]) -> BLEDevice: + """Find a device by address""" + addressed_devices = self.scan() - addressed_devices = list(filter(lambda x: address in (x[1].local_name, x[0].name), meshtastic_devices)) - # If nothing is found try on the address - if len(addressed_devices) == 0: - addressed_devices = list(filter( - lambda x: BLEInterface._sanitize_address(address) == BLEInterface._sanitize_address(x[0].address), - meshtastic_devices)) + if address: + addressed_devices = list(filter(lambda x: address == x.name or address == x.address, addressed_devices)) if len(addressed_devices) == 0: raise BLEInterface.BLEError(f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.") if len(addressed_devices) > 1: raise BLEInterface.BLEError(f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.") - return addressed_devices[0][0] + return addressed_devices[0] def _sanitize_address(address): # pylint: disable=E0213 "Standardize BLE address by removing extraneous characters and lowercasing"