fix(ble): unblock reconnect + kable audit (logging, priority, backoff, StateFlow) (#5222)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: garthvh <1795163+garthvh@users.noreply.github.com>
This commit is contained in:
James Rich
2026-04-22 18:35:22 -05:00
committed by GitHub
parent 939132f2ae
commit 3a4cae5d0b
16 changed files with 302 additions and 95 deletions

View File

@@ -30,6 +30,8 @@ import org.koin.test.verify.definition
import org.koin.test.verify.injectedParameters
import org.koin.test.verify.verify
import org.meshtastic.app.map.MapViewModel
import org.meshtastic.core.ble.BleLogFormat
import org.meshtastic.core.ble.BleLogLevel
import org.meshtastic.core.model.util.NodeIdLookup
import org.meshtastic.feature.node.metrics.MetricsViewModel
import kotlin.test.Test
@@ -53,6 +55,11 @@ class KoinVerificationTest {
NodeIdLookup::class,
HttpClient::class,
HttpClientEngine::class,
// BleLoggingConfig is a data class assembled by a factory function. Koin Verify
// still introspects its constructor params, so the wrapping enums need to be
// declared as known types even though they're never resolved from the graph.
BleLogLevel::class,
BleLogFormat::class,
),
injections =
injectedParameters(

View File

@@ -43,15 +43,21 @@ internal actual fun PeripheralBuilder.platformConfig(device: BleDevice, autoConn
threadingStrategy = sharedThreadingStrategy
// We intentionally keep Kable's defaults for `transport` (Le) and `phy` (Le1M).
// Meshtastic radios (nRF52, ESP32-S3, RP2040+nRF) advertise BLE-only and don't support
// the LE 2M PHY in any first-party firmware, so changing these would be a regression risk
// with no upside. If a future hardware revision exposes 2M PHY, override `phy = Phy.Le2M`
// here after confirming the firmware advertises it.
onServicesDiscovered {
try {
// Android defaults to 23 bytes MTU. Meshtastic packets can be 512 bytes.
// Requesting the max MTU is critical for preventing dropped packets and stalls.
@Suppress("MagicNumber")
val negotiatedMtu = requestMtu(512)
Logger.i { "Negotiated MTU: $negotiatedMtu" }
Logger.i { "[${device.address}] Negotiated MTU: $negotiatedMtu" }
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.w(e) { "Failed to request MTU" }
Logger.w(e) { "[${device.address}] Failed to request MTU" }
}
}
}

View File

@@ -18,7 +18,7 @@ package org.meshtastic.core.ble
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.onStart
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
@@ -41,11 +41,17 @@ interface BleConnection {
/** The currently connected [BleDevice], or null if not connected. */
val device: BleDevice?
/** A flow of the current device. */
val deviceFlow: SharedFlow<BleDevice?>
/**
* A flow of the current device. [StateFlow] semantics: replays the latest value to new collectors and conflates
* rapid updates.
*/
val deviceFlow: StateFlow<BleDevice?>
/** A flow of [BleConnectionState] changes. */
val connectionState: SharedFlow<BleConnectionState>
/**
* A flow of [BleConnectionState] changes. [StateFlow] semantics ensure the latest state is always observable and
* distinct-equals deduplication avoids spurious re-emissions.
*/
val connectionState: StateFlow<BleConnectionState>
/** Connects to the given [BleDevice]. */
suspend fun connect(device: BleDevice)

View File

@@ -0,0 +1,82 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.ble
import com.juul.kable.logs.Logging
/**
* Verbosity for Kable's internal BLE logging. Wraps [Logging.Level] so callers and the Koin DI graph don't leak Kable
* types into modules that don't directly depend on Kable.
*/
enum class BleLogLevel {
/** Only failures (lowest noise; default in release builds). */
Warnings,
/** [Warnings] plus connect / disconnect / subscribe / GATT operation events. */
Events,
/** [Events] plus a hex dump of every read/write/notify payload. Very noisy. */
Data,
}
/**
* Format for Kable's internal BLE log entries.
*
* [Compact] keeps each entry on a single line — strongly preferred for `adb logcat`, grep, and bug reports. [Multiline]
* pretty-prints across several lines, which is harder to read in tooling.
*/
enum class BleLogFormat {
Compact,
Multiline,
}
/**
* Verbosity and formatting controls for Kable's internal BLE logging.
*
* @see BleLogLevel for the verbosity scale.
* @see BleLogFormat for the layout choices.
*/
data class BleLoggingConfig(val level: BleLogLevel, val format: BleLogFormat = BleLogFormat.Compact) {
companion object {
/** Quiet defaults suitable for release builds — only warnings, single-line. */
val Release: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Warnings)
/** Verbose defaults suitable for debug builds — every BLE event, single-line. */
val Debug: BleLoggingConfig = BleLoggingConfig(level = BleLogLevel.Events)
}
}
internal fun BleLogLevel.toKable(): Logging.Level = when (this) {
BleLogLevel.Warnings -> Logging.Level.Warnings
BleLogLevel.Events -> Logging.Level.Events
BleLogLevel.Data -> Logging.Level.Data
}
internal fun BleLogFormat.toKable(): Logging.Format = when (this) {
BleLogFormat.Compact -> Logging.Format.Compact
BleLogFormat.Multiline -> Logging.Format.Multiline
}
/** Applies this [BleLoggingConfig] to a Kable `logging { }` block, routing through [KermitLogEngine]. */
internal fun Logging.applyConfig(config: BleLoggingConfig, identifier: String? = null) {
engine = KermitLogEngine
level = config.level.toKable()
format = config.format.toKable()
if (identifier != null) {
this.identifier = identifier
}
}

View File

@@ -19,20 +19,33 @@ package org.meshtastic.core.ble
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlin.math.pow
import kotlin.random.Random
/** Cap on the per-attempt backoff to prevent unbounded growth. */
private const val MAX_RETRY_DELAY_MS = 2_000L
/** Multiplicative growth factor between attempts (delay doubles each time). */
private const val BACKOFF_FACTOR = 2.0
/**
* Retries a BLE operation a specified number of times with a delay between attempts.
* Retries a BLE operation with bounded exponential backoff and jitter.
*
* @param count The number of attempts to make.
* @param delayMs The delay in milliseconds between attempts.
* @param tag A tag for logging.
* Each retry waits `delayMs * 2^(attempt-1)`, capped at [MAX_RETRY_DELAY_MS], with a random ±25% jitter applied to
* avoid synchronised retry storms when multiple operations fail in lockstep (e.g. a TX/RX pair both failing the same
* `STATUS_GATT_BUSY` window).
*
* @param count Total attempt count (default 3).
* @param delayMs Initial delay before the first retry. Subsequent delays grow exponentially.
* @param tag Tag for log prefixes.
* @param block The operation to perform.
* @return The result of the operation.
* @throws Exception if the operation fails after all attempts.
* @throws Exception If the operation fails after all attempts. [CancellationException] is always re-thrown immediately.
*/
@Suppress("MagicNumber")
suspend fun <T> retryBleOperation(
count: Int = 3,
delayMs: Long = 500L,
delayMs: Long = 250L,
tag: String = "BLE",
block: suspend () -> T,
): T {
@@ -48,8 +61,12 @@ suspend fun <T> retryBleOperation(
Logger.w(e) { "[$tag] BLE operation failed after $count attempts, giving up" }
throw e
}
Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${delayMs}ms..." }
delay(delayMs)
val backoffMs = (delayMs * BACKOFF_FACTOR.pow(currentAttempt - 1)).toLong().coerceAtMost(MAX_RETRY_DELAY_MS)
val jitterRange = (backoffMs / 4).coerceAtLeast(1L)
val jitter = Random.nextLong(-jitterRange, jitterRange + 1)
val sleepMs = (backoffMs + jitter).coerceAtLeast(0L)
Logger.w(e) { "[$tag] BLE operation failed (attempt $currentAttempt/$count), retrying in ${sleepMs}ms..." }
delay(sleepMs)
}
}
}

