mirror of
https://github.com/meshtastic/Meshtastic-Android.git
synced 2026-05-12 00:28:20 -04:00
fix(ble): unblock reconnect + kable audit (logging, priority, backoff, StateFlow) (#5222)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -30,6 +30,8 @@ import org.koin.test.verify.definition
|
||||
import org.koin.test.verify.injectedParameters
|
||||
import org.koin.test.verify.verify
|
||||
import org.meshtastic.app.map.MapViewModel
|
||||
import org.meshtastic.core.ble.BleLogFormat
|
||||
import org.meshtastic.core.ble.BleLogLevel
|
||||
import org.meshtastic.core.model.util.NodeIdLookup
|
||||
import org.meshtastic.feature.node.metrics.MetricsViewModel
|
||||
import kotlin.test.Test
|
||||
@@ -53,6 +55,11 @@ class KoinVerificationTest {
|
||||
NodeIdLookup::class,
|
||||
HttpClient::class,
|
||||
HttpClientEngine::class,
|
||||
// BleLoggingConfig is a data class assembled by a factory function. Koin Verify
|
||||
// still introspects its constructor params, so the wrapping enums need to be
|
||||
// declared as known types even though they're never resolved from the graph.
|
||||
BleLogLevel::class,
|
||||
BleLogFormat::class,
|
||||
),
|
||||
injections =
|
||||
injectedParameters(
|
||||
|
||||
@@ -43,15 +43,21 @@ internal actual fun PeripheralBuilder.platformConfig(device: BleDevice, autoConn
|
||||
|
||||
threadingStrategy = sharedThreadingStrategy
|
||||
|
||||
// We intentionally keep Kable's defaults for `transport` (Le) and `phy` (Le1M).
|
||||
// Meshtastic radios (nRF52, ESP32-S3, RP2040+nRF) advertise BLE-only and don't support
|
||||
// the LE 2M PHY in any first-party firmware, so changing these would be a regression risk
|
||||
// with no upside. If a future hardware revision exposes 2M PHY, override `phy = Phy.Le2M`
|
||||
// here after confirming the firmware advertises it.
|
||||
|
||||
onServicesDiscovered {
|
||||
try {
|
||||
// Android defaults to 23 bytes MTU. Meshtastic packets can be 512 bytes.
|
||||
// Requesting the max MTU is critical for preventing dropped packets and stalls.
|
||||
@Suppress("MagicNumber")
|
||||
val negotiatedMtu = requestMtu(512)
|
||||
Logger.i { "Negotiated MTU: $negotiatedMtu" }
|
||||
Logger.i { "[${device.address}] Negotiated MTU: $negotiatedMtu" }
|
||||
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
|
||||
Logger.w(e) { "Failed to request MTU" }
|
||||
Logger.w(e) { "[${device.address}] Failed to request MTU" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ package org.meshtastic.core.ble
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
@@ -41,11 +41,17 @@ interface BleConnection {
|
||||
/** The currently connected [BleDevice], or null if not connected. */
|
||||
val device: BleDevice?
|
||||
|
||||
/** A flow of the current device. */
|
||||
val deviceFlow: SharedFlow<BleDevice?>
|
||||
/**
|
||||
* A flow of the current device. [StateFlow] semantics: replays the latest value to new collectors and conflates
|
||||
* rapid updates.
|
||||
*/
|
||||
val deviceFlow: StateFlow<BleDevice?>
|
||||
|
||||
/** A flow of [BleConnectionState] changes. */
|
||||
val connectionState: SharedFlow<BleConnectionState>
|
||||
/**
|
||||
* A flow of [BleConnectionState] changes. [StateFlow] semantics ensure the latest state is always observable and
|
||||
* distinct-equals deduplication avoids spurious re-emissions.
|
||||
*/
|
||||
val connectionState: StateFlow<BleConnectionState>
|
||||
|
||||
/** Connects to the given [BleDevice]. */
|
||||
suspend fun connect(device: BleDevice)
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright (c) 2026 Meshtastic LLC
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.meshtastic.core.ble
|
||||
|
||||
import com.juul.kable.logs.Logging
|
||||
|
||||
/**
 * Verbosity for Kable's internal BLE logging. Wraps [Logging.Level] so callers and the Koin DI graph don't leak Kable
 * types into modules that don't directly depend on Kable.
 *
 * Entries are declared from least to most verbose; each level includes everything logged by the one before it.
 */
enum class BleLogLevel {
    /** Only failures (lowest noise; default in release builds). */
    Warnings,

    /** [Warnings] plus connect / disconnect / subscribe / GATT operation events. */
    Events,

    /** [Events] plus a hex dump of every read/write/notify payload. Very noisy. */
    Data,
}
|
||||
|
||||
/**
 * Format for Kable's internal BLE log entries.
 *
 * [Compact] keeps each entry on a single line — strongly preferred for `adb logcat`, grep, and bug reports. [Multiline]
 * pretty-prints across several lines, which is harder to read in tooling.
 */
enum class BleLogFormat {
    /** One log entry per line. */
    Compact,

    /** One log entry spread over several lines. */
    Multiline,
}
|
||||
|
||||
/**
 * Verbosity and formatting controls for Kable's internal BLE logging.
 *
 * @property level How much Kable logs.
 * @property format How each entry is laid out; defaults to [BleLogFormat.Compact].
 * @see BleLogLevel for the verbosity scale.
 * @see BleLogFormat for the layout choices.
 */
data class BleLoggingConfig(val level: BleLogLevel, val format: BleLogFormat = BleLogFormat.Compact) {
    companion object {
        /** Quiet defaults suitable for release builds — only warnings, single-line. */
        val Release: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Warnings)

        /** Verbose defaults suitable for debug builds — every BLE event, single-line. */
        val Debug: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Events)
    }
}
|
||||
|
||||
/** Maps this [BleLogLevel] onto the Kable [Logging.Level] of the same name. */
internal fun BleLogLevel.toKable(): Logging.Level = when (this) {
    BleLogLevel.Warnings -> Logging.Level.Warnings
    BleLogLevel.Events -> Logging.Level.Events
    BleLogLevel.Data -> Logging.Level.Data
}
|
||||
|
||||
/** Maps this [BleLogFormat] onto the Kable [Logging.Format] of the same name. */
internal fun BleLogFormat.toKable(): Logging.Format = when (this) {
    BleLogFormat.Compact -> Logging.Format.Compact
    BleLogFormat.Multiline -> Logging.Format.Multiline
}
|
||||
|
||||
/**
 * Applies [config] to a Kable `logging { }` block, routing through [KermitLogEngine].
 *
 * @param config Level and format to install on the receiver.
 * @param identifier Optional log identifier; when `null` the receiver's current identifier is left untouched.
 */
internal fun Logging.applyConfig(config: BleLoggingConfig, identifier: String? = null) {
    engine = KermitLogEngine
    level = config.level.toKable()
    format = config.format.toKable()
    if (identifier != null) {
        this.identifier = identifier
    }
}
|
||||
@@ -19,20 +19,33 @@ package org.meshtastic.core.ble
|
||||
import co.touchlab.kermit.Logger
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlin.math.pow
|
||||
import kotlin.random.Random
|
||||
|
||||
/** Cap on the per-attempt backoff to prevent unbounded growth. */
|
||||
private const val MAX_RETRY_DELAY_MS = 2_000L
|
||||
|
||||
/** Multiplicative growth factor between attempts (delay doubles each time). */
|
||||
private const val BACKOFF_FACTOR = 2.0
|
||||
|
||||
/**
|
||||
* Retries a BLE operation a specified number of times with a delay between attempts.
|
||||
* Retries a BLE operation with bounded exponential backoff and jitter.
|
||||
*
|
||||
* @param count The number of attempts to make.
|
||||
* @param delayMs The delay in milliseconds between attempts.
|
||||
* @param tag A tag for logging.
|
||||
* Each retry waits `delayMs * 2^(attempt-1)`, capped at [MAX_RETRY_DELAY_MS], with a random ±25% jitter applied to
|
||||
* avoid synchronised retry storms when multiple operations fail in lockstep (e.g. a TX/RX pair both failing the same
|
||||
* `STATUS_GATT_BUSY` window).
|
||||
*
|
||||
* @param count Total attempt count (default 3).
|
||||
* @param delayMs Initial delay before the first retry. Subsequent delays grow exponentially.
|
||||
* @param tag Tag for log prefixes.
|
||||
* @param block The operation to perform.
|
||||
* @return The result of the operation.
|
||||
* @throws Exception if the operation fails after all attempts.
|
||||
* @throws Exception If the operation fails after all attempts. [CancellationException] is always re-thrown immediately.
|
||||
*/
|
||||
@Suppress("MagicNumber")
|
||||
suspend fun <T> retryBleOperation(
|
||||
count: Int = 3,
|
||||
delayMs: Long = 500L,
|
||||
delayMs: Long = 250L,
|
||||
tag: String = "BLE",
|
||||
block: suspend () -> T,
|
||||
): T {
|
||||
@@ -48,8 +61,12 @@ suspend fun <T> retryBleOperation(
|
||||
Logger.w(e) { "[$tag] BLE operation failed after $count attempts, giving up" }
|
||||
throw e
|
||||
}
|
||||
Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${delayMs}ms..." }
|
||||
delay(delayMs)
|
||||
val backoffMs = (delayMs * BACKOFF_FACTOR.pow(currentAttempt - 1)).toLong().coerceAtMost(MAX_RETRY_DELAY_MS)
|
||||
val jitterRange = (backoffMs / 4).coerceAtLeast(1L)
|
||||
val jitter = Random.nextLong(-jitterRange, jitterRange + 1)
|
||||
val sleepMs = (backoffMs + jitter).coerceAtLeast(0L)
|
||||
Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${sleepMs}ms..." }
|
||||
delay(sleepMs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,18 +22,16 @@ import com.juul.kable.PeripheralBuilder
|
||||
import com.juul.kable.State
|
||||
import com.juul.kable.WriteType
|
||||
import com.juul.kable.characteristicOf
|
||||
import com.juul.kable.logs.Logging
|
||||
import com.juul.kable.writeWithoutResponse
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.TimeoutCancellationException
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.job
|
||||
@@ -90,7 +88,8 @@ class KableBleService(private val peripheral: Peripheral, private val serviceUui
|
||||
* fall back to `autoConnect = true` on failure. Only two attempts are made per [connect] call — the caller
|
||||
* ([BleRadioTransport]) owns the macro-level retry/backoff loop.
|
||||
*/
|
||||
class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
|
||||
class KableBleConnection(private val scope: CoroutineScope, private val loggingConfig: BleLoggingConfig) :
|
||||
BleConnection {
|
||||
|
||||
@Volatile private var peripheral: Peripheral? = null
|
||||
|
||||
@@ -103,19 +102,15 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
|
||||
private val AUTOCONNECT_FALLBACK_DELAY = 1.seconds
|
||||
}
|
||||
|
||||
private val _deviceFlow = MutableSharedFlow<BleDevice?>(replay = 1)
|
||||
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
|
||||
private val _deviceFlow = MutableStateFlow<BleDevice?>(null)
|
||||
override val deviceFlow: StateFlow<BleDevice?> = _deviceFlow.asStateFlow()
|
||||
|
||||
override val device: BleDevice?
|
||||
get() = _deviceFlow.replayCache.firstOrNull()
|
||||
get() = _deviceFlow.value
|
||||
|
||||
private val _connectionState =
|
||||
MutableSharedFlow<BleConnectionState>(
|
||||
replay = 1,
|
||||
extraBufferCapacity = 1,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST,
|
||||
)
|
||||
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
|
||||
MutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected(DisconnectReason.Unknown))
|
||||
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
|
||||
|
||||
@Suppress("CyclomaticComplexMethod", "LongMethod")
|
||||
override suspend fun connect(device: BleDevice) {
|
||||
@@ -124,11 +119,7 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
|
||||
|
||||
/** Applies logging, observation exception handling, and platform config shared by both peripheral types. */
|
||||
fun PeripheralBuilder.commonConfig() {
|
||||
logging {
|
||||
engine = KermitLogEngine
|
||||
level = Logging.Level.Events
|
||||
identifier = device.address
|
||||
}
|
||||
logging { applyConfig(loggingConfig, identifier = device.address) }
|
||||
observationExceptionHandler { cause ->
|
||||
Logger.w(cause) { "[${device.address}] Observation failure suppressed" }
|
||||
}
|
||||
@@ -182,8 +173,11 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
|
||||
throw e
|
||||
} catch (@Suppress("TooGenericExceptionCaught", "SwallowedException") e: Exception) {
|
||||
if (autoConnect) {
|
||||
// autoConnect already true and still failed — don't loop forever.
|
||||
Logger.w { "[${device.address}] autoConnect attempt failed, giving up" }
|
||||
// Already on the autoConnect path and still failing: surface a clear Disconnected
|
||||
// and let the outer reconnect loop (BleRadioTransport) own the macro retry budget.
|
||||
Logger.w {
|
||||
"[${device.address}] autoConnect attempt also failed; deferring to outer reconnect loop"
|
||||
}
|
||||
_connectionState.emit(BleConnectionState.Disconnected(DisconnectReason.ConnectionFailed))
|
||||
throw e
|
||||
}
|
||||
|
||||
@@ -20,12 +20,12 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import org.koin.core.annotation.Single
|
||||
|
||||
/** [BleConnectionFactory] that builds [KableBleConnection] instances sharing one injected [BleLoggingConfig]. */
@Single
class KableBleConnectionFactory(private val loggingConfig: BleLoggingConfig) : BleConnectionFactory {
    /**
     * Creates a new [KableBleConnection].
     *
     * [tag] is unused because Kable's own log identifier is set per-peripheral inside [KableBleConnection.connect]
     * using the device address, which provides more precise context than a factory-time tag.
     */
    override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope, loggingConfig)
}
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.meshtastic.core.ble
|
||||
|
||||
import com.juul.kable.Scanner
|
||||
import com.juul.kable.logs.Logging
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.channelFlow
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
@@ -26,13 +25,10 @@ import kotlin.time.Duration
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
@Single
|
||||
class KableBleScanner : BleScanner {
|
||||
class KableBleScanner(private val loggingConfig: BleLoggingConfig) : BleScanner {
|
||||
override fun scan(timeout: Duration, serviceUuid: Uuid?, address: String?): Flow<BleDevice> {
|
||||
val scanner = Scanner {
|
||||
logging {
|
||||
engine = KermitLogEngine
|
||||
level = Logging.Level.Events
|
||||
}
|
||||
logging { applyConfig(loggingConfig) }
|
||||
// Use separate match blocks so each filter is evaluated independently (OR semantics).
|
||||
// Combining address and service UUID in a single match{} creates an AND filter which
|
||||
// silently drops results on OEM stacks (Samsung, Xiaomi) when the device uses a
|
||||
|
||||
@@ -47,15 +47,24 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR
|
||||
private val fromNum = service.characteristic(FROMNUM_CHARACTERISTIC)
|
||||
private val logRadioChar = service.characteristic(LOGRADIO_CHARACTERISTIC)
|
||||
|
||||
/**
|
||||
* Cached preferred write type for [toRadio]. Resolved once at construction so the hot send path doesn't have to
|
||||
* walk the discovered services list on every packet.
|
||||
*/
|
||||
private val toRadioWriteType: BleWriteType = service.preferredWriteType(toRadio)
|
||||
|
||||
companion object {
|
||||
private val TRANSIENT_RETRY_DELAY = 500.milliseconds
|
||||
}
|
||||
|
||||
private val subscriptionReady = CompletableDeferred<Unit>()
|
||||
|
||||
/** Seed with replay=1 so the config-handshake drain starts before FROMNUM notifications are gated in. */
|
||||
/**
|
||||
* Latched signal: `replay = 1` plus one extra buffered slot collapses bursts of drain triggers into at most two
|
||||
* pending polls. DROP_OLDEST means we never block writers and never let stale drain requests pile up.
|
||||
*/
|
||||
private val triggerDrain =
|
||||
MutableSharedFlow<Unit>(replay = 1, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
MutableSharedFlow<Unit>(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
|
||||
@Suppress("TooGenericExceptionCaught", "SwallowedException")
|
||||
override val fromRadio: Flow<ByteArray> = channelFlow {
|
||||
@@ -97,14 +106,15 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR
|
||||
if (service.hasCharacteristic(logRadioChar)) {
|
||||
service.observe(logRadioChar).catch { e ->
|
||||
if (e is CancellationException) throw e
|
||||
// logRadio is optional — swallow observation errors silently.
|
||||
// logRadio is optional — log at debug for diagnostics but don't surface to callers.
|
||||
Logger.d(e) { "logRadio observation failure suppressed" }
|
||||
}
|
||||
} else {
|
||||
emptyFlow()
|
||||
}
|
||||
|
||||
override suspend fun sendToRadio(packet: ByteArray) {
|
||||
service.write(toRadio, packet, service.preferredWriteType(toRadio))
|
||||
service.write(toRadio, packet, toRadioWriteType)
|
||||
triggerDrain.tryEmit(Unit)
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,18 @@ package org.meshtastic.core.ble.di
|
||||
|
||||
import org.koin.core.annotation.ComponentScan
|
||||
import org.koin.core.annotation.Module
|
||||
import org.koin.core.annotation.Single
|
||||
import org.meshtastic.core.ble.BleLoggingConfig
|
||||
import org.meshtastic.core.common.BuildConfigProvider
|
||||
|
||||
/** Koin module for the BLE core: component-scans `org.meshtastic.core.ble` and provides the BLE logging config. */
@Module
@ComponentScan("org.meshtastic.core.ble")
class CoreBleModule {
    /**
     * Quiet by default in release; verbose (Kable [Events][com.juul.kable.logs.Logging.Level.Events]) in debug builds.
     * Always single-line for grep/logcat friendliness.
     */
    @Single
    fun provideBleLoggingConfig(buildConfig: BuildConfigProvider): BleLoggingConfig =
        if (buildConfig.isDebug) BleLoggingConfig.Debug else BleLoggingConfig.Release
}
|
||||
|
||||
@@ -26,9 +26,9 @@ import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.filterIsInstance
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
@@ -258,24 +258,25 @@ class BleRadioTransport(
|
||||
isFullyConnected = true
|
||||
onConnected()
|
||||
|
||||
// Scope the connectionState listener to this iteration so it's
|
||||
// cancelled automatically before the next reconnect cycle.
|
||||
var disconnectReason: DisconnectReason = DisconnectReason.Unknown
|
||||
coroutineScope {
|
||||
bleConnection.connectionState
|
||||
.onEach { s ->
|
||||
if (s is BleConnectionState.Disconnected && isFullyConnected) {
|
||||
isFullyConnected = false
|
||||
disconnectReason = s.reason
|
||||
onDisconnected()
|
||||
}
|
||||
}
|
||||
.catch { e -> Logger.w(e) { "[$address] bleConnection.connectionState flow crashed" } }
|
||||
.launchIn(this)
|
||||
discoverServicesAndSetupCharacteristics()
|
||||
|
||||
discoverServicesAndSetupCharacteristics()
|
||||
// Wait for the StateFlow to actually reflect Connected before watching for the next
|
||||
// Disconnected. connectAndAwait returns synchronously based on the underlying Kable
|
||||
// peripheral state, but our _connectionState observer runs on a separate coroutine and
|
||||
// may lag. Without this gate the next .first { Disconnected } below could match the
|
||||
// *previous* cycle's stale Disconnected value and fire immediately, breaking reconnect.
|
||||
bleConnection.connectionState.first { it is BleConnectionState.Connected }
|
||||
|
||||
bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
|
||||
// Suspend until the next Disconnected emission. We deliberately do NOT wrap this in a
|
||||
// coroutineScope { launchIn(...); first(...) } pattern: launching a hot StateFlow
|
||||
// collector inside coroutineScope hangs the scope after .first returns (the launched
|
||||
// collector never completes naturally, and coroutineScope waits for all children).
|
||||
val disconnectedState =
|
||||
bleConnection.connectionState.filterIsInstance<BleConnectionState.Disconnected>().first()
|
||||
val disconnectReason = disconnectedState.reason
|
||||
if (isFullyConnected) {
|
||||
isFullyConnected = false
|
||||
onDisconnected()
|
||||
}
|
||||
|
||||
Logger.i { "[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect" }
|
||||
@@ -350,6 +351,13 @@ class BleRadioTransport(
|
||||
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
|
||||
Logger.i { "[$address] BLE Radio Session Ready. Max write length (WITHOUT_RESPONSE): $maxLen bytes" }
|
||||
|
||||
// Ask the platform for a low-latency / high-throughput connection interval
|
||||
// (~7.5 ms on Android). The Meshtastic firmware happily accepts this and it
|
||||
// materially speeds up the initial config drain and any bulk fromRadio reads.
|
||||
if (bleConnection.requestHighConnectionPriority()) {
|
||||
Logger.d { "[$address] Requested high BLE connection priority" }
|
||||
}
|
||||
|
||||
this@BleRadioTransport.callback.onConnect()
|
||||
}
|
||||
} catch (e: CancellationException) {
|
||||
|
||||
@@ -24,9 +24,9 @@ import dev.mokkery.mock
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.meshtastic.core.ble.BleConnection
|
||||
@@ -35,6 +35,7 @@ import org.meshtastic.core.ble.BleConnectionState
|
||||
import org.meshtastic.core.ble.BleDevice
|
||||
import org.meshtastic.core.ble.BleService
|
||||
import org.meshtastic.core.ble.BleWriteType
|
||||
import org.meshtastic.core.ble.DisconnectReason
|
||||
import org.meshtastic.core.testing.FakeBleConnection
|
||||
import org.meshtastic.core.testing.FakeBleConnectionFactory
|
||||
import org.meshtastic.core.testing.FakeBleDevice
|
||||
@@ -227,6 +228,63 @@ class BleRadioTransportReconnectCrashTest {
|
||||
"disconnect() must be called after CancellationException in profile() — GATT leak fix",
|
||||
)
|
||||
}
|
||||
|
||||
// ─── Reconnect after a stable connection drops ───────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Regression test for the BLE reconnect hang.
|
||||
*
|
||||
* Symptom: after a stable connection (uptime > minStableConnection) was terminated by a remote disconnect (e.g.
|
||||
* node power-cycle), the transport's reconnect loop never iterated — `attemptConnection` ran exactly once, the GATT
|
||||
* disconnect callback fired, and then nothing.
|
||||
*
|
||||
* Root cause: `attemptConnection` wrapped its disconnect-watcher in a `coroutineScope {
|
||||
* connectionState.onEach{...}.launchIn(this); connectionState.first { Disconnected } }` block. `coroutineScope`
|
||||
* waits for ALL launched children before returning, but the `.launchIn` collector on a hot `StateFlow` (or
|
||||
* `SharedFlow(replay=1)`) never completes naturally. After `.first` returned, the scope hung forever, blocking
|
||||
* `BleReconnectPolicy.execute` from issuing the next attempt.
|
||||
*
|
||||
* This test exercises the full happy-path reconnect cycle: connect → stable uptime → external disconnect → expect a
|
||||
* second `connectAndAwait` call. With the bug present, only one `connectAndAwait` call ever happens.
|
||||
*/
|
||||
@Test
fun `transport reconnects after a stable connection is dropped remotely`() = runTest {
    val device = FakeBleDevice(address = address, name = "Test Radio")
    bluetoothRepository.bond(device)

    val bleTransport =
        BleRadioTransport(
            scope = this,
            scanner = scanner,
            bluetoothRepository = bluetoothRepository,
            connectionFactory = connectionFactory,
            callback = service,
            address = address,
        )
    bleTransport.start()

    // Settle delay (3 s) + connect + handshake.
    advanceTimeBy(4_000L)
    assertTrue(connection.connectAndAwaitCalls == 1, "First connect must happen during initial start window")

    // Stay connected long enough to be considered stable (> minStableConnection = 5 s).
    advanceTimeBy(10_000L)

    // Simulate the firmware dying mid-session — the same path a node power-cycle takes.
    connection.simulateRemoteDisconnect(reason = DisconnectReason.Timeout)

    // Settle delay (3 s) before the next attempt + re-connect window. Generous to absorb
    // the policy retry backoff (5 s on first failure) plus another 3 s settle delay.
    advanceTimeBy(30_000L)

    assertTrue(
        connection.connectAndAwaitCalls >= 2,
        "Reconnect loop must call connectAndAwait again after a remote disconnect " +
            "(actual calls: ${connection.connectAndAwaitCalls})",
    )

    // Stop the transport so the test leaves no reconnect loop running.
    bleTransport.close()
}
|
||||
}
|
||||
|
||||
// ─── Test doubles ────────────────────────────────────────────────────────────────────────────────
|
||||
@@ -237,19 +295,19 @@ class BleRadioTransportReconnectCrashTest {
|
||||
*/
|
||||
private class CancellingProfileBleConnection : BleConnection {
|
||||
|
||||
private val _deviceFlow = MutableSharedFlow<BleDevice?>(replay = 1)
|
||||
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
|
||||
private val _deviceFlow = MutableStateFlow<BleDevice?>(null)
|
||||
override val deviceFlow: StateFlow<BleDevice?> = _deviceFlow.asStateFlow()
|
||||
|
||||
private val _connectionState = MutableSharedFlow<BleConnectionState>(replay = 1)
|
||||
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
|
||||
private val _connectionState = MutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected())
|
||||
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
|
||||
|
||||
override val device: BleDevice? = null
|
||||
|
||||
var disconnectCalls = 0
|
||||
|
||||
/** Records [device] as the current device and flips the state straight to [BleConnectionState.Connected]. */
override suspend fun connect(device: BleDevice) {
    _deviceFlow.value = device
    _connectionState.value = BleConnectionState.Connected
}
|
||||
|
||||
override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState {
|
||||
@@ -259,8 +317,8 @@ private class CancellingProfileBleConnection : BleConnection {
|
||||
|
||||
/** Counts the call, then publishes a default [BleConnectionState.Disconnected] and clears the current device. */
override suspend fun disconnect() {
    disconnectCalls++
    _connectionState.value = BleConnectionState.Disconnected()
    _deviceFlow.value = null
}
|
||||
|
||||
override suspend fun <T> profile(
|
||||
|
||||
@@ -20,9 +20,7 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flow
|
||||
@@ -36,6 +34,7 @@ import org.meshtastic.core.ble.BleService
|
||||
import org.meshtastic.core.ble.BleWriteType
|
||||
import org.meshtastic.core.ble.BluetoothRepository
|
||||
import org.meshtastic.core.ble.BluetoothState
|
||||
import org.meshtastic.core.ble.DisconnectReason
|
||||
import kotlin.time.Duration
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
@@ -91,11 +90,10 @@ class FakeBleConnection :
|
||||
override val device: BleDevice?
|
||||
get() = _device.value
|
||||
|
||||
private val _deviceFlow = mutableSharedFlow<BleDevice?>(replay = 1)
|
||||
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
|
||||
override val deviceFlow: StateFlow<BleDevice?> = _device.asStateFlow()
|
||||
|
||||
private val _connectionState = mutableSharedFlow<BleConnectionState>(replay = 1)
|
||||
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
|
||||
private val _connectionState = mutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected())
|
||||
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
|
||||
|
||||
/** When > 0, the next [failNextN] calls to [connectAndAwait] return [BleConnectionState.Disconnected]. */
|
||||
var failNextN: Int = 0
|
||||
@@ -109,6 +107,14 @@ class FakeBleConnection :
|
||||
/** Number of times [disconnect] has been invoked. */
|
||||
var disconnectCalls: Int = 0
|
||||
|
||||
/** Number of times [connectAndAwait] has been invoked (including failures). */
|
||||
var connectAndAwaitCalls: Int = 0
|
||||
|
||||
/**
 * Externally simulate a remote disconnect (e.g. node power-cycle) for tests that exercise reconnect.
 *
 * @param reason Reason carried by the published [BleConnectionState.Disconnected]; defaults to
 *   [DisconnectReason.Timeout].
 */
fun simulateRemoteDisconnect(reason: DisconnectReason = DisconnectReason.Timeout) {
    _connectionState.value = BleConnectionState.Disconnected(reason)
}
|
||||
|
||||
/** Service UUIDs that should appear missing — `profile()` throws `NoSuchElementException` for these. */
|
||||
val missingServices: MutableSet<Uuid> = mutableSetOf()
|
||||
|
||||
@@ -116,18 +122,18 @@ class FakeBleConnection :
|
||||
|
||||
/**
 * Fake connect: stores [device], then steps the connection state through Connecting → Connected, mirroring each
 * step onto the device itself when it is a [FakeBleDevice].
 */
override suspend fun connect(device: BleDevice) {
    _device.value = device
    _connectionState.value = BleConnectionState.Connecting
    if (device is FakeBleDevice) {
        device.setState(BleConnectionState.Connecting)
    }
    _connectionState.value = BleConnectionState.Connected
    if (device is FakeBleDevice) {
        device.setState(BleConnectionState.Connected)
    }
}
|
||||
|
||||
override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState {
|
||||
connectAndAwaitCalls++
|
||||
connectException?.let { throw it }
|
||||
if (failNextN > 0) {
|
||||
failNextN--
|
||||
@@ -140,12 +146,11 @@ class FakeBleConnection :
|
||||
/**
 * Fake disconnect: counts the call, publishes a default [BleConnectionState.Disconnected], mirrors it onto the
 * current device when it is a [FakeBleDevice], and finally clears the current device.
 */
override suspend fun disconnect() {
    disconnectCalls++
    // Capture the device before clearing it so its own state can still be updated.
    val currentDevice = _device.value
    _connectionState.value = BleConnectionState.Disconnected()
    if (currentDevice is FakeBleDevice) {
        currentDevice.setState(BleConnectionState.Disconnected())
    }
    _device.value = null
}
|
||||
|
||||
override suspend fun <T> profile(
|
||||
|
||||
@@ -23,6 +23,8 @@ import kotlinx.coroutines.CoroutineDispatcher
|
||||
import org.koin.core.annotation.KoinExperimentalAPI
|
||||
import org.koin.dsl.module
|
||||
import org.koin.test.verify.verify
|
||||
import org.meshtastic.core.ble.BleLogFormat
|
||||
import org.meshtastic.core.ble.BleLogLevel
|
||||
import kotlin.test.Test
|
||||
|
||||
@OptIn(KoinExperimentalAPI::class)
|
||||
@@ -41,6 +43,11 @@ class DesktopKoinTest {
|
||||
CoroutineDispatcher::class,
|
||||
HttpClient::class,
|
||||
HttpClientEngine::class,
|
||||
// BleLoggingConfig is a data class assembled by a factory function. Koin Verify
|
||||
// still introspects its constructor params, so the wrapping enums need to be
|
||||
// declared as known types even though they're never resolved from the graph.
|
||||
BleLogLevel::class,
|
||||
BleLogFormat::class,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.meshtastic.core.ble.BleCharacteristic
|
||||
import org.meshtastic.core.ble.BleConnection
|
||||
@@ -510,10 +510,10 @@ class LegacyDfuTransportTest {
|
||||
override val device: BleDevice?
|
||||
get() = delegate.device
|
||||
|
||||
override val deviceFlow: SharedFlow<BleDevice?>
|
||||
override val deviceFlow: StateFlow<BleDevice?>
|
||||
get() = delegate.deviceFlow
|
||||
|
||||
override val connectionState: SharedFlow<BleConnectionState>
|
||||
override val connectionState: StateFlow<BleConnectionState>
|
||||
get() = delegate.connectionState
|
||||
|
||||
override suspend fun connect(device: BleDevice) = delegate.connect(device)
|
||||
|
||||
@@ -21,7 +21,7 @@ package org.meshtastic.feature.firmware.ota.dfu
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.meshtastic.core.ble.BleCharacteristic
|
||||
import org.meshtastic.core.ble.BleConnection
|
||||
@@ -632,10 +632,10 @@ class SecureDfuTransportTest {
|
||||
override val device: BleDevice?
|
||||
get() = delegate.device
|
||||
|
||||
override val deviceFlow: SharedFlow<BleDevice?>
|
||||
override val deviceFlow: StateFlow<BleDevice?>
|
||||
get() = delegate.deviceFlow
|
||||
|
||||
override val connectionState: SharedFlow<BleConnectionState>
|
||||
override val connectionState: StateFlow<BleConnectionState>
|
||||
get() = delegate.connectionState
|
||||
|
||||
override suspend fun connect(device: BleDevice) = delegate.connect(device)
|
||||
|
||||
Reference in New Issue
Block a user