Files
python/meshtastic/ble_interface.py
Aleksei Sviridkin 9b9df9e585 fix(ble): resolve BLE hangs on macOS without --debug flag
This fixes two issues that caused BLE connections to hang on macOS
when not using the --debug flag:

1. Race condition in BLEClient event loop initialization
   - The event loop thread was started but asyncio operations were
     submitted before the loop was actually running
   - Added threading.Event synchronization to ensure the event loop
     is running before any operations are submitted
   - The ready signal is sent from within the loop via call_soon()
     to guarantee the loop is truly active

2. CoreBluetooth callback delivery on macOS
   - On macOS, CoreBluetooth requires occasional I/O operations for
     callbacks to be properly delivered to the main thread
   - Without --debug, no I/O was happening, causing callbacks to
     never be processed and operations to hang indefinitely
   - Added sys.stdout.flush() call before waiting for async results
     to trigger the necessary I/O

The --debug flag masked these issues because:
- Debug logging introduces timing delays that let the event loop start
- Logger I/O triggers the necessary callback delivery mechanism

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: Aleksei Sviridkin <f@lex.la>
2026-01-25 03:06:58 +03:00

340 lines
13 KiB
Python

"""Bluetooth interface
"""
import asyncio
import atexit
import logging
import struct
import sys
import time
import io
from threading import Thread, Event
from typing import List, Optional
import google.protobuf
from bleak import BleakClient, BleakScanner, BLEDevice
from bleak.exc import BleakDBusError, BleakError
from meshtastic.mesh_interface import MeshInterface
from .protobuf import mesh_pb2
# Service UUID passed to the scanner and used to filter scan results down to
# Meshtastic peripherals.
SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"
# Characteristic that serialized ToRadio messages are written to.
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
# Characteristic that is read to fetch bytes handed to _handleFromRadio().
FROMRADIO_UUID = "2c55e69e-4993-11ed-b878-0242ac120002"
# Notify characteristic whose payload is unpacked as a little-endian uint32;
# a notification tells the receive thread to read FROMRADIO_UUID again.
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
# Notify characteristic whose payload is decoded as UTF-8 log text.
LEGACY_LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2"
# Notify characteristic whose payload is parsed as a mesh_pb2.LogRecord.
LOGRADIO_UUID = "5a3d6e49-06e6-4423-9944-e9de8cdf9547"
# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
class BLEInterface(MeshInterface):
    """MeshInterface using BLE to connect to devices.

    A background thread ("BLEReceive") polls the FROMRADIO characteristic
    whenever `should_read` is set, and every BLE operation is delegated to a
    `BLEClient`, which runs bleak on its own event-loop thread.
    """

    class BLEError(Exception):
        """An exception class for BLE errors."""

    def __init__(  # pylint: disable=R0917
        self,
        address: Optional[str],
        noProto: bool = False,
        debugOut: Optional[io.TextIOWrapper] = None,
        noNodes: bool = False,
        timeout: int = 300,
    ) -> None:
        """Scan for a device, connect to it and start the mesh configuration.

        Args:
            address: Name or address of the peripheral. A falsy value accepts
                whichever single Meshtastic device the scan finds.
            noProto: Forwarded to MeshInterface; when true the constructor
                does not wait for the connection/config handshake.
            debugOut: Forwarded to MeshInterface.
            noNodes: Forwarded to MeshInterface.
            timeout: Forwarded to MeshInterface.

        Raises:
            BLEInterface.BLEError: If no matching device (or more than one)
                is found. The interface is closed before re-raising.
        """
        MeshInterface.__init__(
            self, debugOut=debugOut, noProto=noProto, noNodes=noNodes, timeout=timeout
        )

        self.should_read = False
        # Defined before the receive thread starts so neither that thread nor
        # an early close() can ever observe a missing attribute.
        self.client: Optional[BLEClient] = None
        self._exit_handler = None

        logger.debug("Threads starting")
        self._want_receive = True
        self._receiveThread: Optional[Thread] = Thread(
            target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True
        )
        self._receiveThread.start()
        logger.debug("Threads running")

        try:
            logger.debug(f"BLE connecting to: {address if address else 'any'}")
            self.client = self.connect(address)
            logger.debug("BLE connected")
        except BLEInterface.BLEError:
            self.close()
            raise  # bare raise keeps the original traceback intact

        # Subscribe to whichever log characteristics this firmware exposes.
        if self.client.has_characteristic(LEGACY_LOGRADIO_UUID):
            self.client.start_notify(
                LEGACY_LOGRADIO_UUID, self.legacy_log_radio_handler
            )

        if self.client.has_characteristic(LOGRADIO_UUID):
            self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)

        logger.debug("Mesh configure starting")
        self._startConfig()
        if not self.noProto:
            self._waitConnected(timeout=60.0)
            self.waitForConfig()

        logger.debug("Register FROMNUM notify callback")
        self.client.start_notify(FROMNUM_UUID, self.from_num_handler)

        # We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected
        # and future connection attempts will fail. (BlueZ kinda sucks)
        # Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
        self._exit_handler = atexit.register(self.client.disconnect)

    def __repr__(self):
        """Return a constructor-like description of this interface."""
        # BLEClient itself has no `address`; the address lives on the wrapped
        # bleak client, which is absent when we are not connected.
        bleak_client = getattr(self.client, "bleak_client", None)
        address = getattr(bleak_client, "address", None)
        rep = f"BLEInterface(address={address!r}"
        if self.debugOut is not None:
            rep += f", debugOut={self.debugOut!r}"
        if self.noProto:
            rep += ", noProto=True"
        if self.noNodes:
            rep += ", noNodes=True"
        rep += ")"
        return rep

    def from_num_handler(self, _, b: bytes) -> None:  # pylint: disable=C0116
        """Handle callbacks for fromnum notify.
        Note: this method does not need to be async because it is just setting a bool.
        """
        from_num = struct.unpack("<I", bytes(b))[0]
        logger.debug(f"FROMNUM notify: {from_num}")
        self.should_read = True

    async def log_radio_handler(self, _, b):  # pylint: disable=C0116
        """Parse a LogRecord notification and forward it as a log line."""
        log_record = mesh_pb2.LogRecord()
        try:
            log_record.ParseFromString(bytes(b))

            message = (
                f"[{log_record.source}] {log_record.message}"
                if log_record.source
                else log_record.message
            )
            self._handleLogLine(message)
        except google.protobuf.message.DecodeError:
            logger.warning("Malformed LogRecord received. Skipping.")

    async def legacy_log_radio_handler(self, _, b):  # pylint: disable=C0116
        """Forward a UTF-8 text log notification with newlines removed."""
        log_radio = b.decode("utf-8").replace("\n", "")
        self._handleLogLine(log_radio)

    @staticmethod
    def scan() -> List[BLEDevice]:
        """Scan for available BLE devices."""
        with BLEClient() as client:
            logger.info("Scanning for BLE devices (takes 10 seconds)...")
            response = client.discover(
                timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
            )

            # bleak sometimes returns devices we didn't ask for, so filter the response
            # to only return true meshtastic devices.
            # Each value is a (device, advertisement data) pair.
            return [
                device
                for device, advertisement in response.values()
                if SERVICE_UUID in advertisement.service_uuids
            ]

    def find_device(self, address: Optional[str]) -> BLEDevice:
        """Find a device by address.

        A device matches when `address` equals its name or address exactly,
        or when both addresses are equal after `_sanitize_address()` (so
        separators and letter case do not matter).

        Raises:
            BLEInterface.BLEError: If zero or more than one device matches.
        """
        addressed_devices = BLEInterface.scan()

        if address:
            wanted = self._sanitize_address(address)
            addressed_devices = [
                device
                for device in addressed_devices
                if address in (device.name, device.address)
                or self._sanitize_address(device.address) == wanted
            ]

        if len(addressed_devices) == 0:
            raise BLEInterface.BLEError(
                f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
            )
        if len(addressed_devices) > 1:
            raise BLEInterface.BLEError(
                f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
            )
        return addressed_devices[0]

    def _sanitize_address(self, address: Optional[str]) -> Optional[str]:  # pylint: disable=E0213
        "Standardize BLE address by removing extraneous characters and lowercasing."
        if address is None:
            return None
        return address.replace("-", "").replace("_", "").replace(":", "").lower()

    def connect(self, address: Optional[str] = None) -> "BLEClient":
        "Connect to a device by address."

        # Bleak docs recommend always doing a scan before connecting (even if we know addr)
        device = self.find_device(address)
        client = BLEClient(device.address, disconnected_callback=lambda _: self.close())
        try:
            client.connect()
            # NOTE(review): this runs another scan and discards the result;
            # kept as in the original - confirm it is still required.
            client.discover()
        except Exception:
            # Do not leak the client's event-loop thread on a failed connect.
            client.close()
            raise
        return client

    def _receiveFromRadioImpl(self) -> None:
        """Receive-thread body: drain FROMRADIO whenever `should_read` is set.

        Runs until `_want_receive` is cleared, either by close() or because
        the device appears to have disconnected.

        Raises:
            BLEInterface.BLEError: On a read failure other than "Not connected".
        """
        while self._want_receive:
            if not self.should_read:
                time.sleep(0.01)
                continue

            self.should_read = False
            retries: int = 0
            while self._want_receive:
                # Snapshot: close() may set self.client to None at any time.
                client = self.client
                if client is None:
                    logger.debug("BLE client is None, shutting down")
                    self._want_receive = False
                    break
                try:
                    b = bytes(client.read_gatt_char(FROMRADIO_UUID))
                except BleakDBusError as e:
                    # Device disconnected probably, so end our read loop immediately
                    logger.debug(f"Device disconnected, shutting down {e}")
                    self._want_receive = False
                    # Leave now: `b` is unbound or stale after a failed read.
                    break
                except BleakError as e:
                    if "Not connected" not in str(e):
                        raise BLEInterface.BLEError("Error reading BLE") from e
                    # We were definitely disconnected
                    logger.debug(f"Device disconnected, shutting down {e}")
                    self._want_receive = False
                    break
                if not b:
                    # An empty read is retried a few times before giving up
                    # until the next should_read signal.
                    if retries < 5:
                        time.sleep(0.1)
                        retries += 1
                        continue
                    break
                logger.debug(f"FROMRADIO read: {b.hex()}")
                self._handleFromRadio(b)

    def _sendToRadioImpl(self, toRadio) -> None:
        """Serialize `toRadio` and write it to the TORADIO characteristic.

        Raises:
            BLEInterface.BLEError: If the write fails for any reason.
        """
        b: bytes = toRadio.SerializeToString()
        # Snapshot so a concurrent close() cannot null the client mid-call.
        client = self.client
        if not b or not client:  # we silently ignore writes while we are shutting down
            return

        logger.debug(f"TORADIO write: {b.hex()}")
        try:
            client.write_gatt_char(
                TORADIO_UUID, b, response=True
            )  # FIXME: or False?
            # search Bleak src for org.bluez.Error.InProgress
        except Exception as e:
            raise BLEInterface.BLEError(
                "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
            ) from e
        # Allow to propagate and then make sure we read
        time.sleep(0.01)
        self.should_read = True

    def close(self) -> None:
        """Stop the receive thread, disconnect and release the BLE client.

        Safe to call before `__init__` has finished and safe to call again
        while a previous call is still disconnecting.
        """
        try:
            MeshInterface.close(self)
        except Exception as e:
            logger.error(f"Error closing mesh interface: {e}")

        if self._want_receive:
            self._want_receive = False  # Tell the thread we want it to stop
            if self._receiveThread:
                self._receiveThread.join(
                    timeout=2
                )  # If bleak is hung, don't wait for the thread to exit (it is critical we disconnect)
                self._receiveThread = None

        # Detach the client first: disconnecting fires the disconnected
        # callback, which calls close() again, and that nested call must not
        # attempt a second disconnect.
        client, self.client = self.client, None
        if client:
            # The handler is only registered at the very end of __init__.
            exit_handler = getattr(self, "_exit_handler", None)
            if exit_handler is not None:
                atexit.unregister(exit_handler)
                self._exit_handler = None
            try:
                client.disconnect()
            finally:
                # Always stop the client's event-loop thread, even if the
                # disconnect itself failed.
                client.close()
        self._disconnected()  # send the disconnected indicator up to clients