View File

@@ -22,18 +22,16 @@ import com.juul.kable.PeripheralBuilder
import com.juul.kable.State
import com.juul.kable.WriteType
import com.juul.kable.characteristicOf
import com.juul.kable.logs.Logging
import com.juul.kable.writeWithoutResponse
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.job
@@ -90,7 +88,8 @@ class KableBleService(private val peripheral: Peripheral, private val serviceUui
* fall back to `autoConnect = true` on failure. Only two attempts are made per [connect] call — the caller
* ([BleRadioTransport]) owns the macro-level retry/backoff loop.
*/
class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
class KableBleConnection(private val scope: CoroutineScope, private val loggingConfig: BleLoggingConfig) :
BleConnection {
@Volatile private var peripheral: Peripheral? = null
@@ -103,19 +102,15 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
private val AUTOCONNECT_FALLBACK_DELAY = 1.seconds
}
private val _deviceFlow = MutableSharedFlow<BleDevice?>(replay = 1)
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
private val _deviceFlow = MutableStateFlow<BleDevice?>(null)
override val deviceFlow: StateFlow<BleDevice?> = _deviceFlow.asStateFlow()
override val device: BleDevice?
get() = _deviceFlow.replayCache.firstOrNull()
get() = _deviceFlow.value
private val _connectionState =
MutableSharedFlow<BleConnectionState>(
replay = 1,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
MutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected(DisconnectReason.Unknown))
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
@Suppress("CyclomaticComplexMethod", "LongMethod")
override suspend fun connect(device: BleDevice) {
@@ -124,11 +119,7 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
/** Applies logging, observation exception handling, and platform config shared by both peripheral types. */
fun PeripheralBuilder.commonConfig() {
logging {
engine = KermitLogEngine
level = Logging.Level.Events
identifier = device.address
}
logging { applyConfig(loggingConfig, identifier = device.address) }
observationExceptionHandler { cause ->
Logger.w(cause) { "[${device.address}] Observation failure suppressed" }
}
@@ -182,8 +173,11 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection {
throw e
} catch (@Suppress("TooGenericExceptionCaught", "SwallowedException") e: Exception) {
if (autoConnect) {
// autoConnect already true and still failed — don't loop forever.
Logger.w { "[${device.address}] autoConnect attempt failed, giving up" }
// Already on the autoConnect path and still failing: surface a clear Disconnected
// and let the outer reconnect loop (BleRadioTransport) own the macro retry budget.
Logger.w {
"[${device.address}] autoConnect attempt also failed; deferring to outer reconnect loop"
}
_connectionState.emit(BleConnectionState.Disconnected(DisconnectReason.ConnectionFailed))
throw e
}

View File

@@ -20,12 +20,12 @@ import kotlinx.coroutines.CoroutineScope
import org.koin.core.annotation.Single
@Single
class KableBleConnectionFactory : BleConnectionFactory {
class KableBleConnectionFactory(private val loggingConfig: BleLoggingConfig) : BleConnectionFactory {
/**
* Creates a new [KableBleConnection].
*
* [tag] is unused because Kable's own log identifier is set per-peripheral inside [KableBleConnection.connect]
* using the device address, which provides more precise context than a factory-time tag.
*/
override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope)
override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope, loggingConfig)
}

View File

@@ -17,7 +17,6 @@
package org.meshtastic.core.ble
import com.juul.kable.Scanner
import com.juul.kable.logs.Logging
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.withTimeoutOrNull
@@ -26,13 +25,10 @@ import kotlin.time.Duration
import kotlin.uuid.Uuid
@Single
class KableBleScanner : BleScanner {
class KableBleScanner(private val loggingConfig: BleLoggingConfig) : BleScanner {
override fun scan(timeout: Duration, serviceUuid: Uuid?, address: String?): Flow<BleDevice> {
val scanner = Scanner {
logging {
engine = KermitLogEngine
level = Logging.Level.Events
}
logging { applyConfig(loggingConfig) }
// Use separate match blocks so each filter is evaluated independently (OR semantics).
// Combining address and service UUID in a single match{} creates an AND filter which
// silently drops results on OEM stacks (Samsung, Xiaomi) when the device uses a

View File

@@ -47,15 +47,24 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR
private val fromNum = service.characteristic(FROMNUM_CHARACTERISTIC)
private val logRadioChar = service.characteristic(LOGRADIO_CHARACTERISTIC)
/**
* Cached preferred write type for [toRadio]. Resolved once at construction so the hot send path doesn't have to
* walk the discovered services list on every packet.
*/
private val toRadioWriteType: BleWriteType = service.preferredWriteType(toRadio)
companion object {
private val TRANSIENT_RETRY_DELAY = 500.milliseconds
}
private val subscriptionReady = CompletableDeferred<Unit>()
/** Seed with replay=1 so the config-handshake drain starts before FROMNUM notifications are gated in. */
/**
* Latched signal: a single buffered slot collapses bursts of drain triggers into one pending poll. Capacity 1 with
* DROP_OLDEST means we never block writers and never let stale drain requests pile up.
*/
private val triggerDrain =
MutableSharedFlow<Unit>(replay = 1, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
MutableSharedFlow<Unit>(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
@Suppress("TooGenericExceptionCaught", "SwallowedException")
override val fromRadio: Flow<ByteArray> = channelFlow {
@@ -97,14 +106,15 @@ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticR
if (service.hasCharacteristic(logRadioChar)) {
service.observe(logRadioChar).catch { e ->
if (e is CancellationException) throw e
// logRadio is optional — swallow observation errors silently.
// logRadio is optional — log at debug for diagnostics but don't surface to callers.
Logger.d(e) { "logRadio observation failure suppressed" }
}
} else {
emptyFlow()
}
override suspend fun sendToRadio(packet: ByteArray) {
service.write(toRadio, packet, service.preferredWriteType(toRadio))
service.write(toRadio, packet, toRadioWriteType)
triggerDrain.tryEmit(Unit)
}

View File

@@ -18,7 +18,18 @@ package org.meshtastic.core.ble.di
import org.koin.core.annotation.ComponentScan
import org.koin.core.annotation.Module
import org.koin.core.annotation.Single
import org.meshtastic.core.ble.BleLoggingConfig
import org.meshtastic.core.common.BuildConfigProvider
@Module
@ComponentScan("org.meshtastic.core.ble")
class CoreBleModule
class CoreBleModule {
/**
* Quiet by default in release; verbose (Kable [Events][com.juul.kable.logs.Logging.Level.Events]) in debug builds.
* Always single-line for grep/logcat friendliness.
*/
@Single
fun provideBleLoggingConfig(buildConfig: BuildConfigProvider): BleLoggingConfig =
if (buildConfig.isDebug) BleLoggingConfig.Debug else BleLoggingConfig.Release
}

View File

@@ -26,9 +26,9 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -258,24 +258,25 @@ class BleRadioTransport(
isFullyConnected = true
onConnected()
// Scope the connectionState listener to this iteration so it's
// cancelled automatically before the next reconnect cycle.
var disconnectReason: DisconnectReason = DisconnectReason.Unknown
coroutineScope {
bleConnection.connectionState
.onEach { s ->
if (s is BleConnectionState.Disconnected && isFullyConnected) {
isFullyConnected = false
disconnectReason = s.reason
onDisconnected()
}
}
.catch { e -> Logger.w(e) { "[$address] bleConnection.connectionState flow crashed" } }
.launchIn(this)
discoverServicesAndSetupCharacteristics()
discoverServicesAndSetupCharacteristics()
// Wait for the StateFlow to actually reflect Connected before watching for the next
// Disconnected. connectAndAwait returns synchronously based on the underlying Kable
// peripheral state, but our _connectionState observer runs on a separate coroutine and
// may lag. Without this gate the next .first { Disconnected } below could match the
// *previous* cycle's stale Disconnected value and fire immediately, breaking reconnect.
bleConnection.connectionState.first { it is BleConnectionState.Connected }
bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
// Suspend until the next Disconnected emission. We deliberately do NOT wrap this in a
// coroutineScope { launchIn(...); first(...) } pattern: launching a hot StateFlow
// collector inside coroutineScope hangs the scope after .first returns (the launched
// collector never completes naturally, and coroutineScope waits for all children).
val disconnectedState =
bleConnection.connectionState.filterIsInstance<BleConnectionState.Disconnected>().first()
val disconnectReason = disconnectedState.reason
if (isFullyConnected) {
isFullyConnected = false
onDisconnected()
}
Logger.i { "[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect" }
@@ -350,6 +351,13 @@ class BleRadioTransport(
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
Logger.i { "[$address] BLE Radio Session Ready. Max write length (WITHOUT_RESPONSE): $maxLen bytes" }
// Ask the platform for a low-latency / high-throughput connection interval
// (~7.5 ms on Android). The Meshtastic firmware happily accepts this and it
// materially speeds up the initial config drain and any bulk fromRadio reads.
if (bleConnection.requestHighConnectionPriority()) {
Logger.d { "[$address] Requested high BLE connection priority" }
}
this@BleRadioTransport.callback.onConnect()
}
} catch (e: CancellationException) {

View File

@@ -24,9 +24,9 @@ import dev.mokkery.mock
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.ble.BleConnection
@@ -35,6 +35,7 @@ import org.meshtastic.core.ble.BleConnectionState
import org.meshtastic.core.ble.BleDevice
import org.meshtastic.core.ble.BleService
import org.meshtastic.core.ble.BleWriteType
import org.meshtastic.core.ble.DisconnectReason
import org.meshtastic.core.testing.FakeBleConnection
import org.meshtastic.core.testing.FakeBleConnectionFactory
import org.meshtastic.core.testing.FakeBleDevice
@@ -227,6 +228,63 @@ class BleRadioTransportReconnectCrashTest {
"disconnect() must be called after CancellationException in profile() — GATT leak fix",
)
}
// ─── Reconnect after a stable connection drops ───────────────────────────────────────────────
/**
* Regression test for the BLE reconnect hang.
*
* Symptom: after a stable connection (uptime > minStableConnection) was terminated by a remote disconnect (e.g.
* node power-cycle), the transport's reconnect loop never iterated — `attemptConnection` ran exactly once, the GATT
* disconnect callback fired, and then nothing.
*
* Root cause: `attemptConnection` wrapped its disconnect-watcher in a `coroutineScope {
* connectionState.onEach{...}.launchIn(this); connectionState.first { Disconnected } }` block. `coroutineScope`
* waits for ALL launched children before returning, but the `.launchIn` collector on a hot `StateFlow` (or
* `SharedFlow(replay=1)`) never completes naturally. After `.first` returned, the scope hung forever, blocking
* `BleReconnectPolicy.execute` from issuing the next attempt.
*
* This test exercises the full happy-path reconnect cycle: connect → stable uptime → external disconnect → expect a
* second `connectAndAwait` call. With the bug present, only one `connectAndAwait` call ever happens.
*/
@Test
fun `transport reconnects after a stable connection is dropped remotely`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Radio")
bluetoothRepository.bond(device)
val bleTransport =
BleRadioTransport(
scope = this,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
callback = service,
address = address,
)
bleTransport.start()
// Settle delay (3 s) + connect + handshake.
advanceTimeBy(4_000L)
assertTrue(connection.connectAndAwaitCalls == 1, "First connect must happen during initial start window")
// Stay connected long enough to be considered stable (> minStableConnection = 5 s).
advanceTimeBy(10_000L)
// Simulate the firmware dying mid-session — the same path a node power-cycle takes.
connection.simulateRemoteDisconnect(reason = DisconnectReason.Timeout)
// Settle delay (3 s) before the next attempt + re-connect window. Generous to absorb
// the policy retry backoff (5 s on first failure) plus another 3 s settle delay.
advanceTimeBy(30_000L)
assertTrue(
connection.connectAndAwaitCalls >= 2,
"Reconnect loop must call connectAndAwait again after a remote disconnect " +
"(actual calls: ${connection.connectAndAwaitCalls})",
)
bleTransport.close()
}
}
// ─── Test doubles ────────────────────────────────────────────────────────────────────────────────
@@ -237,19 +295,19 @@ class BleRadioTransportReconnectCrashTest {
*/
private class CancellingProfileBleConnection : BleConnection {
private val _deviceFlow = MutableSharedFlow<BleDevice?>(replay = 1)
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
private val _deviceFlow = MutableStateFlow<BleDevice?>(null)
override val deviceFlow: StateFlow<BleDevice?> = _deviceFlow.asStateFlow()
private val _connectionState = MutableSharedFlow<BleConnectionState>(replay = 1)
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
private val _connectionState = MutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected())
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
override val device: BleDevice? = null
var disconnectCalls = 0
override suspend fun connect(device: BleDevice) {
_deviceFlow.emit(device)
_connectionState.emit(BleConnectionState.Connected)
_deviceFlow.value = device
_connectionState.value = BleConnectionState.Connected
}
override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState {
@@ -259,8 +317,8 @@ private class CancellingProfileBleConnection : BleConnection {
override suspend fun disconnect() {
disconnectCalls++
_connectionState.emit(BleConnectionState.Disconnected())
_deviceFlow.emit(null)
_connectionState.value = BleConnectionState.Disconnected()
_deviceFlow.value = null
}
override suspend fun <T> profile(

View File

@@ -20,9 +20,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
@@ -36,6 +34,7 @@ import org.meshtastic.core.ble.BleService
import org.meshtastic.core.ble.BleWriteType
import org.meshtastic.core.ble.BluetoothRepository
import org.meshtastic.core.ble.BluetoothState
import org.meshtastic.core.ble.DisconnectReason
import kotlin.time.Duration
import kotlin.uuid.Uuid
@@ -91,11 +90,10 @@ class FakeBleConnection :
override val device: BleDevice?
get() = _device.value
private val _deviceFlow = mutableSharedFlow<BleDevice?>(replay = 1)
override val deviceFlow: SharedFlow<BleDevice?> = _deviceFlow.asSharedFlow()
override val deviceFlow: StateFlow<BleDevice?> = _device.asStateFlow()
private val _connectionState = mutableSharedFlow<BleConnectionState>(replay = 1)
override val connectionState: SharedFlow<BleConnectionState> = _connectionState.asSharedFlow()
private val _connectionState = mutableStateFlow<BleConnectionState>(BleConnectionState.Disconnected())
override val connectionState: StateFlow<BleConnectionState> = _connectionState.asStateFlow()
/** When > 0, the next [failNextN] calls to [connectAndAwait] return [BleConnectionState.Disconnected]. */
var failNextN: Int = 0
@@ -109,6 +107,14 @@ class FakeBleConnection :
/** Number of times [disconnect] has been invoked. */
var disconnectCalls: Int = 0
/** Number of times [connectAndAwait] has been invoked (including failures). */
var connectAndAwaitCalls: Int = 0
/** Externally simulate a remote disconnect (e.g. node power-cycle) for tests that exercise reconnect. */
fun simulateRemoteDisconnect(reason: DisconnectReason = DisconnectReason.Timeout) {
_connectionState.value = BleConnectionState.Disconnected(reason)
}
/** Service UUIDs that should appear missing — `profile()` throws `NoSuchElementException` for these. */
val missingServices: MutableSet<Uuid> = mutableSetOf()
@@ -116,18 +122,18 @@ class FakeBleConnection :
override suspend fun connect(device: BleDevice) {
_device.value = device
_deviceFlow.emit(device)
_connectionState.emit(BleConnectionState.Connecting)
_connectionState.value = BleConnectionState.Connecting
if (device is FakeBleDevice) {
device.setState(BleConnectionState.Connecting)
}
_connectionState.emit(BleConnectionState.Connected)
_connectionState.value = BleConnectionState.Connected
if (device is FakeBleDevice) {
device.setState(BleConnectionState.Connected)
}
}
override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState {
connectAndAwaitCalls++
connectException?.let { throw it }
if (failNextN > 0) {
failNextN--
@@ -140,12 +146,11 @@ class FakeBleConnection :
override suspend fun disconnect() {
disconnectCalls++
val currentDevice = _device.value
_connectionState.emit(BleConnectionState.Disconnected())
_connectionState.value = BleConnectionState.Disconnected()
if (currentDevice is FakeBleDevice) {
currentDevice.setState(BleConnectionState.Disconnected())
}
_device.value = null
_deviceFlow.emit(null)
}
override suspend fun <T> profile(

View File

@@ -23,6 +23,8 @@ import kotlinx.coroutines.CoroutineDispatcher
import org.koin.core.annotation.KoinExperimentalAPI
import org.koin.dsl.module
import org.koin.test.verify.verify
import org.meshtastic.core.ble.BleLogFormat
import org.meshtastic.core.ble.BleLogLevel
import kotlin.test.Test
@OptIn(KoinExperimentalAPI::class)
@@ -41,6 +43,11 @@ class DesktopKoinTest {
CoroutineDispatcher::class,
HttpClient::class,
HttpClientEngine::class,
// BleLoggingConfig is a data class assembled by a factory function. Koin Verify
// still introspects its constructor params, so the wrapping enums need to be
// declared as known types even though they're never resolved from the graph.
BleLogLevel::class,
BleLogFormat::class,
),
)
}

View File

@@ -22,7 +22,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.ble.BleCharacteristic
import org.meshtastic.core.ble.BleConnection
@@ -510,10 +510,10 @@ class LegacyDfuTransportTest {
override val device: BleDevice?
get() = delegate.device
override val deviceFlow: SharedFlow<BleDevice?>
override val deviceFlow: StateFlow<BleDevice?>
get() = delegate.deviceFlow
override val connectionState: SharedFlow<BleConnectionState>
override val connectionState: StateFlow<BleConnectionState>
get() = delegate.connectionState
override suspend fun connect(device: BleDevice) = delegate.connect(device)

View File

@@ -21,7 +21,7 @@ package org.meshtastic.feature.firmware.ota.dfu
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.ble.BleCharacteristic
import org.meshtastic.core.ble.BleConnection
@@ -632,10 +632,10 @@ class SecureDfuTransportTest {
override val device: BleDevice?
get() = delegate.device
override val deviceFlow: SharedFlow<BleDevice?>
override val deviceFlow: StateFlow<BleDevice?>
get() = delegate.deviceFlow
override val connectionState: SharedFlow<BleConnectionState>
override val connectionState: StateFlow<BleConnectionState>
get() = delegate.connectionState
override suspend fun connect(device: BleDevice) = delegate.connect(device)