fix bug: we were never calling BLE.disconnect() which...

on linux breaks all but the first connection attempts.
Also remove unneeded event stuff and arbitrary timeouts, better just to
use thread.join()
This commit is contained in:
Kevin Hester
2024-06-29 14:25:01 -07:00
parent 898018ebf3
commit 532ca54ba4

View File

@@ -1,14 +1,15 @@
"""Bluetooth interface
"""
import asyncio
import atexit
import logging
import struct
import time
from threading import Event, Thread
from threading import Thread
from typing import Optional
from print_color import print
from bleak import BleakClient, BleakScanner, BLEDevice
from print_color import print
from meshtastic.mesh_interface import MeshInterface
from meshtastic.util import our_exit
@@ -20,18 +21,13 @@ FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2"
class BLEInterface(MeshInterface):
"""MeshInterface using BLE to connect to devices."""
class BLEError(Exception):
"""An exception class for BLE errors."""
pass
class BLEState: # pylint: disable=C0115
THREADS = False
BLE = False
MESH = False
pass
def __init__(
self,
@@ -40,23 +36,23 @@ class BLEInterface(MeshInterface):
debugOut=None,
noNodes: bool = False,
):
self.state = BLEInterface.BLEState()
MeshInterface.__init__(
self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
)
self.should_read = False
logging.debug("Threads starting")
self._receiveThread = Thread(target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True)
self._receiveThread_started = Event()
self._receiveThread_stopped = Event()
self._want_receive = True
self._receiveThread: Optional[Thread] = Thread(
target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True
)
self._receiveThread.start()
self._receiveThread_started.wait(1)
self.state.THREADS = True
logging.debug("Threads running")
try:
logging.debug(f"BLE connecting to: {address if address else 'any'}")
self.client = self.connect(address)
self.state.BLE = True
self.client: Optional[BLEClient] = self.connect(address)
logging.debug("BLE connected")
except BLEInterface.BLEError as e:
self.close()
@@ -64,29 +60,30 @@ class BLEInterface(MeshInterface):
self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)
logging.debug("Mesh init starting")
MeshInterface.__init__(
self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
)
logging.debug("Mesh configure starting")
self._startConfig()
if not self.noProto:
self._waitConnected(timeout=60.0)
self.waitForConfig()
self.state.MESH = True
logging.debug("Mesh init finished")
logging.debug("Register FROMNUM notify callback")
self.client.start_notify(FROMNUM_UUID, self.from_num_handler)
# We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected
# and future connection attempts will fail. (BlueZ kinda sucks)
self._exit_handler = atexit.register(self.close)
def from_num_handler(self, _, b): # pylint: disable=C0116
"""Handle callbacks for fromnum notify.
Note: this method does not need to be async because it is just setting a bool."""
Note: this method does not need to be async because it is just setting a bool.
"""
from_num = struct.unpack("<I", bytes(b))[0]
logging.debug(f"FROMNUM notify: {from_num}")
self.should_read = True
async def log_radio_handler(self, _, b): # pylint: disable=C0116
log_radio = b.decode('utf-8').replace('\n', '')
async def log_radio_handler(self, _, b): # pylint: disable=C0116
log_radio = b.decode("utf-8").replace("\n", "")
if log_radio.startswith("DEBUG"):
print(log_radio, color="cyan", end=None)
elif log_radio.startswith("INFO"):
@@ -103,7 +100,9 @@ class BLEInterface(MeshInterface):
"""Scan for available BLE devices."""
with BLEClient() as client:
logging.info("Scanning for BLE devices (takes 10 seconds)...")
response = client.discover(timeout=10, return_adv=True, service_uuids=[SERVICE_UUID])
response = client.discover(
timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
)
devices = response.values()
@@ -153,8 +152,7 @@ class BLEInterface(MeshInterface):
return client
def _receiveFromRadioImpl(self):
self._receiveThread_started.set()
while self._receiveThread_started.is_set():
while self._want_receive:
if self.should_read:
self.should_read = False
retries = 0
@@ -172,44 +170,49 @@ class BLEInterface(MeshInterface):
logging.debug(f"FROMRADIO read: {b.hex()}")
self._handleFromRadio(b)
else:
time.sleep(0.1)
self._receiveThread_stopped.set()
time.sleep(0.01)
def _sendToRadioImpl(self, toRadio):
b = toRadio.SerializeToString()
if b:
logging.debug(f"TORADIO write: {b.hex()}")
try:
self.client.write_gatt_char(TORADIO_UUID, b, response=True) # FIXME: or False?
self.client.write_gatt_char(
TORADIO_UUID, b, response=True
) # FIXME: or False?
# search Bleak src for org.bluez.Error.InProgress
except Exception as e:
raise BLEInterface.BLEError("Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)") from e
raise BLEInterface.BLEError(
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
) from e
# Allow to propagate and then make sure we read
time.sleep(0.01)
self.should_read = True
def close(self):
if self.state.MESH:
MeshInterface.close(self)
atexit.unregister(self._exit_handler)
MeshInterface.close(self)
if self.state.THREADS:
self._receiveThread_started.clear()
self._receiveThread_stopped.wait(5)
if self._want_receive:
self.want_receive = False # Tell the thread we want it to stop
self._receiveThread.join()
self._receiveThread = None
if self.state.BLE:
if self.client:
self.client.disconnect()
self.client.close()
self.client = None
class BLEClient:
"""Client for managing connection to a BLE device"""
def __init__(self, address=None, **kwargs):
self._eventThread = Thread(target=self._run_event_loop, name="BLEClient", daemon=True)
self._eventThread_started = Event()
self._eventThread_stopped = Event()
self._eventLoop = asyncio.new_event_loop()
self._eventThread = Thread(
target=self._run_event_loop, name="BLEClient", daemon=True
)
self._eventThread.start()
self._eventThread_started.wait(1)
if not address:
logging.debug("No address provided - only discover method will work.")
@@ -240,7 +243,7 @@ class BLEClient:
def close(self): # pylint: disable=C0116
self.async_run(self._stop_event_loop())
self._eventThread_stopped.wait(5)
self._eventThread.join()
def __enter__(self):
return self
@@ -255,14 +258,10 @@ class BLEClient:
return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)
def _run_event_loop(self):
# I don't know if the event loop can be initialized in __init__ so silencing pylint
self._eventLoop = asyncio.new_event_loop() # pylint: disable=W0201
self._eventThread_started.set()
try:
self._eventLoop.run_forever()
finally:
self._eventLoop.close()
self._eventThread_stopped.set()
async def _stop_event_loop(self):
self._eventLoop.stop()