diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt index 08816bb6e..cfb67ef87 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt @@ -26,9 +26,12 @@ import com.geeksville.mesh.repository.radio.BleConstants.BTM_TORADIO_CHARACTER import com.geeksville.mesh.service.RadioNotConnectedException import dagger.assisted.Assisted import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow @@ -75,8 +78,21 @@ constructor( @Assisted val address: String, ) : IRadioInterface { - private val connectionScope = CoroutineScope(serviceScope.coroutineContext + SupervisorJob()) + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + Timber.e(throwable, "[$address] Uncaught exception in connectionScope") + serviceScope.launch { + try { + peripheral?.disconnect() + } catch (e: Exception) { + Timber.e(e, "[$address] Failed to disconnect in exception handler") + } + } + service.onDisconnect(BleError.from(throwable)) + } + + private val connectionScope = CoroutineScope(serviceScope.coroutineContext + SupervisorJob() + exceptionHandler) private val drainMutex = Mutex() + private val writeMutex = Mutex() private var peripheral: Peripheral? = null @@ -106,12 +122,10 @@ constructor( private fun dispatchPacket(packet: ByteArray) { Timber.d("[$address] Dispatching packet to service.handleFromRadio()") - connectionScope.launch { - try { - service.handleFromRadio(p = packet) - } catch (t: Throwable) { - Timber.e(t, "[$address] Failed to schedule service.handleFromRadio)") - } + try { + service.handleFromRadio(p = packet) + } catch (t: Throwable) { + Timber.e(t, "[$address] Failed to execute service.handleFromRadio()") } } @@ -136,14 +150,14 @@ constructor( // --- Connection & Discovery Logic --- - private suspend fun findPeripheral(): Peripheral = + private fun findPeripheral(): Peripheral = centralManager.getBondedPeripherals().firstOrNull { it.address == address } ?: throw RadioNotConnectedException("Device not found at address $address") private fun connect() { connectionScope.launch { try { - peripheral = findAndConnectPeripheral() + peripheral = retryCall { findAndConnectPeripheral() } peripheral?.let { onConnected() observePeripheralChanges() @@ -169,10 +183,10 @@ constructor( private suspend fun onConnected() { try { peripheral?.let { p -> - val rssi = p.readRssi() + val rssi = retryCall { p.readRssi() } Timber.d("[$address] Connection established. RSSI: $rssi dBm") - val phyInUse = p.readPhy() + val phyInUse = retryCall { p.readPhy() } Timber.d("[$address] PHY in use: $phyInUse") } } catch (e: Exception) { @@ -200,6 +214,7 @@ constructor( .launchIn(connectionScope) } + @Suppress("TooGenericExceptionCaught") @OptIn(ExperimentalUuidApi::class) private fun discoverServicesAndSetupCharacteristics(peripheral: Peripheral) { connectionScope.launch { @@ -246,6 +261,15 @@ constructor( service.onDisconnect(BleError.DiscoveryFailed("Meshtastic service not found")) } } + .catch { e -> + Timber.e(e, "[$address] Service discovery failed") + try { + peripheral.disconnect() + } catch (e2: Exception) { + Timber.e(e2, "[$address] Failed to disconnect in discovery catch") + } + service.onDisconnect(BleError.from(e)) + } .launchIn(connectionScope) } } @@ -254,8 +278,7 @@ constructor( @OptIn(ExperimentalUuidApi::class) private suspend fun setupNotifications() { - fromNumCharacteristic - ?.subscribe() + retryCall { fromNumCharacteristic?.subscribe() } ?.onStart { Timber.d("[$address] Subscribing to fromNumCharacteristic") } ?.onEach { notifyBytes -> Timber.d("[$address] FromNum Notification (${notifyBytes.size} bytes), draining queue") @@ -268,8 +291,7 @@ constructor( ?.onCompletion { cause -> Timber.d("[$address] fromNum sub flow completed, cause=$cause") } ?.launchIn(scope = connectionScope) - logRadioCharacteristic - ?.subscribe() + retryCall { logRadioCharacteristic?.subscribe() } ?.onStart { Timber.d("[$address] Subscribing to logRadioCharacteristic") } ?.onEach { notifyBytes -> Timber.d("[$address] LogRadio Notification (${notifyBytes.size} bytes), dispatching packet") @@ -283,6 +305,22 @@ constructor( ?.launchIn(scope = connectionScope) } + private suspend fun retryCall(block: suspend () -> T): T { + var currentAttempt = 0 + while (true) { + try { + return block() + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + currentAttempt++ + if (currentAttempt >= RETRY_COUNT) throw e + Timber.w(e, "[$address] Operation failed, retrying ($currentAttempt/$RETRY_COUNT)...") + delay(RETRY_DELAY_MS) + } + } + } + // --- IRadioInterface Implementation --- /** @@ -294,19 +332,23 @@ constructor( toRadioCharacteristic?.let { characteristic -> if (peripheral == null) return@let connectionScope.launch { - try { - val writeType = - if (characteristic.properties.contains(CharacteristicProperty.WRITE_WITHOUT_RESPONSE)) { - WriteType.WITHOUT_RESPONSE - } else { - WriteType.WITH_RESPONSE + writeMutex.withLock { + try { + val writeType = + if (characteristic.properties.contains(CharacteristicProperty.WRITE_WITHOUT_RESPONSE)) { + WriteType.WITHOUT_RESPONSE + } else { + WriteType.WITH_RESPONSE + } + retryCall { + Timber.d("[$address] Writing packet to toRadioCharacteristic with $writeType") + characteristic.write(p, writeType = writeType) } - Timber.d("[$address] Writing packet to toRadioCharacteristic with $writeType") - characteristic.write(p, writeType = writeType) - drainPacketQueueAndDispatch() - } catch (e: Exception) { - Timber.e(e, "[$address] Failed to write packet to toRadioCharacteristic") - service.onDisconnect(BleError.from(e)) + drainPacketQueueAndDispatch() + } catch (e: Exception) { + Timber.e(e, "[$address] Failed to write packet to toRadioCharacteristic") + service.onDisconnect(BleError.from(e)) + } } } } ?: Timber.w("[$address] toRadio unavailable, can't send data") @@ -320,6 +362,11 @@ constructor( service.onDisconnect(true) } } + + companion object { + private const val RETRY_COUNT = 3 + private const val RETRY_DELAY_MS = 500L + } } object BleConstants {