fix BLE scan and connect to work with latest bleak

This commit is contained in:
Kevin Hester
2024-06-23 11:56:44 -07:00
parent e6a88e055f
commit 62f16d34d4
3 changed files with 37 additions and 25 deletions

8
.vscode/launch.json vendored
View File

@@ -76,6 +76,14 @@
"justMyCode": true,
"args": ["--debug", "--info"]
},
{
"name": "meshtastic debug BLE",
"type": "python",
"request": "launch",
"module": "meshtastic",
"justMyCode": true,
"args": ["--debug", "--ble", "--info"]
},
{
"name": "meshtastic debug set region",
"type": "python",

View File

@@ -1050,7 +1050,7 @@ def common():
meshtastic.util.our_exit("BLE scan finished", 0)
return
elif args.ble:
client = BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes)
client = BLEInterface(args.ble_dest, debugOut=logfile, noProto=args.noproto, noNodes=args.no_nodes)
elif args.host:
try:
client = meshtastic.tcp_interface.TCPInterface(
@@ -1119,6 +1119,12 @@ def addConnectionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParse
group.add_argument(
"--ble",
help="The BLE device address or name to connect to",
action="store_true",
)
group.add_argument(
"--ble-dest",
help="The BLE device address or name to connect to",
default=None,
)

View File

@@ -7,7 +7,7 @@ import asyncio
from threading import Thread, Event
from typing import Optional
from bleak import BleakScanner, BleakClient
from bleak import BleakScanner, BleakClient, BLEDevice
from meshtastic.mesh_interface import MeshInterface
from meshtastic.util import our_exit
@@ -35,9 +35,6 @@ class BLEInterface(MeshInterface):
def __init__(self, address: Optional[str], noProto: bool = False, debugOut = None, noNodes: bool = False):
self.state = BLEInterface.BLEState()
if not address:
return
self.should_read = False
logging.debug("Threads starting")
@@ -54,10 +51,9 @@ class BLEInterface(MeshInterface):
self.client = self.connect(address)
self.state.BLE = True
logging.debug("BLE connected")
except BLEInterface.BLEError as e:
# except BLEInterface.BLEError as e:
finally:
self.close()
our_exit(e.message, 1)
return
logging.debug("Mesh init starting")
MeshInterface.__init__(self, debugOut = debugOut, noProto = noProto, noNodes = noNodes)
@@ -78,33 +74,35 @@ class BLEInterface(MeshInterface):
self.should_read = True
def scan(self):
"Scan for available BLE devices"
def scan(self) -> list[BLEDevice]:
"""Scan for available BLE devices."""
with BLEClient() as client:
return [
(x[0], x[1]) for x in (client.discover(
response = client.discover(
return_adv = True,
service_uuids = [ SERVICE_UUID ]
)).values()
]
service_uuids=[SERVICE_UUID]
)
devices = response.values()
# bleak sometimes returns devices we didn't ask for, so filter the response
# to only return true meshtastic devices
# d[0] is the device. d[1] is the advertisement data
devices = list(filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices))
return list(map(lambda d: d[0], devices))
def find_device(self, address):
"Find a device by address"
meshtastic_devices = self.scan()
def find_device(self, address: Optional[str]) -> BLEDevice:
"""Find a device by address"""
addressed_devices = self.scan()
addressed_devices = list(filter(lambda x: address in (x[1].local_name, x[0].name), meshtastic_devices))
# If nothing is found try on the address
if len(addressed_devices) == 0:
addressed_devices = list(filter(
lambda x: BLEInterface._sanitize_address(address) == BLEInterface._sanitize_address(x[0].address),
meshtastic_devices))
if address:
addressed_devices = list(filter(lambda x: address == x.name or address == x.address, addressed_devices))
if len(addressed_devices) == 0:
raise BLEInterface.BLEError(f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.")
if len(addressed_devices) > 1:
raise BLEInterface.BLEError(f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.")
return addressed_devices[0][0]
return addressed_devices[0]
def _sanitize_address(address): # pylint: disable=E0213
"Standardize BLE address by removing extraneous characters and lowercasing"