class BLEClient:
    """Client for managing connection to a BLE device

    Owns a private asyncio event loop running on a daemon thread and exposes
    blocking wrappers around the bleak coroutines, so callers on other
    threads can use BLE synchronously.
    """

    def __init__(self, address=None, **kwargs) -> None:
        """Start the event-loop thread and, if given an address, wrap a BleakClient.

        Args:
            address: Device address handed to BleakClient. When falsy no
                BleakClient is created and only discover() is usable.
            **kwargs: Forwarded unchanged to BleakClient.
        """
        # Always defined, so scan-only clients expose None instead of
        # lacking the attribute altogether.
        self.bleak_client: Optional[BleakClient] = None

        self._loop_ready = Event()
        self._eventLoop = asyncio.new_event_loop()
        self._eventThread = Thread(
            target=self._run_event_loop, name="BLEClient", daemon=True
        )
        self._eventThread.start()
        self._loop_ready.wait()  # Wait for event loop to be running

        if not address:
            logger.debug("No address provided - only discover method will work.")
            return

        self.bleak_client = BleakClient(address, **kwargs)

    def discover(self, **kwargs):  # pylint: disable=C0116
        """Run BleakScanner.discover(**kwargs) and return its result."""
        return self.async_await(BleakScanner.discover(**kwargs))

    def pair(self, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's pair(**kwargs) and return its result."""
        return self.async_await(self.bleak_client.pair(**kwargs))

    def connect(self, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's connect(**kwargs) and return its result."""
        return self.async_await(self.bleak_client.connect(**kwargs))

    def disconnect(self, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's disconnect(**kwargs); returns nothing."""
        self.async_await(self.bleak_client.disconnect(**kwargs))

    def read_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's read_gatt_char() and return the data read."""
        return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))

    def write_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's write_gatt_char(); returns nothing."""
        self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))

    def has_characteristic(self, specifier):
        """Check if the connected node supports a specified characteristic."""
        return bool(self.bleak_client.services.get_characteristic(specifier))

    def start_notify(self, *args, **kwargs):  # pylint: disable=C0116
        """Run the bleak client's start_notify(); returns nothing."""
        self.async_await(self.bleak_client.start_notify(*args, **kwargs))

    def close(self):  # pylint: disable=C0116
        """Stop the event loop and wait for its thread to exit.

        Calling close() again after the loop has shut down is a no-op.
        """
        if self._eventLoop.is_closed():
            # The loop thread closes the loop on exit; nothing left to stop.
            return
        self.async_run(self._stop_event_loop())
        self._eventThread.join()

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.close()

    def async_await(self, coro, timeout=None):  # pylint: disable=C0116
        """Wait for async operation to complete.

        On macOS, CoreBluetooth requires occasional I/O operations for
        callbacks to be properly delivered. The debug logging provides this
        I/O when enabled, allowing the system to process pending callbacks.

        Args:
            coro: Coroutine to run on this client's event loop.
            timeout: Seconds to wait for the result; None waits indefinitely.

        Returns:
            Whatever the coroutine returns.

        Raises:
            Whatever the coroutine raises, or a timeout error if `timeout`
            elapses first (the pending coroutine is then cancelled).
        """
        # NOTE(review): this blocks the calling thread; invoking it from the
        # event-loop thread itself (e.g. inside a bleak callback) would block
        # the loop that has to complete the future - confirm no caller does.
        logger.debug(f"async_await: waiting for {coro}")
        future = self.async_run(coro)
        # On macOS without debug logging, callbacks may not be delivered
        # unless we trigger some I/O. This is a known quirk of CoreBluetooth.
        self._flush_stdout()
        try:
            result = future.result(timeout)
        finally:
            # On timeout or interruption, do not leave the coroutine running
            # unattended on the loop. A finished future is left untouched.
            if not future.done():
                future.cancel()
        logger.debug("async_await: complete")
        return result

    @staticmethod
    def _flush_stdout() -> None:
        """Best-effort flush of sys.stdout; it must never fail a BLE call."""
        stream = sys.stdout
        if stream is None:  # sys.stdout may be None when no console is attached
            return
        try:
            stream.flush()
        except (OSError, ValueError) as e:
            # Closed or broken stdout: the flush is only a nudge, so skip it.
            logger.debug(f"async_await: stdout flush skipped: {e}")

    def async_run(self, coro):  # pylint: disable=C0116
        """Schedule `coro` on the event-loop thread and return its future."""
        return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)

    def _run_event_loop(self):
        """Thread body: run the loop until stopped, then close it."""
        try:
            # Signal ready from WITHIN the loop to guarantee it's actually running
            self._eventLoop.call_soon(self._loop_ready.set)
            self._eventLoop.run_forever()
        finally:
            self._eventLoop.close()

    async def _stop_event_loop(self):
        """Stop the loop from inside the loop thread."""
        self._eventLoop.stop()