mirror of
https://github.com/meshtastic/python.git
synced 2026-06-02 12:45:00 -04:00
Add a BLEError 'kind' field and branch on it instead of string matching
This commit is contained in:
@@ -1374,8 +1374,7 @@ def common():
|
||||
timeout=args.timeout,
|
||||
)
|
||||
except BLEInterface.BLEError as e:
|
||||
msg = str(e)
|
||||
if "No Meshtastic BLE peripheral" in msg:
|
||||
if e.kind == BLEInterface.BLEError.DEVICE_NOT_FOUND:
|
||||
meshtastic.util.our_exit(
|
||||
"BLE device not found.\n\n"
|
||||
"Possible causes:\n"
|
||||
@@ -1387,7 +1386,7 @@ def common():
|
||||
" - Run 'meshtastic --ble-scan' to see available devices",
|
||||
1,
|
||||
)
|
||||
elif "More than one" in msg:
|
||||
elif e.kind == BLEInterface.BLEError.MULTIPLE_DEVICES:
|
||||
meshtastic.util.our_exit(
|
||||
"Multiple Meshtastic BLE devices found.\n\n"
|
||||
"Please specify which device to connect to:\n"
|
||||
@@ -1395,7 +1394,7 @@ def common():
|
||||
" - Use 'meshtastic --ble <name_or_address>' to connect",
|
||||
1,
|
||||
)
|
||||
elif "Error writing BLE" in msg:
|
||||
elif e.kind == BLEInterface.BLEError.WRITE_ERROR:
|
||||
meshtastic.util.our_exit(
|
||||
"Failed to write to BLE device.\n\n"
|
||||
"Possible causes:\n"
|
||||
@@ -1407,7 +1406,7 @@ def common():
|
||||
" - Reset the Meshtastic device",
|
||||
1,
|
||||
)
|
||||
elif "Error reading BLE" in msg:
|
||||
elif e.kind == BLEInterface.BLEError.READ_ERROR:
|
||||
meshtastic.util.our_exit(
|
||||
"Failed to read from BLE device.\n\n"
|
||||
"The device may have disconnected unexpectedly.\n\n"
|
||||
|
||||
@@ -33,6 +33,16 @@ class BLEInterface(MeshInterface):
|
||||
class BLEError(Exception):
|
||||
"""An exception class for BLE errors."""
|
||||
|
||||
DEVICE_NOT_FOUND = "device_not_found"
|
||||
MULTIPLE_DEVICES = "multiple_devices"
|
||||
READ_ERROR = "read_error"
|
||||
WRITE_ERROR = "write_error"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
def __init__(self, message: str, kind: str = UNKNOWN):
|
||||
super().__init__(message)
|
||||
self.kind = kind
|
||||
|
||||
def __init__( # pylint: disable=R0917
|
||||
self,
|
||||
address: Optional[str],
|
||||
@@ -157,11 +167,13 @@ class BLEInterface(MeshInterface):
|
||||
|
||||
if len(addressed_devices) == 0:
|
||||
raise BLEInterface.BLEError(
|
||||
f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
|
||||
f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it.",
|
||||
BLEInterface.BLEError.DEVICE_NOT_FOUND,
|
||||
)
|
||||
if len(addressed_devices) > 1:
|
||||
raise BLEInterface.BLEError(
|
||||
f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
|
||||
f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found.",
|
||||
BLEInterface.BLEError.MULTIPLE_DEVICES,
|
||||
)
|
||||
return addressed_devices[0]
|
||||
|
||||
@@ -204,7 +216,10 @@ class BLEInterface(MeshInterface):
|
||||
logger.debug(f"Device disconnected, shutting down {e}")
|
||||
self._want_receive = False
|
||||
else:
|
||||
raise BLEInterface.BLEError("Error reading BLE") from e
|
||||
raise BLEInterface.BLEError(
|
||||
"Error reading BLE",
|
||||
BLEInterface.BLEError.READ_ERROR,
|
||||
) from e
|
||||
if not b:
|
||||
if retries < 5:
|
||||
time.sleep(0.1)
|
||||
@@ -227,7 +242,8 @@ class BLEInterface(MeshInterface):
|
||||
# search Bleak src for org.bluez.Error.InProgress
|
||||
except Exception as e:
|
||||
raise BLEInterface.BLEError(
|
||||
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
|
||||
"Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)",
|
||||
BLEInterface.BLEError.WRITE_ERROR,
|
||||
) from e
|
||||
# Allow to propagate and then make sure we read
|
||||
time.sleep(0.01)
|
||||
|
||||
Reference in New Issue
Block a user