mirror of
https://github.com/meshtastic/python.git
synced 2025-12-24 08:27:55 -05:00
322 lines
12 KiB
Python
322 lines
12 KiB
Python
"""Bluetooth interface
|
|
"""
|
|
import asyncio
|
|
import atexit
|
|
import logging
|
|
import struct
|
|
import time
|
|
import io
|
|
from threading import Thread
|
|
from typing import List, Optional
|
|
|
|
import google.protobuf
|
|
from bleak import BleakClient, BleakScanner, BLEDevice
|
|
from bleak.exc import BleakDBusError, BleakError
|
|
|
|
from meshtastic.mesh_interface import MeshInterface
|
|
|
|
from .protobuf import mesh_pb2
|
|
|
|
SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"
|
|
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
|
|
FROMRADIO_UUID = "2c55e69e-4993-11ed-b878-0242ac120002"
|
|
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
|
|
LEGACY_LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2"
|
|
LOGRADIO_UUID = "5a3d6e49-06e6-4423-9944-e9de8cdf9547"
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BLEInterface(MeshInterface):
|
|
"""MeshInterface using BLE to connect to devices."""
|
|
|
|
class BLEError(Exception):
|
|
"""An exception class for BLE errors."""
|
|
|
|
def __init__( # pylint: disable=R0917
|
|
self,
|
|
address: Optional[str],
|
|
noProto: bool = False,
|
|
debugOut: Optional[io.TextIOWrapper]=None,
|
|
noNodes: bool = False,
|
|
timeout: int = 300,
|
|
) -> None:
|
|
MeshInterface.__init__(
|
|
self, debugOut=debugOut, noProto=noProto, noNodes=noNodes, timeout=timeout
|
|
)
|
|
|
|
self.should_read = False
|
|
|
|
logger.debug("Threads starting")
|
|
self._want_receive = True
|
|
self._receiveThread: Optional[Thread] = Thread(
|
|
target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True
|
|
)
|
|
self._receiveThread.start()
|
|
logger.debug("Threads running")
|
|
|
|
self.client: Optional[BLEClient] = None
|
|
try:
|
|
logger.debug(f"BLE connecting to: {address if address else 'any'}")
|
|
self.client = self.connect(address)
|
|
logger.debug("BLE connected")
|
|
except BLEInterface.BLEError as e:
|
|
self.close()
|
|
raise e
|
|
|
|
if self.client.has_characteristic(LEGACY_LOGRADIO_UUID):
|
|
self.client.start_notify(
|
|
LEGACY_LOGRADIO_UUID, self.legacy_log_radio_handler
|
|
)
|
|
|
|
if self.client.has_characteristic(LOGRADIO_UUID):
|
|
self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)
|
|
|
|
logger.debug("Mesh configure starting")
|
|
self._startConfig()
|
|
if not self.noProto:
|
|
self._waitConnected(timeout=60.0)
|
|
self.waitForConfig()
|
|
|
|
logger.debug("Register FROMNUM notify callback")
|
|
self.client.start_notify(FROMNUM_UUID, self.from_num_handler)
|
|
|
|
# We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected
|
|
# and future connection attempts will fail. (BlueZ kinda sucks)
|
|
# Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
|
|
self._exit_handler = atexit.register(self.client.disconnect)
|
|
|
|
def __repr__(self):
|
|
rep = f"BLEInterface(address={self.client.address if self.client else None!r}"
|
|
if self.debugOut is not None:
|
|
rep += f", debugOut={self.debugOut!r}"
|
|
if self.noProto:
|
|
rep += ", noProto=True"
|
|
if self.noNodes:
|
|
rep += ", noNodes=True"
|
|
rep += ")"
|
|
return rep
|
|
|
|
def from_num_handler(self, _, b: bytes) -> None: # pylint: disable=C0116
|
|
"""Handle callbacks for fromnum notify.
|
|
Note: this method does not need to be async because it is just setting a bool.
|
|
"""
|
|
from_num = struct.unpack("<I", bytes(b))[0]
|
|
logger.debug(f"FROMNUM notify: {from_num}")
|
|
self.should_read = True
|
|
|
|
async def log_radio_handler(self, _, b): # pylint: disable=C0116
|
|
log_record = mesh_pb2.LogRecord()
|
|
try:
|
|
log_record.ParseFromString(bytes(b))
|
|
|
|
message = (
|
|
f"[{log_record.source}] {log_record.message}"
|
|
if log_record.source
|
|
else log_record.message
|
|
)
|
|
self._handleLogLine(message)
|
|
except google.protobuf.message.DecodeError:
|
|
logger.warning("Malformed LogRecord received. Skipping.")
|
|
|
|
async def legacy_log_radio_handler(self, _, b): # pylint: disable=C0116
|
|
log_radio = b.decode("utf-8").replace("\n", "")
|
|
self._handleLogLine(log_radio)
|
|
|
|
@staticmethod
|
|
def scan() -> List[BLEDevice]:
|
|
"""Scan for available BLE devices."""
|
|
with BLEClient() as client:
|
|
logger.info("Scanning for BLE devices (takes 10 seconds)...")
|
|
response = client.discover(
|
|
timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
|
|
)
|
|
|
|
devices = response.values()
|
|
|
|
# bleak sometimes returns devices we didn't ask for, so filter the response
|
|
# to only return true meshtastic devices
|
|
# d[0] is the device. d[1] is the advertisement data
|
|
devices = list(
|
|
filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices)
|
|
)
|
|
return list(map(lambda d: d[0], devices))
|
|
|
|
def find_device(self, address: Optional[str]) -> BLEDevice:
|
|
"""Find a device by address."""
|
|
|
|
addressed_devices = BLEInterface.scan()
|
|
|
|
if address:
|
|
addressed_devices = list(
|
|
filter(
|
|
lambda x: address in (x.name, x.address),
|
|
addressed_devices,
|
|
)
|
|
)
|
|
|
|
if len(addressed_devices) == 0:
|
|
raise BLEInterface.BLEError(
|
|
f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
|
|
)
|
|
if len(addressed_devices) > 1:
|
|
raise BLEInterface.BLEError(
|
|
f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
|
|
)
|
|
return addressed_devices[0]
|
|
|
|
def _sanitize_address(self, address: Optional[str]) -> Optional[str]: # pylint: disable=E0213
|
|
"Standardize BLE address by removing extraneous characters and lowercasing."
|
|
if address is None:
|
|
return None
|
|
else:
|
|
return address.replace("-", "").replace("_", "").replace(":", "").lower()
|
|
|
|
def connect(self, address: Optional[str] = None) -> "BLEClient":
|
|
"Connect to a device by address."
|
|
|
|
# Bleak docs recommend always doing a scan before connecting (even if we know addr)
|
|
device = self.find_device(address)
|
|
client = BLEClient(device.address, disconnected_callback=lambda _: self.close())
|
|
client.connect()
|
|
client.discover()
|
|
return client
|
|
|
|
def _receiveFromRadioImpl(self) -> None:
|
|
while self._want_receive:
|
|
if self.should_read:
|
|
self.should_read = False
|
|
retries: int = 0
|
|
while self._want_receive:
|
|
if self.client is None:
|
|
logger.debug(f"BLE client is None, shutting down")
|
|
self._want_receive = False
|
|
continue
|
|
try:
|
|
b = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
|
|
except BleakDBusError as e:
|
|
# Device disconnected probably, so end our read loop immediately
|
|
logger.debug(f"Device disconnected, shutting down {e}")
|
|
self._want_receive = False
|
|
except BleakError as e:
|
|
# We were definitely disconnected
|
|
if "Not connected" in str(e):
|
|
logger.debug(f"Device disconnected, shutting down {e}")
|
|
self._want_receive = False
|
|
else:
|
|
raise BLEInterface.BLEError("Error reading BLE") from e
|
|
if not b:
|
|
if retries < 5:
|
|
time.sleep(0.1)
|
|
retries += 1
|
|
continue
|
|
break
|
|
logger.debug(f"FROMRADIO read: {b.hex()}")
|
|
self._handleFromRadio(b)
|
|
else:
|
|
time.sleep(0.01)
|
|
|
|
def _sendToRadioImpl(self, toRadio) -> None:
|
|
b: bytes = toRadio.SerializeToString()
|
|
if b and self.client: # we silently ignore writes while we are shutting down
|
|
logger.debug(f"TORADIO write: {b.hex()}")
|
|
try:
|
|
self.client.write_gatt_char(
|
|
TORADIO_UUID, b, response=True
|
|
) # FIXME: or False?
|
|
# search Bleak src for org.bluez.Error.InProgress
|
|
except Exception as e:
|
|
raise BLEInterface.BLEError(
|
|
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
|
|
) from e
|
|
# Allow to propagate and then make sure we read
|
|
time.sleep(0.01)
|
|
self.should_read = True
|
|
|
|
def close(self) -> None:
|
|
try:
|
|
MeshInterface.close(self)
|
|
except Exception as e:
|
|
logger.error(f"Error closing mesh interface: {e}")
|
|
|
|
if self._want_receive:
|
|
self._want_receive = False # Tell the thread we want it to stop
|
|
if self._receiveThread:
|
|
self._receiveThread.join(
|
|
timeout=2
|
|
) # If bleak is hung, don't wait for the thread to exit (it is critical we disconnect)
|
|
self._receiveThread = None
|
|
|
|
if self.client:
|
|
atexit.unregister(self._exit_handler)
|
|
self.client.disconnect()
|
|
self.client.close()
|
|
self.client = None
|
|
self._disconnected() # send the disconnected indicator up to clients
|
|
|
|
|
|
class BLEClient:
|
|
"""Client for managing connection to a BLE device"""
|
|
|
|
def __init__(self, address=None, **kwargs) -> None:
|
|
self._eventLoop = asyncio.new_event_loop()
|
|
self._eventThread = Thread(
|
|
target=self._run_event_loop, name="BLEClient", daemon=True
|
|
)
|
|
self._eventThread.start()
|
|
|
|
if not address:
|
|
logger.debug("No address provided - only discover method will work.")
|
|
return
|
|
|
|
self.bleak_client = BleakClient(address, **kwargs)
|
|
|
|
def discover(self, **kwargs): # pylint: disable=C0116
|
|
return self.async_await(BleakScanner.discover(**kwargs))
|
|
|
|
def pair(self, **kwargs): # pylint: disable=C0116
|
|
return self.async_await(self.bleak_client.pair(**kwargs))
|
|
|
|
def connect(self, **kwargs): # pylint: disable=C0116
|
|
return self.async_await(self.bleak_client.connect(**kwargs))
|
|
|
|
def disconnect(self, **kwargs): # pylint: disable=C0116
|
|
self.async_await(self.bleak_client.disconnect(**kwargs))
|
|
|
|
def read_gatt_char(self, *args, **kwargs): # pylint: disable=C0116
|
|
return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))
|
|
|
|
def write_gatt_char(self, *args, **kwargs): # pylint: disable=C0116
|
|
self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))
|
|
|
|
def has_characteristic(self, specifier):
|
|
"""Check if the connected node supports a specified characteristic."""
|
|
return bool(self.bleak_client.services.get_characteristic(specifier))
|
|
|
|
def start_notify(self, *args, **kwargs): # pylint: disable=C0116
|
|
self.async_await(self.bleak_client.start_notify(*args, **kwargs))
|
|
|
|
def close(self): # pylint: disable=C0116
|
|
self.async_run(self._stop_event_loop())
|
|
self._eventThread.join()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, _type, _value, _traceback):
|
|
self.close()
|
|
|
|
def async_await(self, coro, timeout=None): # pylint: disable=C0116
|
|
return self.async_run(coro).result(timeout)
|
|
|
|
def async_run(self, coro): # pylint: disable=C0116
|
|
return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)
|
|
|
|
def _run_event_loop(self):
|
|
try:
|
|
self._eventLoop.run_forever()
|
|
finally:
|
|
self._eventLoop.close()
|
|
|
|
async def _stop_event_loop(self):
|
|
self._eventLoop.stop()
|