Make pylint happy with ble_interface.py

This commit is contained in:
Ian McEwen
2024-03-19 13:18:15 -07:00
parent bf56521a53
commit 9b5943192d

View File

@@ -3,12 +3,12 @@
import logging
import time
import struct
import asyncio
from threading import Thread, Event
from bleak import BleakScanner, BleakClient
from meshtastic.mesh_interface import MeshInterface
from meshtastic.util import our_exit
from bleak import BleakScanner, BleakClient
import asyncio
SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
@@ -17,13 +17,14 @@ FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
class BLEInterface(MeshInterface):
"""MeshInterface using BLE to connect to devices"""
class BLEError(Exception):
"""An exception class for BLE errors"""
def __init__(self, message):
self.message = message
super().__init__(self.message)
class BLEState():
class BLEState(): # pylint: disable=C0115
THREADS = False
BLE = False
MESH = False
@@ -69,13 +70,14 @@ class BLEInterface(MeshInterface):
self.client.start_notify(FROMNUM_UUID, self.from_num_handler)
async def from_num_handler(self, _, b):
async def from_num_handler(self, _, b): # pylint: disable=C0116
from_num = struct.unpack('<I', bytes(b))[0]
logging.debug(f"FROMNUM notify: {from_num}")
self.should_read = True
def scan(self):
"Scan for available BLE devices"
with BLEClient() as client:
return [
(x[0], x[1]) for x in (client.discover(
@@ -86,12 +88,15 @@ class BLEInterface(MeshInterface):
def find_device(self, address):
"Find a device by address"
meshtastic_devices = self.scan()
addressed_devices = list(filter(lambda x: address == x[1].local_name or address == x[0].name, meshtastic_devices))
addressed_devices = list(filter(lambda x: address in (x[1].local_name, x[0].name), meshtastic_devices))
# If nothing is found try on the address
if len(addressed_devices) == 0:
addressed_devices = list(filter(lambda x: BLEInterface._sanitize_address(address) == BLEInterface._sanitize_address(x[0].address), meshtastic_devices))
addressed_devices = list(filter(
lambda x: BLEInterface._sanitize_address(address) == BLEInterface._sanitize_address(x[0].address),
meshtastic_devices))
if len(addressed_devices) == 0:
raise BLEInterface.BLEError(f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.")
@@ -99,7 +104,8 @@ class BLEInterface(MeshInterface):
raise BLEInterface.BLEError(f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.")
return addressed_devices[0][0]
def _sanitize_address(address):
def _sanitize_address(address): # pylint: disable=E0213
"Standardize BLE address by removing extraneous characters and lowercasing"
return address \
.replace("-", "") \
.replace("_", "") \
@@ -107,6 +113,7 @@ class BLEInterface(MeshInterface):
.lower()
def connect(self, address):
"Connect to a device by address"
device = self.find_device(address)
client = BLEClient(device.address)
client.connect()
@@ -156,13 +163,14 @@ class BLEInterface(MeshInterface):
if self.state.THREADS:
self._receiveThread_started.clear()
self._receiveThread_stopped.wait(5)
if self.state.BLE:
self.client.disconnect()
self.client.close()
class BLEClient():
"""Client for managing connection to a BLE device"""
def __init__(self, address = None, **kwargs):
self._eventThread = Thread(target = self._run_event_loop)
self._eventThread_started = Event()
@@ -177,47 +185,46 @@ class BLEClient():
self.bleak_client = BleakClient(address, **kwargs)
def discover(self, **kwargs):
def discover(self, **kwargs): # pylint: disable=C0116
return self.async_await(BleakScanner.discover(**kwargs))
def pair(self, **kwargs):
def pair(self, **kwargs): # pylint: disable=C0116
return self.async_await(self.bleak_client.pair(**kwargs))
def connect(self, **kwargs):
def connect(self, **kwargs): # pylint: disable=C0116
return self.async_await(self.bleak_client.connect(**kwargs))
def disconnect(self, **kwargs):
def disconnect(self, **kwargs): # pylint: disable=C0116
self.async_await(self.bleak_client.disconnect(**kwargs))
def read_gatt_char(self, *args, **kwargs):
def read_gatt_char(self, *args, **kwargs): # pylint: disable=C0116
return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))
def write_gatt_char(self, *args, **kwargs):
def write_gatt_char(self, *args, **kwargs): # pylint: disable=C0116
self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))
def start_notify(self, *args, **kwargs):
def start_notify(self, *args, **kwargs): # pylint: disable=C0116
self.async_await(self.bleak_client.start_notify(*args, **kwargs))
def close(self):
def close(self): # pylint: disable=C0116
self.async_run(self._stop_event_loop())
self._eventThread_stopped.wait(5)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
def __exit__(self, _type, _value, _traceback):
self.close()
def async_await(self, coro, timeout = None):
def async_await(self, coro, timeout = None): # pylint: disable=C0116
return self.async_run(coro).result(timeout)
def async_run(self, coro):
def async_run(self, coro): # pylint: disable=C0116
return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)
def _run_event_loop(self):
self._eventLoop = asyncio.new_event_loop()
# I don't know if the event loop can be initialized in __init__ so silencing pylint
self._eventLoop = asyncio.new_event_loop() # pylint: disable=W0201
self._eventThread_started.set()
try:
self._eventLoop.run_forever()