From 3a4cae5d0b16e0a121dc8bbf90de2efca4942e2b Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Wed, 22 Apr 2026 18:35:22 -0500 Subject: [PATCH] fix(ble): unblock reconnect + kable audit (logging, priority, backoff, StateFlow) (#5222) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: garthvh <1795163+garthvh@users.noreply.github.com> --- .../meshtastic/app/di/KoinVerificationTest.kt | 7 ++ .../meshtastic/core/ble/KablePlatformSetup.kt | 10 ++- .../org/meshtastic/core/ble/BleConnection.kt | 16 ++-- .../meshtastic/core/ble/BleLoggingConfig.kt | 82 +++++++++++++++++++ .../org/meshtastic/core/ble/BleRetry.kt | 33 ++++++-- .../meshtastic/core/ble/KableBleConnection.kt | 38 ++++----- .../core/ble/KableBleConnectionFactory.kt | 4 +- .../meshtastic/core/ble/KableBleScanner.kt | 8 +- .../core/ble/KableMeshtasticRadioProfile.kt | 18 +++- .../meshtastic/core/ble/di/CoreBleModule.kt | 13 ++- .../core/network/radio/BleRadioTransport.kt | 42 ++++++---- .../BleRadioTransportReconnectCrashTest.kt | 80 +++++++++++++++--- .../org/meshtastic/core/testing/FakeBle.kt | 27 +++--- .../meshtastic/desktop/di/DesktopKoinTest.kt | 7 ++ .../ota/dfu/LegacyDfuTransportTest.kt | 6 +- .../ota/dfu/SecureDfuTransportTest.kt | 6 +- 16 files changed, 302 insertions(+), 95 deletions(-) create mode 100644 core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleLoggingConfig.kt diff --git a/app/src/test/kotlin/org/meshtastic/app/di/KoinVerificationTest.kt b/app/src/test/kotlin/org/meshtastic/app/di/KoinVerificationTest.kt index 30e1b6be7..fd4b7aba8 100644 --- a/app/src/test/kotlin/org/meshtastic/app/di/KoinVerificationTest.kt +++ b/app/src/test/kotlin/org/meshtastic/app/di/KoinVerificationTest.kt @@ -30,6 +30,8 @@ import org.koin.test.verify.definition import org.koin.test.verify.injectedParameters import org.koin.test.verify.verify import org.meshtastic.app.map.MapViewModel +import org.meshtastic.core.ble.BleLogFormat 
+import org.meshtastic.core.ble.BleLogLevel import org.meshtastic.core.model.util.NodeIdLookup import org.meshtastic.feature.node.metrics.MetricsViewModel import kotlin.test.Test @@ -53,6 +55,11 @@ class KoinVerificationTest { NodeIdLookup::class, HttpClient::class, HttpClientEngine::class, + // BleLoggingConfig is a data class assembled by a factory function. Koin Verify + // still introspects its constructor params, so the wrapping enums need to be + // declared as known types even though they're never resolved from the graph. + BleLogLevel::class, + BleLogFormat::class, ), injections = injectedParameters( diff --git a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt index 018d8f9fc..e9b4a58cc 100644 --- a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt +++ b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt @@ -43,15 +43,21 @@ internal actual fun PeripheralBuilder.platformConfig(device: BleDevice, autoConn threadingStrategy = sharedThreadingStrategy + // We intentionally keep Kable's defaults for `transport` (Le) and `phy` (Le1M). + // Meshtastic radios (nRF52, ESP32-S3, RP2040+nRF) advertise BLE-only and don't support + // the LE 2M PHY in any first-party firmware, so changing these would be a regression risk + // with no upside. If a future hardware revision exposes 2M PHY, override `phy = Phy.Le2M` + // here after confirming the firmware advertises it. + onServicesDiscovered { try { // Android defaults to 23 bytes MTU. Meshtastic packets can be 512 bytes. // Requesting the max MTU is critical for preventing dropped packets and stalls. 
@Suppress("MagicNumber") val negotiatedMtu = requestMtu(512) - Logger.i { "Negotiated MTU: $negotiatedMtu" } + Logger.i { "[${device.address}] Negotiated MTU: $negotiatedMtu" } } catch (@Suppress("TooGenericExceptionCaught") e: Exception) { - Logger.w(e) { "Failed to request MTU" } + Logger.w(e) { "[${device.address}] Failed to request MTU" } } } } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt index 5a8b67ce1..4eb718e5b 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt @@ -18,7 +18,7 @@ package org.meshtastic.core.ble import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.onStart import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -41,11 +41,17 @@ interface BleConnection { /** The currently connected [BleDevice], or null if not connected. */ val device: BleDevice? - /** A flow of the current device. */ - val deviceFlow: SharedFlow + /** + * A flow of the current device. [StateFlow] semantics: replays the latest value to new collectors and conflates + * rapid updates. + */ + val deviceFlow: StateFlow - /** A flow of [BleConnectionState] changes. */ - val connectionState: SharedFlow + /** + * A flow of [BleConnectionState] changes. [StateFlow] semantics ensure the latest state is always observable and + * distinct-equals deduplication avoids spurious re-emissions. + */ + val connectionState: StateFlow /** Connects to the given [BleDevice]. 
*/ suspend fun connect(device: BleDevice) diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleLoggingConfig.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleLoggingConfig.kt new file mode 100644 index 000000000..9cd934368 --- /dev/null +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleLoggingConfig.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see https://www.gnu.org/licenses/. + */ +package org.meshtastic.core.ble + +import com.juul.kable.logs.Logging + +/** + * Verbosity for Kable's internal BLE logging. Wraps [Logging.Level] so callers and the Koin DI graph don't leak Kable + * types into modules that don't directly depend on Kable. + */ +enum class BleLogLevel { + /** Only failures (lowest noise; default in release builds). */ + Warnings, + + /** [Warnings] plus connect / disconnect / subscribe / GATT operation events. */ + Events, + + /** [Events] plus a hex dump of every read/write/notify payload. Very noisy. */ + Data, +} + +/** + * Format for Kable's internal BLE log entries. + * + * [Compact] keeps each entry on a single line — strongly preferred for `adb logcat`, grep, and bug reports. [Multiline] + * pretty-prints across several lines, which is harder to read in tooling. + */ +enum class BleLogFormat { + Compact, + Multiline, +} + +/** + * Verbosity and formatting controls for Kable's internal BLE logging.
+ * + * @see BleLogLevel for the verbosity scale. + * @see BleLogFormat for the layout choices. + */ +data class BleLoggingConfig(val level: BleLogLevel, val format: BleLogFormat = BleLogFormat.Compact) { + companion object { + /** Quiet defaults suitable for release builds — only warnings, single-line. */ + val Release: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Warnings) + + /** Verbose defaults suitable for debug builds — every BLE event, single-line. */ + val Debug: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Events) + } +} + +internal fun BleLogLevel.toKable(): Logging.Level = when (this) { + BleLogLevel.Warnings -> Logging.Level.Warnings + BleLogLevel.Events -> Logging.Level.Events + BleLogLevel.Data -> Logging.Level.Data +} + +internal fun BleLogFormat.toKable(): Logging.Format = when (this) { + BleLogFormat.Compact -> Logging.Format.Compact + BleLogFormat.Multiline -> Logging.Format.Multiline +} + +/** Applies this [BleLoggingConfig] to a Kable `logging { }` block, routing through [KermitLogEngine]. */ +internal fun Logging.applyConfig(config: BleLoggingConfig, identifier: String? = null) { + engine = KermitLogEngine + level = config.level.toKable() + format = config.format.toKable() + if (identifier != null) { + this.identifier = identifier + } +} diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleRetry.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleRetry.kt index 5e85a52f8..a58b91aca 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleRetry.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleRetry.kt @@ -19,20 +19,33 @@ package org.meshtastic.core.ble import co.touchlab.kermit.Logger import kotlinx.coroutines.CancellationException import kotlinx.coroutines.delay +import kotlin.math.pow +import kotlin.random.Random + +/** Cap on the per-attempt backoff to prevent unbounded growth. 
*/ +private const val MAX_RETRY_DELAY_MS = 2_000L + +/** Multiplicative growth factor between attempts (delay doubles each time). */ +private const val BACKOFF_FACTOR = 2.0 /** - * Retries a BLE operation a specified number of times with a delay between attempts. + * Retries a BLE operation with bounded exponential backoff and jitter. * - * @param count The number of attempts to make. - * @param delayMs The delay in milliseconds between attempts. - * @param tag A tag for logging. + * Each retry waits `delayMs * 2^(attempt-1)`, capped at [MAX_RETRY_DELAY_MS]; a random ±25% jitter is then applied on + * top of the capped value (so a single sleep may exceed the cap by up to 25%) to avoid synchronised retry storms when + * multiple operations fail in lockstep (e.g. a TX/RX pair both failing the same `STATUS_GATT_BUSY` window). + * + * @param count Total attempt count (default 3). + * @param delayMs Initial delay before the first retry. Subsequent delays grow exponentially. + * @param tag Tag for log prefixes. * @param block The operation to perform. * @return The result of the operation. - * @throws Exception if the operation fails after all attempts. + * @throws Exception If the operation fails after all attempts. [CancellationException] is always re-thrown immediately. */ +@Suppress("MagicNumber") suspend fun retryBleOperation( count: Int = 3, - delayMs: Long = 500L, + delayMs: Long = 250L, tag: String = "BLE", block: suspend () -> T, ): T { @@ -48,8 +61,12 @@ suspend fun retryBleOperation( Logger.w(e) { "[$tag] BLE operation failed after $count attempts, giving up" } throw e } - Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${delayMs}ms..."
} - delay(delayMs) + val backoffMs = (delayMs * BACKOFF_FACTOR.pow(currentAttempt - 1)).toLong().coerceAtMost(MAX_RETRY_DELAY_MS) + val jitterRange = (backoffMs / 4).coerceAtLeast(1L) + val jitter = Random.nextLong(-jitterRange, jitterRange + 1) + val sleepMs = (backoffMs + jitter).coerceAtLeast(0L) + Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${sleepMs}ms..." } + delay(sleepMs) } } } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt index a3808d3e6..d64a88dde 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt @@ -22,18 +22,16 @@ import com.juul.kable.PeripheralBuilder import com.juul.kable.State import com.juul.kable.WriteType import com.juul.kable.characteristicOf -import com.juul.kable.logs.Logging import com.juul.kable.writeWithoutResponse import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.job @@ -90,7 +88,8 @@ class KableBleService(private val peripheral: Peripheral, private val serviceUui * fall back to `autoConnect = true` on failure. Only two attempts are made per [connect] call — the caller * ([BleRadioTransport]) owns the macro-level retry/backoff loop. 
*/ -class KableBleConnection(private val scope: CoroutineScope) : BleConnection { +class KableBleConnection(private val scope: CoroutineScope, private val loggingConfig: BleLoggingConfig) : + BleConnection { @Volatile private var peripheral: Peripheral? = null @@ -103,19 +102,15 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { private val AUTOCONNECT_FALLBACK_DELAY = 1.seconds } - private val _deviceFlow = MutableSharedFlow(replay = 1) - override val deviceFlow: SharedFlow = _deviceFlow.asSharedFlow() + private val _deviceFlow = MutableStateFlow(null) + override val deviceFlow: StateFlow = _deviceFlow.asStateFlow() override val device: BleDevice? - get() = _deviceFlow.replayCache.firstOrNull() + get() = _deviceFlow.value private val _connectionState = - MutableSharedFlow( - replay = 1, - extraBufferCapacity = 1, - onBufferOverflow = BufferOverflow.DROP_OLDEST, - ) - override val connectionState: SharedFlow = _connectionState.asSharedFlow() + MutableStateFlow(BleConnectionState.Disconnected(DisconnectReason.Unknown)) + override val connectionState: StateFlow = _connectionState.asStateFlow() @Suppress("CyclomaticComplexMethod", "LongMethod") override suspend fun connect(device: BleDevice) { @@ -124,11 +119,7 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { /** Applies logging, observation exception handling, and platform config shared by both peripheral types. 
*/ fun PeripheralBuilder.commonConfig() { - logging { - engine = KermitLogEngine - level = Logging.Level.Events - identifier = device.address - } + logging { applyConfig(loggingConfig, identifier = device.address) } observationExceptionHandler { cause -> Logger.w(cause) { "[${device.address}] Observation failure suppressed" } } @@ -182,8 +173,11 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { throw e } catch (@Suppress("TooGenericExceptionCaught", "SwallowedException") e: Exception) { if (autoConnect) { - // autoConnect already true and still failed — don't loop forever. - Logger.w { "[${device.address}] autoConnect attempt failed, giving up" } + // Already on the autoConnect path and still failing: surface a clear Disconnected + // and let the outer reconnect loop (BleRadioTransport) own the macro retry budget. + Logger.w { + "[${device.address}] autoConnect attempt also failed; deferring to outer reconnect loop" + } _connectionState.emit(BleConnectionState.Disconnected(DisconnectReason.ConnectionFailed)) throw e } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt index 13b8a1663..8fb34aa3b 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt @@ -20,12 +20,12 @@ import kotlinx.coroutines.CoroutineScope import org.koin.core.annotation.Single @Single -class KableBleConnectionFactory : BleConnectionFactory { +class KableBleConnectionFactory(private val loggingConfig: BleLoggingConfig) : BleConnectionFactory { /** * Creates a new [KableBleConnection]. * * [tag] is unused because Kable's own log identifier is set per-peripheral inside [KableBleConnection.connect] * using the device address, which provides more precise context than a factory-time tag. 
*/ - override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope) + override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope, loggingConfig) } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt index 5e91b3459..875978ceb 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt @@ -17,7 +17,6 @@ package org.meshtastic.core.ble import com.juul.kable.Scanner -import com.juul.kable.logs.Logging import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.withTimeoutOrNull @@ -26,13 +25,10 @@ import kotlin.time.Duration import kotlin.uuid.Uuid @Single -class KableBleScanner : BleScanner { +class KableBleScanner(private val loggingConfig: BleLoggingConfig) : BleScanner { override fun scan(timeout: Duration, serviceUuid: Uuid?, address: String?): Flow { val scanner = Scanner { - logging { - engine = KermitLogEngine - level = Logging.Level.Events - } + logging { applyConfig(loggingConfig) } // Use separate match blocks so each filter is evaluated independently (OR semantics). 
// Combining address and service UUID in a single match{} creates an AND filter which // silently drops results on OEM stacks (Samsung, Xiaomi) when the device uses a diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt index 3f0e61864..8ecb253bf 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt @@ -47,15 +47,24 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR private val fromNum = service.characteristic(FROMNUM_CHARACTERISTIC) private val logRadioChar = service.characteristic(LOGRADIO_CHARACTERISTIC) + /** + * Cached preferred write type for [toRadio]. Resolved once at construction so the hot send path doesn't have to + * walk the discovered services list on every packet. + */ + private val toRadioWriteType: BleWriteType = service.preferredWriteType(toRadio) + companion object { private val TRANSIENT_RETRY_DELAY = 500.milliseconds } private val subscriptionReady = CompletableDeferred() - /** Seed with replay=1 so the config-handshake drain starts before FROMNUM notifications are gated in. */ + /** + * Latched signal: `replay = 1` plus one extra buffer slot collapses bursts of drain triggers into at most two pending + * polls. DROP_OLDEST means we never block writers and never let stale drain requests pile up.
+ */ private val triggerDrain = - MutableSharedFlow(replay = 1, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST) + MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) @Suppress("TooGenericExceptionCaught", "SwallowedException") override val fromRadio: Flow = channelFlow { @@ -97,14 +106,15 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR if (service.hasCharacteristic(logRadioChar)) { service.observe(logRadioChar).catch { e -> if (e is CancellationException) throw e - // logRadio is optional — swallow observation errors silently. + // logRadio is optional — log at debug for diagnostics but don't surface to callers. + Logger.d(e) { "logRadio observation failure suppressed" } } } else { emptyFlow() } override suspend fun sendToRadio(packet: ByteArray) { - service.write(toRadio, packet, service.preferredWriteType(toRadio)) + service.write(toRadio, packet, toRadioWriteType) triggerDrain.tryEmit(Unit) } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/di/CoreBleModule.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/di/CoreBleModule.kt index f064fcb63..6302c4af1 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/di/CoreBleModule.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/di/CoreBleModule.kt @@ -18,7 +18,18 @@ package org.meshtastic.core.ble.di import org.koin.core.annotation.ComponentScan import org.koin.core.annotation.Module +import org.koin.core.annotation.Single +import org.meshtastic.core.ble.BleLoggingConfig +import org.meshtastic.core.common.BuildConfigProvider @Module @ComponentScan("org.meshtastic.core.ble") -class CoreBleModule +class CoreBleModule { + /** + * Quiet by default in release; verbose (Kable [Events][com.juul.kable.logs.Logging.Level.Events]) in debug builds. + * Always single-line for grep/logcat friendliness. 
+ */ + @Single + fun provideBleLoggingConfig(buildConfig: BuildConfigProvider): BleLoggingConfig = + if (buildConfig.isDebug) BleLoggingConfig.Debug else BleLoggingConfig.Release +} diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt index 9f1b530a8..7da67a54a 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioTransport.kt @@ -26,9 +26,9 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -258,24 +258,25 @@ class BleRadioTransport( isFullyConnected = true onConnected() - // Scope the connectionState listener to this iteration so it's - // cancelled automatically before the next reconnect cycle. - var disconnectReason: DisconnectReason = DisconnectReason.Unknown - coroutineScope { - bleConnection.connectionState - .onEach { s -> - if (s is BleConnectionState.Disconnected && isFullyConnected) { - isFullyConnected = false - disconnectReason = s.reason - onDisconnected() - } - } - .catch { e -> Logger.w(e) { "[$address] bleConnection.connectionState flow crashed" } } - .launchIn(this) + discoverServicesAndSetupCharacteristics() - discoverServicesAndSetupCharacteristics() + // Wait for the StateFlow to actually reflect Connected before watching for the next + // Disconnected. 
connectAndAwait returns synchronously based on the underlying Kable + // peripheral state, but our _connectionState observer runs on a separate coroutine and + // may lag. Without this gate the next .first { Disconnected } below could match the + // *previous* cycle's stale Disconnected value and fire immediately, breaking reconnect. + bleConnection.connectionState.first { it is BleConnectionState.Connected } - bleConnection.connectionState.first { it is BleConnectionState.Disconnected } + // Suspend until the next Disconnected emission. We deliberately do NOT wrap this in a + // coroutineScope { launchIn(...); first(...) } pattern: launching a hot StateFlow + // collector inside coroutineScope hangs the scope after .first returns (the launched + // collector never completes naturally, and coroutineScope waits for all children). + val disconnectedState = + bleConnection.connectionState.filterIsInstance().first() + val disconnectReason = disconnectedState.reason + if (isFullyConnected) { + isFullyConnected = false + onDisconnected() } Logger.i { "[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect" } @@ -350,6 +351,13 @@ class BleRadioTransport( val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE) Logger.i { "[$address] BLE Radio Session Ready. Max write length (WITHOUT_RESPONSE): $maxLen bytes" } + // Ask the platform for a low-latency / high-throughput connection interval + // (~7.5 ms on Android). The Meshtastic firmware happily accepts this and it + // materially speeds up the initial config drain and any bulk fromRadio reads. 
+ if (bleConnection.requestHighConnectionPriority()) { + Logger.d { "[$address] Requested high BLE connection priority" } + } + this@BleRadioTransport.callback.onConnect() } } catch (e: CancellationException) { diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioTransportReconnectCrashTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioTransportReconnectCrashTest.kt index 9b5cee7b7..c1835e788 100644 --- a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioTransportReconnectCrashTest.kt +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioTransportReconnectCrashTest.kt @@ -24,9 +24,9 @@ import dev.mokkery.mock import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.runTest import org.meshtastic.core.ble.BleConnection @@ -35,6 +35,7 @@ import org.meshtastic.core.ble.BleConnectionState import org.meshtastic.core.ble.BleDevice import org.meshtastic.core.ble.BleService import org.meshtastic.core.ble.BleWriteType +import org.meshtastic.core.ble.DisconnectReason import org.meshtastic.core.testing.FakeBleConnection import org.meshtastic.core.testing.FakeBleConnectionFactory import org.meshtastic.core.testing.FakeBleDevice @@ -227,6 +228,63 @@ class BleRadioTransportReconnectCrashTest { "disconnect() must be called after CancellationException in profile() — GATT leak fix", ) } + + // ─── Reconnect after a stable connection drops ─────────────────────────────────────────────── + + /** + * Regression test for the BLE 
reconnect hang. + * + * Symptom: after a stable connection (uptime > minStableConnection) was terminated by a remote disconnect (e.g. + * node power-cycle), the transport's reconnect loop never iterated — `attemptConnection` ran exactly once, the GATT + * disconnect callback fired, and then nothing. + * + * Root cause: `attemptConnection` wrapped its disconnect-watcher in a `coroutineScope { + * connectionState.onEach{...}.launchIn(this); connectionState.first { Disconnected } }` block. `coroutineScope` + * waits for ALL launched children before returning, but the `.launchIn` collector on a hot `StateFlow` (or + * `SharedFlow(replay=1)`) never completes naturally. After `.first` returned, the scope hung forever, blocking + * `BleReconnectPolicy.execute` from issuing the next attempt. + * + * This test exercises the full happy-path reconnect cycle: connect → stable uptime → external disconnect → expect a + * second `connectAndAwait` call. With the bug present, only one `connectAndAwait` call ever happens. + */ + @Test + fun `transport reconnects after a stable connection is dropped remotely`() = runTest { + val device = FakeBleDevice(address = address, name = "Test Radio") + bluetoothRepository.bond(device) + + val bleTransport = + BleRadioTransport( + scope = this, + scanner = scanner, + bluetoothRepository = bluetoothRepository, + connectionFactory = connectionFactory, + callback = service, + address = address, + ) + bleTransport.start() + + // Settle delay (3 s) + connect + handshake. + advanceTimeBy(4_000L) + assertTrue(connection.connectAndAwaitCalls == 1, "First connect must happen during initial start window") + + // Stay connected long enough to be considered stable (> minStableConnection = 5 s). + advanceTimeBy(10_000L) + + // Simulate the firmware dying mid-session — the same path a node power-cycle takes. + connection.simulateRemoteDisconnect(reason = DisconnectReason.Timeout) + + // Settle delay (3 s) before the next attempt + re-connect window. 
Generous to absorb + // the policy retry backoff (5 s on first failure) plus another 3 s settle delay. + advanceTimeBy(30_000L) + + assertTrue( + connection.connectAndAwaitCalls >= 2, + "Reconnect loop must call connectAndAwait again after a remote disconnect " + + "(actual calls: ${connection.connectAndAwaitCalls})", + ) + + bleTransport.close() + } } // ─── Test doubles ──────────────────────────────────────────────────────────────────────────────── @@ -237,19 +295,19 @@ class BleRadioTransportReconnectCrashTest { */ private class CancellingProfileBleConnection : BleConnection { - private val _deviceFlow = MutableSharedFlow(replay = 1) - override val deviceFlow: SharedFlow = _deviceFlow.asSharedFlow() + private val _deviceFlow = MutableStateFlow(null) + override val deviceFlow: StateFlow = _deviceFlow.asStateFlow() - private val _connectionState = MutableSharedFlow(replay = 1) - override val connectionState: SharedFlow = _connectionState.asSharedFlow() + private val _connectionState = MutableStateFlow(BleConnectionState.Disconnected()) + override val connectionState: StateFlow = _connectionState.asStateFlow() override val device: BleDevice? 
= null var disconnectCalls = 0 override suspend fun connect(device: BleDevice) { - _deviceFlow.emit(device) - _connectionState.emit(BleConnectionState.Connected) + _deviceFlow.value = device + _connectionState.value = BleConnectionState.Connected } override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState { @@ -259,8 +317,8 @@ private class CancellingProfileBleConnection : BleConnection { override suspend fun disconnect() { disconnectCalls++ - _connectionState.emit(BleConnectionState.Disconnected()) - _deviceFlow.emit(null) + _connectionState.value = BleConnectionState.Disconnected() + _deviceFlow.value = null } override suspend fun profile( diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt index f2001da86..bed4b1146 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt @@ -20,9 +20,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow @@ -36,6 +34,7 @@ import org.meshtastic.core.ble.BleService import org.meshtastic.core.ble.BleWriteType import org.meshtastic.core.ble.BluetoothRepository import org.meshtastic.core.ble.BluetoothState +import org.meshtastic.core.ble.DisconnectReason import kotlin.time.Duration import kotlin.uuid.Uuid @@ -91,11 +90,10 @@ class FakeBleConnection : override val device: BleDevice? 
get() = _device.value - private val _deviceFlow = mutableSharedFlow(replay = 1) - override val deviceFlow: SharedFlow = _deviceFlow.asSharedFlow() + override val deviceFlow: StateFlow = _device.asStateFlow() - private val _connectionState = mutableSharedFlow(replay = 1) - override val connectionState: SharedFlow = _connectionState.asSharedFlow() + private val _connectionState = mutableStateFlow(BleConnectionState.Disconnected()) + override val connectionState: StateFlow = _connectionState.asStateFlow() /** When > 0, the next [failNextN] calls to [connectAndAwait] return [BleConnectionState.Disconnected]. */ var failNextN: Int = 0 @@ -109,6 +107,14 @@ class FakeBleConnection : /** Number of times [disconnect] has been invoked. */ var disconnectCalls: Int = 0 + /** Number of times [connectAndAwait] has been invoked (including failures). */ + var connectAndAwaitCalls: Int = 0 + + /** Externally simulate a remote disconnect (e.g. node power-cycle) for tests that exercise reconnect. */ + fun simulateRemoteDisconnect(reason: DisconnectReason = DisconnectReason.Timeout) { + _connectionState.value = BleConnectionState.Disconnected(reason) + } + /** Service UUIDs that should appear missing — `profile()` throws `NoSuchElementException` for these. 
*/ val missingServices: MutableSet = mutableSetOf() @@ -116,18 +122,18 @@ class FakeBleConnection : override suspend fun connect(device: BleDevice) { _device.value = device - _deviceFlow.emit(device) - _connectionState.emit(BleConnectionState.Connecting) + _connectionState.value = BleConnectionState.Connecting if (device is FakeBleDevice) { device.setState(BleConnectionState.Connecting) } - _connectionState.emit(BleConnectionState.Connected) + _connectionState.value = BleConnectionState.Connected if (device is FakeBleDevice) { device.setState(BleConnectionState.Connected) } } override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState { + connectAndAwaitCalls++ connectException?.let { throw it } if (failNextN > 0) { failNextN-- @@ -140,12 +146,11 @@ class FakeBleConnection : override suspend fun disconnect() { disconnectCalls++ val currentDevice = _device.value - _connectionState.emit(BleConnectionState.Disconnected()) + _connectionState.value = BleConnectionState.Disconnected() if (currentDevice is FakeBleDevice) { currentDevice.setState(BleConnectionState.Disconnected()) } _device.value = null - _deviceFlow.emit(null) } override suspend fun profile( diff --git a/desktop/src/test/kotlin/org/meshtastic/desktop/di/DesktopKoinTest.kt b/desktop/src/test/kotlin/org/meshtastic/desktop/di/DesktopKoinTest.kt index b1136e71a..d18626660 100644 --- a/desktop/src/test/kotlin/org/meshtastic/desktop/di/DesktopKoinTest.kt +++ b/desktop/src/test/kotlin/org/meshtastic/desktop/di/DesktopKoinTest.kt @@ -23,6 +23,8 @@ import kotlinx.coroutines.CoroutineDispatcher import org.koin.core.annotation.KoinExperimentalAPI import org.koin.dsl.module import org.koin.test.verify.verify +import org.meshtastic.core.ble.BleLogFormat +import org.meshtastic.core.ble.BleLogLevel import kotlin.test.Test @OptIn(KoinExperimentalAPI::class) @@ -41,6 +43,11 @@ class DesktopKoinTest { CoroutineDispatcher::class, HttpClient::class, HttpClientEngine::class, + // 
BleLoggingConfig is a data class assembled by a factory function. Koin Verify + // still introspects its constructor params, so the wrapping enums need to be + // declared as known types even though they're never resolved from the graph. + BleLogLevel::class, + BleLogFormat::class, ), ) } diff --git a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransportTest.kt b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransportTest.kt index 4504e460d..42ddaad70 100644 --- a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransportTest.kt +++ b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransportTest.kt @@ -22,7 +22,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.test.runTest import org.meshtastic.core.ble.BleCharacteristic import org.meshtastic.core.ble.BleConnection @@ -510,10 +510,10 @@ class LegacyDfuTransportTest { override val device: BleDevice? 
get() = delegate.device - override val deviceFlow: SharedFlow + override val deviceFlow: StateFlow get() = delegate.deviceFlow - override val connectionState: SharedFlow + override val connectionState: StateFlow get() = delegate.connectionState override suspend fun connect(device: BleDevice) = delegate.connect(device) diff --git a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt index 454aadaa2..964916d51 100644 --- a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt +++ b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt @@ -21,7 +21,7 @@ package org.meshtastic.feature.firmware.ota.dfu import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.test.runTest import org.meshtastic.core.ble.BleCharacteristic import org.meshtastic.core.ble.BleConnection @@ -632,10 +632,10 @@ class SecureDfuTransportTest { override val device: BleDevice? get() = delegate.device - override val deviceFlow: SharedFlow + override val deviceFlow: StateFlow get() = delegate.deviceFlow - override val connectionState: SharedFlow + override val connectionState: StateFlow get() = delegate.connectionState override suspend fun connect(device: BleDevice) = delegate.connect(device)