"""Bluetooth interface """ import asyncio import atexit import logging import struct import time import io from threading import Thread from typing import List, Optional import google.protobuf from bleak import BleakClient, BleakScanner, BLEDevice from bleak.exc import BleakDBusError, BleakError from meshtastic.mesh_interface import MeshInterface from .protobuf import mesh_pb2 SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd" TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7" FROMRADIO_UUID = "2c55e69e-4993-11ed-b878-0242ac120002" FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453" LEGACY_LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2" LOGRADIO_UUID = "5a3d6e49-06e6-4423-9944-e9de8cdf9547" logger = logging.getLogger(__name__) class BLEInterface(MeshInterface): """MeshInterface using BLE to connect to devices.""" class BLEError(Exception): """An exception class for BLE errors.""" def __init__( # pylint: disable=R0917 self, address: Optional[str], noProto: bool = False, debugOut: Optional[io.TextIOWrapper]=None, noNodes: bool = False, timeout: int = 300, ) -> None: MeshInterface.__init__( self, debugOut=debugOut, noProto=noProto, noNodes=noNodes, timeout=timeout ) self.should_read = False logger.debug("Threads starting") self._want_receive = True self._receiveThread: Optional[Thread] = Thread( target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True ) self._receiveThread.start() logger.debug("Threads running") self.client: Optional[BLEClient] = None try: logger.debug(f"BLE connecting to: {address if address else 'any'}") self.client = self.connect(address) logger.debug("BLE connected") except BLEInterface.BLEError as e: self.close() raise e if self.client.has_characteristic(LEGACY_LOGRADIO_UUID): self.client.start_notify( LEGACY_LOGRADIO_UUID, self.legacy_log_radio_handler ) if self.client.has_characteristic(LOGRADIO_UUID): self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler) logger.debug("Mesh configure starting") self._startConfig() if not self.noProto: self._waitConnected(timeout=60.0) self.waitForConfig() logger.debug("Register FROMNUM notify callback") self.client.start_notify(FROMNUM_UUID, self.from_num_handler) # We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected # and future connection attempts will fail. (BlueZ kinda sucks) # Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit self._exit_handler = atexit.register(self.client.disconnect) def __repr__(self): rep = f"BLEInterface(address={self.client.address if self.client else None!r}" if self.debugOut is not None: rep += f", debugOut={self.debugOut!r}" if self.noProto: rep += ", noProto=True" if self.noNodes: rep += ", noNodes=True" rep += ")" return rep def from_num_handler(self, _, b: bytes) -> None: # pylint: disable=C0116 """Handle callbacks for fromnum notify. Note: this method does not need to be async because it is just setting a bool. """ from_num = struct.unpack(" List[BLEDevice]: """Scan for available BLE devices.""" with BLEClient() as client: logger.info("Scanning for BLE devices (takes 10 seconds)...") response = client.discover( timeout=10, return_adv=True, service_uuids=[SERVICE_UUID] ) devices = response.values() # bleak sometimes returns devices we didn't ask for, so filter the response # to only return true meshtastic devices # d[0] is the device. d[1] is the advertisement data devices = list( filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices) ) return list(map(lambda d: d[0], devices)) def find_device(self, address: Optional[str]) -> BLEDevice: """Find a device by address.""" addressed_devices = BLEInterface.scan() if address: addressed_devices = list( filter( lambda x: address in (x.name, x.address), addressed_devices, ) ) if len(addressed_devices) == 0: raise BLEInterface.BLEError( f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it." ) if len(addressed_devices) > 1: raise BLEInterface.BLEError( f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found." ) return addressed_devices[0] def _sanitize_address(self, address: Optional[str]) -> Optional[str]: # pylint: disable=E0213 "Standardize BLE address by removing extraneous characters and lowercasing." if address is None: return None else: return address.replace("-", "").replace("_", "").replace(":", "").lower() def connect(self, address: Optional[str] = None) -> "BLEClient": "Connect to a device by address." # Bleak docs recommend always doing a scan before connecting (even if we know addr) device = self.find_device(address) client = BLEClient(device.address, disconnected_callback=lambda _: self.close()) client.connect() client.discover() return client def _receiveFromRadioImpl(self) -> None: while self._want_receive: if self.should_read: self.should_read = False retries: int = 0 while self._want_receive: if self.client is None: logger.debug(f"BLE client is None, shutting down") self._want_receive = False continue try: b = bytes(self.client.read_gatt_char(FROMRADIO_UUID)) except BleakDBusError as e: # Device disconnected probably, so end our read loop immediately logger.debug(f"Device disconnected, shutting down {e}") self._want_receive = False except BleakError as e: # We were definitely disconnected if "Not connected" in str(e): logger.debug(f"Device disconnected, shutting down {e}") self._want_receive = False else: raise BLEInterface.BLEError("Error reading BLE") from e if not b: if retries < 5: time.sleep(0.1) retries += 1 continue break logger.debug(f"FROMRADIO read: {b.hex()}") self._handleFromRadio(b) else: time.sleep(0.01) def _sendToRadioImpl(self, toRadio) -> None: b: bytes = toRadio.SerializeToString() if b and self.client: # we silently ignore writes while we are shutting down logger.debug(f"TORADIO write: {b.hex()}") try: self.client.write_gatt_char( TORADIO_UUID, b, response=True ) # FIXME: or False? # search Bleak src for org.bluez.Error.InProgress except Exception as e: raise BLEInterface.BLEError( "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)" ) from e # Allow to propagate and then make sure we read time.sleep(0.01) self.should_read = True def close(self) -> None: try: MeshInterface.close(self) except Exception as e: logger.error(f"Error closing mesh interface: {e}") if self._want_receive: self._want_receive = False # Tell the thread we want it to stop if self._receiveThread: self._receiveThread.join( timeout=2 ) # If bleak is hung, don't wait for the thread to exit (it is critical we disconnect) self._receiveThread = None if self.client: atexit.unregister(self._exit_handler) self.client.disconnect() self.client.close() self.client = None self._disconnected() # send the disconnected indicator up to clients class BLEClient: """Client for managing connection to a BLE device""" def __init__(self, address=None, **kwargs) -> None: self._eventLoop = asyncio.new_event_loop() self._eventThread = Thread( target=self._run_event_loop, name="BLEClient", daemon=True ) self._eventThread.start() if not address: logger.debug("No address provided - only discover method will work.") return self.bleak_client = BleakClient(address, **kwargs) def discover(self, **kwargs): # pylint: disable=C0116 return self.async_await(BleakScanner.discover(**kwargs)) def pair(self, **kwargs): # pylint: disable=C0116 return self.async_await(self.bleak_client.pair(**kwargs)) def connect(self, **kwargs): # pylint: disable=C0116 return self.async_await(self.bleak_client.connect(**kwargs)) def disconnect(self, **kwargs): # pylint: disable=C0116 self.async_await(self.bleak_client.disconnect(**kwargs)) def read_gatt_char(self, *args, **kwargs): # pylint: disable=C0116 return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs)) def write_gatt_char(self, *args, **kwargs): # pylint: disable=C0116 self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs)) def has_characteristic(self, specifier): """Check if the connected node supports a specified characteristic.""" return bool(self.bleak_client.services.get_characteristic(specifier)) def start_notify(self, *args, **kwargs): # pylint: disable=C0116 self.async_await(self.bleak_client.start_notify(*args, **kwargs)) def close(self): # pylint: disable=C0116 self.async_run(self._stop_event_loop()) self._eventThread.join() def __enter__(self): return self def __exit__(self, _type, _value, _traceback): self.close() def async_await(self, coro, timeout=None): # pylint: disable=C0116 return self.async_run(coro).result(timeout) def async_run(self, coro): # pylint: disable=C0116 return asyncio.run_coroutine_threadsafe(coro, self._eventLoop) def _run_event_loop(self): try: self._eventLoop.run_forever() finally: self._eventLoop.close() async def _stop_event_loop(self): self._eventLoop.stop()