refactor(ble): Improve BLE connection stability and error handling (#3744)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2025-11-19 12:36:52 -06:00
committed by GitHub
parent 5120644349
commit e2aca10091

View File

@@ -26,9 +26,12 @@ import com.geeksville.mesh.repository.radio.BleConstants.BTM_TORADIO_CHARACTER
import com.geeksville.mesh.service.RadioNotConnectedException
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow
@@ -75,8 +78,21 @@ constructor(
@Assisted val address: String,
) : IRadioInterface {
private val connectionScope = CoroutineScope(serviceScope.coroutineContext + SupervisorJob())
/**
 * Last-resort handler for exceptions that are not caught inside a coroutine.
 *
 * It logs the failure, starts a best-effort disconnect of the current [peripheral] (if any) and
 * reports the failure to [service] as a [BleError] built from the uncaught throwable.
 */
private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
Timber.e(throwable, "[$address] Uncaught exception in connectionScope")
// The disconnect is launched on serviceScope and is not awaited, so the onDisconnect
// notification below can run before the disconnect attempt has finished.
// NOTE(review): presumably serviceScope is used so cleanup does not depend on the scope that
// just failed — confirm.
serviceScope.launch {
try {
peripheral?.disconnect()
} catch (e: Exception) {
// Best effort only: a failed disconnect is logged and otherwise ignored.
Timber.e(e, "[$address] Failed to disconnect in exception handler")
}
}
service.onDisconnect(BleError.from(throwable))
}
private val connectionScope = CoroutineScope(serviceScope.coroutineContext + SupervisorJob() + exceptionHandler)
private val drainMutex = Mutex()
private val writeMutex = Mutex()
private var peripheral: Peripheral? = null
@@ -106,12 +122,10 @@ constructor(
private fun dispatchPacket(packet: ByteArray) {
Timber.d("[$address] Dispatching packet to service.handleFromRadio()")
connectionScope.launch {
try {
service.handleFromRadio(p = packet)
} catch (t: Throwable) {
Timber.e(t, "[$address] Failed to schedule service.handleFromRadio)")
}
try {
service.handleFromRadio(p = packet)
} catch (t: Throwable) {
Timber.e(t, "[$address] Failed to execute service.handleFromRadio()")
}
}
@@ -136,14 +150,14 @@ constructor(
// --- Connection & Discovery Logic ---
private suspend fun findPeripheral(): Peripheral =
private fun findPeripheral(): Peripheral =
centralManager.getBondedPeripherals().firstOrNull { it.address == address }
?: throw RadioNotConnectedException("Device not found at address $address")
private fun connect() {
connectionScope.launch {
try {
peripheral = findAndConnectPeripheral()
peripheral = retryCall { findAndConnectPeripheral() }
peripheral?.let {
onConnected()
observePeripheralChanges()
@@ -169,10 +183,10 @@ constructor(
private suspend fun onConnected() {
try {
peripheral?.let { p ->
val rssi = p.readRssi()
val rssi = retryCall { p.readRssi() }
Timber.d("[$address] Connection established. RSSI: $rssi dBm")
val phyInUse = p.readPhy()
val phyInUse = retryCall { p.readPhy() }
Timber.d("[$address] PHY in use: $phyInUse")
}
} catch (e: Exception) {
@@ -200,6 +214,7 @@ constructor(
.launchIn(connectionScope)
}
@Suppress("TooGenericExceptionCaught")
@OptIn(ExperimentalUuidApi::class)
private fun discoverServicesAndSetupCharacteristics(peripheral: Peripheral) {
connectionScope.launch {
@@ -246,6 +261,15 @@ constructor(
service.onDisconnect(BleError.DiscoveryFailed("Meshtastic service not found"))
}
}
.catch { e ->
Timber.e(e, "[$address] Service discovery failed")
try {
peripheral.disconnect()
} catch (e2: Exception) {
Timber.e(e2, "[$address] Failed to disconnect in discovery catch")
}
service.onDisconnect(BleError.from(e))
}
.launchIn(connectionScope)
}
}
@@ -254,8 +278,7 @@ constructor(
@OptIn(ExperimentalUuidApi::class)
private suspend fun setupNotifications() {
fromNumCharacteristic
?.subscribe()
retryCall { fromNumCharacteristic?.subscribe() }
?.onStart { Timber.d("[$address] Subscribing to fromNumCharacteristic") }
?.onEach { notifyBytes ->
Timber.d("[$address] FromNum Notification (${notifyBytes.size} bytes), draining queue")
@@ -268,8 +291,7 @@ constructor(
?.onCompletion { cause -> Timber.d("[$address] fromNum sub flow completed, cause=$cause") }
?.launchIn(scope = connectionScope)
logRadioCharacteristic
?.subscribe()
retryCall { logRadioCharacteristic?.subscribe() }
?.onStart { Timber.d("[$address] Subscribing to logRadioCharacteristic") }
?.onEach { notifyBytes ->
Timber.d("[$address] LogRadio Notification (${notifyBytes.size} bytes), dispatching packet")
@@ -283,6 +305,22 @@ constructor(
?.launchIn(scope = connectionScope)
}
/**
 * Runs [block], retrying it after a failure until it has been attempted up to [RETRY_COUNT] times.
 *
 * Between attempts the coroutine suspends for [RETRY_DELAY_MS] milliseconds. A
 * [CancellationException] is never retried; it is rethrown immediately so cancellation
 * propagates. Only failures that are [Exception]s trigger a retry.
 *
 * @param block the suspending operation to attempt.
 * @return the value produced by the first successful invocation of [block].
 * @throws Exception the failure of the final attempt, once all attempts are used up.
 */
private suspend fun <T> retryCall(block: suspend () -> T): T {
    // Every attempt except the last one is guarded: a failure is logged and followed by a pause.
    repeat(RETRY_COUNT - 1) { failedSoFar ->
        try {
            return block()
        } catch (cancellation: CancellationException) {
            throw cancellation
        } catch (failure: Exception) {
            Timber.w(failure, "[$address] Operation failed, retrying (${failedSoFar + 1}/$RETRY_COUNT)...")
            delay(RETRY_DELAY_MS)
        }
    }
    // Final attempt: whatever it throws goes straight to the caller.
    return block()
}
// --- IRadioInterface Implementation ---
/**
@@ -294,19 +332,23 @@ constructor(
toRadioCharacteristic?.let { characteristic ->
if (peripheral == null) return@let
connectionScope.launch {
try {
val writeType =
if (characteristic.properties.contains(CharacteristicProperty.WRITE_WITHOUT_RESPONSE)) {
WriteType.WITHOUT_RESPONSE
} else {
WriteType.WITH_RESPONSE
writeMutex.withLock {
try {
val writeType =
if (characteristic.properties.contains(CharacteristicProperty.WRITE_WITHOUT_RESPONSE)) {
WriteType.WITHOUT_RESPONSE
} else {
WriteType.WITH_RESPONSE
}
retryCall {
Timber.d("[$address] Writing packet to toRadioCharacteristic with $writeType")
characteristic.write(p, writeType = writeType)
}
Timber.d("[$address] Writing packet to toRadioCharacteristic with $writeType")
characteristic.write(p, writeType = writeType)
drainPacketQueueAndDispatch()
} catch (e: Exception) {
Timber.e(e, "[$address] Failed to write packet to toRadioCharacteristic")
service.onDisconnect(BleError.from(e))
drainPacketQueueAndDispatch()
} catch (e: Exception) {
Timber.e(e, "[$address] Failed to write packet to toRadioCharacteristic")
service.onDisconnect(BleError.from(e))
}
}
}
} ?: Timber.w("[$address] toRadio unavailable, can't send data")
@@ -320,6 +362,11 @@ constructor(
service.onDisconnect(true)
}
}
companion object {
/** Upper bound on how many times `retryCall` invokes its block before rethrowing the failure. */
private const val RETRY_COUNT = 3
/** Pause between two `retryCall` attempts, in milliseconds (passed to `delay`). */
private const val RETRY_DELAY_MS = 500L
}
}
object BleConstants {