fix(service): resolve selected-device startup race (#5828)

This commit is contained in:
Jeremiah K
2026-06-17 12:54:00 -05:00
committed by GitHub
parent 5b19a5f31f
commit acf9ff9349
11 changed files with 661 additions and 51 deletions

View File

@@ -28,3 +28,17 @@ fun normalizeAddress(addr: String?): String {
else -> u.replace(":", "")
}
}
/**
* True iff [normalized] (the output of [normalizeAddress]) is a no-device sentinel that DB naming collapses to the
* default database. Single source of truth for "no real device selected".
*/
fun isNoDeviceSentinel(normalized: String): Boolean = normalized == "DEFAULT" || normalized == ".N"
/**
* True iff [address] refers to a real selected device. Rejects null/blank and all legacy no-device sentinels (`"n"`,
* `"null"`, `".n"`, `"default"`, case-insensitive) by delegating to [normalizeAddress] and [isNoDeviceSentinel]. Any
* input that [buildDbName] would collapse to `DEFAULT_DB_NAME` is rejected here too, so the foreground-service
* stay-alive decision and the DB name resolution can never diverge.
*/
fun isValidDeviceAddress(address: String?): Boolean = !isNoDeviceSentinel(normalizeAddress(address))

View File

@@ -18,6 +18,8 @@ package org.meshtastic.core.common.util
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class AddressUtilsTest {
@@ -69,4 +71,18 @@ class AddressUtilsTest {
fun mixedCaseWithColons() {
assertEquals("AABBCC", normalizeAddress("aA:Bb:cC"))
}
@Test
fun isValidDeviceAddressAcceptsRealDeviceAddress() {
assertTrue(isValidDeviceAddress("xAA:BB:CC:DD:EE:FF"))
assertTrue(isValidDeviceAddress("t192.168.1.100"))
}
@Test
fun isValidDeviceAddressRejectsNoDeviceSentinels() {
listOf(null, "", " ", "\t", "n", "N", "null", "NULL", "Null", ".n", ".N", "default", "DEFAULT", "Default")
.forEach { sentinel ->
assertFalse(isValidDeviceAddress(sentinel), "Sentinel ${sentinel ?: "null"} must be invalid")
}
}
}

View File

@@ -31,14 +31,22 @@ import okio.Path.Companion.toPath
import org.meshtastic.core.common.ContextServices
import org.meshtastic.core.database.MeshtasticDatabase.Companion.configureCommon
/** Returns a [RoomDatabase.Builder] configured for Android with the given [dbName]. */
/**
* Returns a [RoomDatabase.Builder] configured for Android with the given [dbName].
*
* Android production deliberately opts out of Room KMP's multi-reader connection pool (`multiConnection = false`).
* Under coroutine cancellation churn (e.g. DB switches via `flatMapLatest`), the reader-pool permit semaphore can
* wedge: all reader connections report `Free` but `permits=0`, so every new read acquisition times out indefinitely.
* Single-connection mode serializes reads and writes through one connection, eliminating the separate reader permit
* pool entirely. JVM/iOS may still use the pool; see [MeshtasticDatabase.configureCommon].
*/
actual fun getDatabaseBuilder(dbName: String): RoomDatabase.Builder<MeshtasticDatabase> {
val dbFile = ContextServices.app.getDatabasePath(dbName)
return Room.databaseBuilder<MeshtasticDatabase>(
name = dbFile.absolutePath,
factory = { MeshtasticDatabaseConstructor.initialize() },
)
.configureCommon()
.configureCommon(multiConnection = false)
.setDriver(BundledSQLiteDriver())
}

View File

@@ -17,6 +17,7 @@
package org.meshtastic.core.database
import okio.ByteString.Companion.encodeUtf8
import org.meshtastic.core.common.util.isNoDeviceSentinel
import org.meshtastic.core.common.util.normalizeAddress
object DatabaseConstants {
@@ -43,10 +44,14 @@ object DatabaseConstants {
fun shortSha1(s: String): String = s.encodeUtf8().sha1().hex().take(DatabaseConstants.DB_NAME_HASH_LEN)
fun buildDbName(address: String?): String = if (address.isNullOrBlank()) {
DatabaseConstants.DEFAULT_DB_NAME
} else {
"${DatabaseConstants.DB_PREFIX}_${shortSha1(normalizeAddress(address))}"
fun buildDbName(address: String?): String {
val normalized = normalizeAddress(address)
// No-device sentinels must resolve to the canonical DEFAULT_DB_NAME file, not be hashed into a
// separate DB — otherwise data for "no device selected" would split across files depending on
// which sentinel form prefs happened to emit. Sentinel detection is shared with isValidDeviceAddress
// via isNoDeviceSentinel so DB naming and the foreground-service stay-alive decision never diverge.
if (isNoDeviceSentinel(normalized)) return DatabaseConstants.DEFAULT_DB_NAME
return "${DatabaseConstants.DB_PREFIX}_${shortSha1(normalized)}"
}
fun anonymizeAddress(address: String?): String = when {

View File

@@ -142,11 +142,22 @@ abstract class MeshtasticDatabase : RoomDatabase() {
/**
* Configures a [RoomDatabase.Builder] with standard settings for this project.
*
* @param multiConnection opens a multi-reader connection pool for concurrent reads. Production/file databases
* want this. In-memory databases (tests) MUST pass `false`: a pooled reader connection can serve a snapshot
* older than the latest write on the writer connection, so a read immediately after a write may observe stale
* rows — making read-after-write assertions non-deterministically flaky (see `DeviceLinkRepositoryImplTest`).
* A single connection serializes reads behind writes.
* @param multiConnection when `true` (default), opens a multi-reader connection pool (`maxNumOfReaders = 4`,
* `maxNumOfWriters = 1`) so reads can run concurrently. Pass `false` to serialize all reads and writes
* through a single connection (no separate reader pool).
*
* **Android production passes `false`.** Under coroutine cancellation churn (e.g. DB switches via
* `flatMapLatest`), the Room KMP reader-pool permit semaphore can wedge: all reader connections report `Free`
* but `permits=0`, so every read acquisition times out indefinitely. Single-connection mode eliminates the
* separate reader permit pool. See `DatabaseBuilder.kt` (androidMain).
*
* **In-memory databases MUST pass `false`** for deterministic read-after-write: a pooled reader connection can
* serve a snapshot older than the latest write on the writer connection, so a read immediately after a write
* may observe stale rows — making read-after-write assertions non-deterministically flaky (see
* `DeviceLinkRepositoryImplTest`). A single connection serializes reads behind writes.
*
* **JVM/iOS production uses `true`** (the default). Revisit if desktop/iOS field logs show similar
* pool-exhaustion patterns under cancellation churn.
*/
fun <T : RoomDatabase> RoomDatabase.Builder<T>.configureCommon(
multiConnection: Boolean = true,

View File

@@ -0,0 +1,56 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.database
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertTrue
class BuildDbNameTest {
@Test
fun `no-device sentinels all resolve to DEFAULT_DB_NAME`() {
val expected = DatabaseConstants.DEFAULT_DB_NAME
val sentinels =
listOf(null, "", " ", "\t", "n", "N", "null", "NULL", "Null", ".n", ".N", "default", "DEFAULT", "Default")
sentinels.forEach { sentinel ->
assertEquals(
expected,
buildDbName(sentinel),
"Sentinel ${sentinel?.let { "'$it'" } ?: "null"} must collapse to DEFAULT_DB_NAME",
)
}
}
@Test
fun `BLE MAC address hashes consistently regardless of punctuation or case`() {
val colonUpper = buildDbName("AA:BB:CC:DD:EE:FF")
val noColonLower = buildDbName("aabbccddeeff")
val colonMixed = buildDbName("aA:Bb:cC:dD:eE:fF")
assertEquals(colonUpper, noColonLower)
assertEquals(colonUpper, colonMixed)
}
@Test
fun `non-default real device address does not equal DEFAULT_DB_NAME`() {
val realDb = buildDbName("AA:BB:CC:DD:EE:FF")
assertNotEquals(DatabaseConstants.DEFAULT_DB_NAME, realDb)
// Real device DB names are still scoped under the standard prefix.
assertTrue(realDb.startsWith("${DatabaseConstants.DB_PREFIX}_"))
}
}

View File

@@ -27,9 +27,19 @@ import android.os.IBinder
import android.os.PowerManager
import androidx.core.app.ServiceCompat
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import org.koin.android.ext.android.inject
import org.meshtastic.core.common.hasLocationPermission
import org.meshtastic.core.common.util.isValidDeviceAddress
import org.meshtastic.core.model.DeviceVersion
import org.meshtastic.core.model.util.anonymize
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.MeshNotificationManager
import org.meshtastic.core.repository.RadioInterfaceService
@@ -58,6 +68,18 @@ class MeshService : Service() {
private var isServiceInitialized = false
/**
* Scope for short-lived coroutines owned by this service (e.g. waiting for the selected-device address to load).
* Canceled in [onDestroy].
*/
private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
/**
* Active job waiting for the selected-device address to be loaded from DataStore. Null when no wait is pending
* (either the address was already valid at [onStartCommand] time, or the wait already resolved).
*/
private var addressWaitJob: Job? = null
/**
* Partial wake lock held while the foreground service is running. Prevents the CPU from being throttled while the
* TAK server's keepalive coroutines, socket writes, and mesh packet handlers need to run on a regular cadence.
@@ -73,6 +95,13 @@ class MeshService : Service() {
val absoluteMinDeviceVersion = DeviceVersion(DeviceVersion.ABS_MIN_FW_VERSION)
private const val WAKE_LOCK_TIMEOUT_MS = 30L * 60L * 1_000L // 30 minutes
/**
* How long [onStartCommand] will keep the service alive waiting for the selected-device address flow to emit a
* valid value before concluding that no device is genuinely selected. Covers cold-start DataStore load time;
* only the initial null/blank value is treated as transient.
*/
private const val DEVICE_ADDRESS_SETTLE_MS = 5_000L
}
override fun onCreate() {
@@ -97,9 +126,6 @@ class MeshService : Service() {
return START_NOT_STICKY
}
val address = radioInterfaceService.getDeviceAddress()
val wantForeground = address != null && address != "n"
connectionManager.updateStatusNotification()
val notification = androidNotifications.getServiceNotification()
@@ -114,18 +140,56 @@ class MeshService : Service() {
0
}
// Start foreground FIRST. Android requires startForeground() within ~5s of onStartCommand and before any
// potentially-blocking work. We never defer this — even when the selected-device address is not yet loaded.
startForegroundSafely(notification, foregroundServiceType)
return if (!wantForeground) {
Logger.i { "Stopping mesh service because no device is selected" }
releaseWakeLock()
ServiceCompat.stopForeground(this, ServiceCompat.STOP_FOREGROUND_REMOVE)
stopSelf()
START_NOT_STICKY
} else {
val address = radioInterfaceService.getDeviceAddress()
if (isValidDeviceAddress(address)) {
// Address is already loaded and valid — proceed normally.
addressWaitJob?.cancel()
addressWaitJob = null
acquireWakeLock()
START_STICKY
Logger.i { "MeshService: selected device ready (${address.anonymize}), staying foreground" }
return START_STICKY
}
// Address is currently null/blank/sentinel. This may be a transient state while DataStore emits the persisted
// value (cold-start race: RadioPrefsImpl.devAddr starts as null until the first DataStore emission). Make no
// irreversible stopSelf() decision here; wait briefly for the address flow to settle.
Logger.i { "MeshService: selected address not yet loaded (${address.anonymize}); waiting for address flow" }
scheduleDeviceAddressResolution()
return START_STICKY
}
/**
* Waits for [RadioInterfaceService.currentDeviceAddressFlow] to emit a valid device address. Resolves the transient
* null/blank window at cold start without busy-polling [RadioInterfaceService.getDeviceAddress].
* - On a valid emission: acquires the wake lock and the service continues as a normal foreground service.
* - On timeout (no valid address observed): concludes no device is genuinely selected and stops cleanly.
*
* Uses [first] with a predicate rather than `drop(1)` so we never skip an already-current valid StateFlow value: if
* the address arrived between the synchronous [onStartCommand] read and this subscription, [first] returns it
* immediately instead of waiting the full timeout and spuriously stopping a service that has a valid device.
*/
private fun scheduleDeviceAddressResolution() {
addressWaitJob?.cancel()
addressWaitJob =
serviceScope.launch {
val resolved =
withTimeoutOrNull(DEVICE_ADDRESS_SETTLE_MS) {
radioInterfaceService.currentDeviceAddressFlow.first(::isValidDeviceAddress)
}
if (isValidDeviceAddress(resolved)) {
Logger.i { "MeshService: selected device resolved (${resolved.anonymize}) after address-flow wait" }
acquireWakeLock()
} else {
Logger.i { "MeshService: no device selected after address flow settled; stopping" }
releaseWakeLock()
ServiceCompat.stopForeground(this@MeshService, ServiceCompat.STOP_FOREGROUND_REMOVE)
stopSelf()
}
}
}
private fun startForegroundSafely(notification: android.app.Notification, foregroundServiceType: Int) {
@@ -201,6 +265,9 @@ class MeshService : Service() {
override fun onDestroy() {
Logger.i { "Destroying mesh service" }
addressWaitJob?.cancel()
addressWaitJob = null
serviceScope.cancel()
releaseWakeLock()
ServiceCompat.stopForeground(this, ServiceCompat.STOP_FOREGROUND_REMOVE)
if (isServiceInitialized) {

View File

@@ -22,6 +22,7 @@ import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
@@ -29,6 +30,8 @@ import kotlinx.coroutines.launch
import org.koin.core.annotation.Single
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.isValidDeviceAddress
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.MeshMessageProcessor
@@ -119,18 +122,47 @@ class MeshServiceOrchestrator(
}
.launchIn(newScope)
// Cold-start lifecycle invariant: a transport must NOT start for a selected address until
// the active DB has been switched to that same address. We enforce the ordering
// (valid address -> DB switch -> connect) by waiting for currentDeviceAddressFlow to
// surface a real selected address before performing the DB switch and connecting the
// radio. Address sync inside SharedRadioInterfaceService now runs independently of
// connect() (via its init{} listener), so this wait completes as soon as prefs load —
// without it, connect() would race ahead of the DB switch and the firmware handshake
// would write into the wrong (or null) DB.
//
// If no device is ever selected this suspends indefinitely for the lifetime of newScope;
// by design the radio must not connect without a selected device. This mirrors
// MeshService's foreground-service stay-alive gate (isValidDeviceAddress).
newScope.handledLaunch {
// Ensure the per-device database is active before the radio connects.
// On Android this is handled by MeshUtilApplication.init(); on Desktop (and any
// future KMP host) the orchestrator is the first entry point, so it must initialize
// the database here. Without this, DatabaseManager._currentDb stays null and all
// Room writes via withDb() are silently dropped — causing ourNodeInfo to remain null
// after the handshake completes.
databaseManager.switchActiveDatabase(radioInterfaceService.getDeviceAddress())
val address = radioInterfaceService.currentDeviceAddressFlow.first(::isValidDeviceAddress)
databaseManager.switchActiveDatabase(address)
// Load cached nodes from the now-active per-device DB before connect() so the firmware
// handshake doesn't see a stale/empty node set. Previously this ran synchronously in
// start() and raced ahead of the DB switch, reading the default (or null) DB.
nodeManager.loadCachedNodeDB()
Logger.i { "Per-device database initialized, connecting radio" }
radioInterfaceService.connect()
}
// Mid-session device-address transitions (late process-lifecycle devAddr propagation,
// user-initiated device switch, etc.): keep the active DB in sync with the selected
// device. The initial emission (matching the .first() snapshot above) is an idempotent
// no-op in DatabaseManager. wrap in safeCatching so a single transient failure (e.g. Room
// I/O error) doesn't kill the collector and silently halt ALL future address propagation
// for the lifetime of this orchestrator scope. safeCatching re-throws CancellationException
// so scope cancellation still propagates cleanly.
//
// Note: cold-start ordering (valid address -> DB switch -> connect) is enforced above,
// but a mid-session device switch via setDeviceAddress() races this DB switch against the
// transport restart. Pre-existing behavior, not introduced by the lifecycle refactor.
radioInterfaceService.currentDeviceAddressFlow
.onEach { addr ->
safeCatching { databaseManager.switchActiveDatabase(addr) }
.onFailure { err -> Logger.e(err) { "Failed to switch active database on address update" } }
}
.launchIn(newScope)
radioInterfaceService.receivedData
.onEach { bytes -> messageProcessor.handleFromRadio(bytes, nodeManager.myNodeNum.value) }
.launchIn(newScope)
@@ -138,8 +170,6 @@ class MeshServiceOrchestrator(
radioInterfaceService.connectionError
.onEach { errorMessage -> serviceStateWriter.setErrorMessage(errorMessage, Severity.Warn) }
.launchIn(newScope)
nodeManager.loadCachedNodeDB()
}
/**

View File

@@ -128,6 +128,17 @@ class SharedRadioInterfaceService(
*/
@Volatile private var isStopping = false
/**
* True while an explicit connection lifecycle is active (set by [connect]/[setDeviceAddress], cleared by
* [disconnect]). The hardware ([bluetoothRepository.state]) and network ([networkRepository.networkAvailable])
* listeners and the [checkLiveness] zombie-recovery path consult this to avoid starting a transport the user has
* torn down — without it, BT/network recovery emissions can wake a transport after explicit disconnect, leaving the
* app "connected" with no orchestrator collector and an unloaded NodeDB/channels.
*
* Guarded by [transportMutex]; every read/write site holds the lock. The @Volatile keeps diagnostic reads honest.
*/
@Volatile private var connectionRequested = false
/** Prevents concurrent liveness-induced transport restarts from stacking. */
private val isRestarting = atomic(false)
@@ -170,6 +181,29 @@ class SharedRadioInterfaceService(
private val initLock = Mutex()
private val transportMutex = Mutex()
init {
// Address sync runs INDEPENDENTLY of connect() so that observers (notably
// MeshServiceOrchestrator) see a valid currentDeviceAddressFlow BEFORE the transport
// starts. Previously this mirror lived inside initStateListeners()'s devAddr listener and
// was coupled to startTransportLocked() — but initStateListeners() is only invoked from
// connect(), so the flow never updated until connect() ran. That created a cold-start race:
// the orchestrator's currentDeviceAddressFlow observer could fire AFTER the transport had
// already started, violating the invariant that the active DB must be switched to the
// selected device's DB before its transport starts.
//
// This listener ONLY mirrors radioPrefs.devAddr into _currentDeviceAddressFlow; it never
// starts a transport. Transport start remains driven exclusively by connect() (initial),
// setDeviceAddress() (explicit user switch), BLE/network state changes (environment
// recovery), and liveness restarts (zombie recovery) — see startTransportLocked() callers.
// _currentDeviceAddressFlow is a MutableStateFlow (atomic .value), so the unconditional
// assignment here is race-free without holding transportMutex; same-address writes are
// idempotent no-ops.
radioPrefs.devAddr
.onEach { addr -> _currentDeviceAddressFlow.value = addr }
.catch { Logger.e(it) { "radioPrefs.devAddr address-sync flow crashed" } }
.launchIn(processLifecycle.coroutineScope)
}
private fun initStateListeners() {
if (listenersInitialized.value) return
processLifecycle.coroutineScope.launch {
@@ -177,23 +211,17 @@ class SharedRadioInterfaceService(
if (listenersInitialized.value) return@withLock
listenersInitialized.value = true
radioPrefs.devAddr
.onEach { addr ->
transportMutex.withLock {
if (_currentDeviceAddressFlow.value != addr) {
_currentDeviceAddressFlow.value = addr
startTransportLocked()
}
}
}
.catch { Logger.e(it) { "devAddr flow crashed" } }
.launchIn(processLifecycle.coroutineScope)
bluetoothRepository.state
.onEach { state ->
transportMutex.withLock {
if (state.enabled) {
startTransportLocked()
// Environmental recovery only: don't wake a transport the user has
// explicitly disconnected from. stopTransportLocked() below still fires on
// BLE-disabled to tear down a running BLE link, but we deliberately do NOT
// clear connectionRequested here — that is disconnect()'s job.
if (connectionRequested) {
startTransportLocked()
}
} else if (runningTransportId == InterfaceId.BLUETOOTH) {
stopTransportLocked()
}
@@ -206,7 +234,10 @@ class SharedRadioInterfaceService(
.onEach { state ->
transportMutex.withLock {
if (state) {
startTransportLocked()
// Environmental recovery only — see the BLE listener above for rationale.
if (connectionRequested) {
startTransportLocked()
}
} else if (runningTransportId == InterfaceId.TCP) {
stopTransportLocked()
}
@@ -219,12 +250,24 @@ class SharedRadioInterfaceService(
}
override fun connect() {
processLifecycle.coroutineScope.launch { transportMutex.withLock { startTransportLocked() } }
processLifecycle.coroutineScope.launch {
transportMutex.withLock {
// Mark the connection lifecycle as active BEFORE starting so concurrent
// hardware/network listeners observe the gate as open.
connectionRequested = true
startTransportLocked()
}
}
initStateListeners()
}
override suspend fun disconnect() {
transportMutex.withLock { ignoreExceptionSuspend { stopTransportLocked() } }
transportMutex.withLock {
// Tear the gate down BEFORE stopTransportLocked() so a concurrent state-listener
// emission arriving while we wait for the mutex cannot re-start the transport.
connectionRequested = false
ignoreExceptionSuspend { stopTransportLocked() }
}
}
override fun isMockTransport(): Boolean = transportFactory.isMockTransport()
@@ -259,8 +302,17 @@ class SharedRadioInterfaceService(
processLifecycle.coroutineScope.launch {
transportMutex.withLock {
// The sanitized address is the single source of truth for the connectionRequested
// gate: a real address arms the lifecycle (connect() equivalent) so environmental
// listeners cannot race the rebind into a "down" state; null/("n") is a deselect
// that MUST clear the gate so subsequent BT/network recovery cannot resurrect a
// transport for a device the user explicitly tore down. Only start a fresh
// transport when an address was actually selected.
connectionRequested = sanitized != null
ignoreExceptionSuspend { stopTransportLocked() }
startTransportLocked()
if (sanitized != null) {
startTransportLocked()
}
}
}
return true
@@ -385,10 +437,24 @@ class SharedRadioInterfaceService(
// liveness recovery is self-healing — surfacing a modal dialog for a transient
// condition the app already handled is confusing UX. The warning log above
// remains the observability surface for this event.
//
// Ordering note (pre-existing): onDisconnect fires here, BEFORE the launched
// restart coroutine's `connectionRequested` check below. If an explicit disconnect()
// races this timeout, a spurious DeviceSleep emission can leak to observers. The
// connectionRequested gate still prevents the worse outcome — transport resurrection
// — so this is a benign UI-level transient, not a state-machine bug.
onDisconnect(isPermanent = false)
processLifecycle.coroutineScope.launch {
try {
transportMutex.withLock {
// Defense against a race between checkLiveness() firing and a
// concurrent disconnect(): if the user has torn the connection down
// since the heartbeat scheduled this restart, leave it down. The
// transport is already null after disconnect()'s stopTransportLocked().
if (!connectionRequested) {
Logger.d { "Skipping liveness restart: connection no longer requested" }
return@withLock
}
ignoreExceptionSuspend {
stopTransportLocked(notifyPermanent = false, sendPoliteDisconnect = false)
}

View File

@@ -18,8 +18,10 @@ package org.meshtastic.core.service
import co.touchlab.kermit.Severity
import dev.mokkery.MockMode
import dev.mokkery.answering.calls
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.everySuspend
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
@@ -32,6 +34,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.MeshConnectionManager
@@ -46,6 +49,7 @@ import org.meshtastic.core.takserver.TAKMeshIntegration
import org.meshtastic.core.takserver.TAKServerManager
import org.meshtastic.proto.LocalModuleConfig
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@@ -75,11 +79,18 @@ class MeshServiceOrchestratorTest {
private fun createOrchestrator(
receivedData: MutableSharedFlow<ByteArray> = MutableSharedFlow(),
connectionError: MutableSharedFlow<String> = MutableSharedFlow(),
connectionState: MutableStateFlow<ConnectionState> = MutableStateFlow(ConnectionState.Disconnected),
// A valid default address lets every start() proceed through the new
// wait-for-valid-address -> DB switch -> connect ordering without per-test boilerplate.
// Tests that need a different initial address (or no address) override it explicitly.
currentDeviceAddressFlow: MutableStateFlow<String?> = MutableStateFlow<String?>("x:AA:BB:CC:DD:EE:FF"),
takEnabledFlow: MutableStateFlow<Boolean> = MutableStateFlow(false),
takRunningFlow: MutableStateFlow<Boolean> = MutableStateFlow(false),
): MeshServiceOrchestrator {
every { radioInterfaceService.receivedData } returns receivedData
every { radioInterfaceService.connectionError } returns connectionError
every { radioInterfaceService.connectionState } returns connectionState
every { radioInterfaceService.currentDeviceAddressFlow } returns currentDeviceAddressFlow
every { serviceRepository.meshPacketFlow } returns MutableSharedFlow()
every { meshConfigHandler.moduleConfig } returns MutableStateFlow(LocalModuleConfig())
every { takPrefs.isTakServerEnabled } returns takEnabledFlow
@@ -151,14 +162,40 @@ class MeshServiceOrchestratorTest {
@Test
fun testStartCallsSwitchActiveDatabase() {
every { radioInterfaceService.getDeviceAddress() } returns "tcp:192.168.1.100"
// New ordering: start() waits for currentDeviceAddressFlow to surface a valid address,
// then switches the DB to it and connects. getDeviceAddress() is no longer consulted
// from start(), so drive the address via the flow.
val deviceAddressFlow = MutableStateFlow<String?>("tcp:192.168.1.100")
val orchestrator = createOrchestrator(currentDeviceAddressFlow = deviceAddressFlow)
// Event recorder: locks the cold-start ordering (DB switch -> load cached NodeDB ->
// connect) without depending on Mokkery's global call-order semantics. The global
// order verifier (verifySuspend(order) { ... }) fails because start() also drives
// other calls on the same mocks (resetReceivedBuffer, currentDeviceAddressFlow
// collectors, the mid-session DB switch replay, etc.) that interleave with the
// three calls we care about and break the global sequence. Recording only the
// three calls under test sidesteps the global check entirely.
val events = mutableListOf<String>()
everySuspend { databaseManager.switchActiveDatabase(any()) } calls { events.add("switchDb") }
every { nodeManager.loadCachedNodeDB() } calls { events.add("loadCachedNodeDb") }
every { radioInterfaceService.connect() } calls { events.add("connect") }
val orchestrator = createOrchestrator()
orchestrator.start()
verifySuspend { databaseManager.switchActiveDatabase("tcp:192.168.1.100") }
verify { nodeManager.loadCachedNodeDB() }
verify { radioInterfaceService.connect() }
// Locks in the cold-start ordering invariant: DB switch -> load cached NodeDB ->
// connect. loadCachedNodeDB must run AFTER the DB switch (so it reads from the
// freshly-selected per-device DB) and BEFORE connect() (so the firmware handshake
// doesn't see a stale or empty node set). On UnconfinedTestDispatcher the
// handledLaunch block runs eagerly, producing these three events in order. The
// mid-session address observer then replays the initial value, producing a
// redundant "switchDb"; assert only the first three to lock the order without
// coupling to the redundant replay.
assertEquals(listOf("switchDb", "loadCachedNodeDb", "connect"), events.take(3))
orchestrator.stop()
}
@@ -270,4 +307,69 @@ class MeshServiceOrchestratorTest {
orchestrator.stop()
}
/**
* Regression: when [RadioInterfaceService.currentDeviceAddressFlow] emits a new address mid-session (e.g. a late
* process-lifecycle address resolution on Android), the orchestrator must propagate it to [DatabaseManager] so Room
* writes land in the right per-device DB. Previously start() only switched the DB once via getDeviceAddress() and
* missed subsequent address changes, leaving the new session writing to the old DB.
*
* DatabaseManager is idempotent, so the redundant initial replay (matching the getDeviceAddress() snapshot) is a
* no-op; we therefore assert by argument value rather than brittle exact counts.
*/
@Test
fun testCurrentDeviceAddressChangeSwitchesActiveDatabaseAfterStart() {
val deviceAddressFlow = MutableStateFlow<String?>("tcp:192.168.1.100")
val orchestrator = createOrchestrator(currentDeviceAddressFlow = deviceAddressFlow)
orchestrator.start()
// Initial address was switched (the first-valid emission in start() + the StateFlow
// replay into the mid-session observer; DatabaseManager is idempotent for the dup).
verifySuspend { databaseManager.switchActiveDatabase("tcp:192.168.1.100") }
// Mid-session address resolution must propagate to DatabaseManager.
deviceAddressFlow.value = "tcp:10.0.0.5"
verifySuspend { databaseManager.switchActiveDatabase("tcp:10.0.0.5") }
orchestrator.stop()
}
/**
* Lifecycle invariant: when the transport reports [ConnectionState.Connected] while the orchestrator is stopped
* (e.g. a late BLE liveness reconnect, or an emission that arrives after [MeshService.onDestroy]), the orchestrator
* MUST NOT auto-restart. The orchestrator is a Koin `@Single` whose lifetime exceeds the Android `Service`, so an
* unguarded observer launched in `init {}` would resurrect the orchestrator after a deliberate stop — collecting
* from [RadioInterfaceService.receivedData] in the background with no foreground service, no wake lock, and no UI.
*
* This also preserves the documented invariant that [MeshConnectionManagerImpl] is the only consumer of
* [RadioInterfaceService.connectionState]. Recovery after stop is the host's responsibility: it must call `start()`
* explicitly (e.g. via a fresh `MeshService.onCreate()`).
*
* Replaces the previous "orphan-Connected recovery" behavior that was removed for violating both invariants.
*/
@Test
fun testConnectedWhileStoppedDoesNotRestartWithoutExplicitStart() {
val receivedData = MutableSharedFlow<ByteArray>(extraBufferCapacity = 8)
val connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected)
val orchestrator = createOrchestrator(receivedData = receivedData, connectionState = connectionState)
every { nodeManager.myNodeNum } returns MutableStateFlow(null)
// Stopped; never call start() explicitly.
assertFalse(orchestrator.isRunning)
// Transport reaches Connected — orchestrator must stay stopped.
connectionState.value = ConnectionState.Connected
assertFalse(orchestrator.isRunning)
// No collector may have been attached: a packet emitted now is unhandled.
val packet = byteArrayOf(7, 7, 7)
receivedData.tryEmit(packet)
verifySuspend(exactly(0)) { messageProcessor.handleFromRadio(packet, null) }
// Likewise, subsequent transport state cycles must not restart the orchestrator.
connectionState.value = ConnectionState.Disconnected
connectionState.value = ConnectionState.Connected
assertFalse(orchestrator.isRunning)
}
}

View File

@@ -214,8 +214,9 @@ class SharedRadioInterfaceServiceLivenessTest {
private fun createConnectedService(
address: String,
transportProvider: () -> RadioTransport = { FakeRadioTransport().also { createdTransports.add(it) } },
networkAvailability: MutableStateFlow<Boolean> = MutableStateFlow(true),
): SharedRadioInterfaceService {
every { networkRepository.networkAvailable } returns MutableStateFlow(true)
every { networkRepository.networkAvailable } returns networkAvailability
every { networkRepository.resolvedList } returns MutableSharedFlow()
every { analytics.isPlatformServicesAvailable } returns false
every { transportFactory.supportedDeviceTypes } returns listOf(DeviceType.BLE)
@@ -550,4 +551,238 @@ class SharedRadioInterfaceServiceLivenessTest {
advanceTimeBy(1_000L)
}
}
// ─── connectionRequested gate: environmental recovery vs explicit disconnect ────────────
/**
* Regression: after an explicit [SharedRadioInterfaceService.disconnect], BT state emissions MUST NOT restart the
* transport. Without the `connectionRequested` gate, a subsequent `bluetoothRepository.state = enabled` emission
* would silently resurrect the transport the user tore down — leaving the app "connected" with no orchestrator
* collector, no NodeDB load, and no channels. Only [disconnect] clears the gate; the state listener checks it
* before calling `startTransportLocked()`.
*/
@Test
fun `BLE state recovery does not restart transport after explicit disconnect`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
// Explicit user-initiated disconnect: clears the connectionRequested gate BEFORE
// stopTransportLocked() so a racing state-listener emission cannot re-arm the transport.
service.disconnect()
// Drain the polite-disconnect frame (production waits POLITE_DISCONNECT_DRAIN_MS = 500ms).
advanceTimeBy(1_000L)
val transportCountAfterDisconnect = createdTransports.size
// Force a fresh BLE state cycle (disabled → enabled). The initial listener subscription
// already consumed the default enabled=true emission during connect(), so toggling is
// required to deliver a NEW enabled=true emission that would trigger startTransportLocked().
bluetoothRepository.setBluetoothEnabled(false)
testDispatcher.scheduler.runCurrent()
bluetoothRepository.setBluetoothEnabled(true)
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
transportCountAfterDisconnect,
createdTransports.size,
"BT-enabled emission after disconnect must NOT restart transport (connectionRequested gate)",
)
assertFalse(
service.connectionState.value == ConnectionState.Connected,
"State must remain Disconnected after post-disconnect BT recovery emission",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Counterpart to the disconnect-gate test: environmental recovery MUST still function while a connection is
* explicitly desired (connectionRequested == true). When the BT radio toggles off then back on (user toggled
* airplane mode, BT permission revoked/restored, etc.) the state listener must tear down and restart the transport.
* Only [disconnect] clears the gate; environmental stops via `stopTransportLocked()` do not.
*/
@Test
fun `BLE environmental recovery restarts transport while connection is still desired`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
// Environmental stop: BT radio disabled while a connection is active.
// stopTransportLocked() fires but connectionRequested stays true (only disconnect() clears it).
bluetoothRepository.setBluetoothEnabled(false)
testDispatcher.scheduler.runCurrent()
// Drain the polite-disconnect frame inside the listener's stopTransportLocked() (500ms).
advanceTimeBy(1_000L)
assertTrue(
createdTransports.first().closeCalled,
"BLE-disabled should close the running transport via environmental stop",
)
// Environmental recovery: BT radio re-enabled. connectionRequested is still true, so the
// listener MUST call startTransportLocked() and bring the transport back.
bluetoothRepository.setBluetoothEnabled(true)
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
2,
createdTransports.size,
"BLE-enabled should restart transport via environmental recovery (connectionRequested still true)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Network/TCP counterpart to `BLE state recovery does not restart transport after explicit disconnect`.
*
* The [networkRepository.networkAvailable] listener (see `initStateListeners`) consults the same
* `connectionRequested` gate as the BLE listener: after an explicit [SharedRadioInterfaceService.disconnect], a
* network-available emission MUST NOT resurrect the transport. Without the gate, a connectivity cycle (Wi-Fi
* toggled off→on, network handoff) would silently restart a transport the user tore down.
*/
@Test
fun `network available recovery does not restart transport after explicit disconnect`() = runTest(testDispatcher) {
clock = 0L
val networkAvailability = MutableStateFlow<Boolean>(true)
val service = createConnectedService("t192.168.1.100", networkAvailability = networkAvailability)
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
// Explicit user-initiated disconnect: clears the connectionRequested gate BEFORE
// stopTransportLocked() so a racing network-listener emission cannot re-arm the transport.
service.disconnect()
// Drain the polite-disconnect frame (production waits POLITE_DISCONNECT_DRAIN_MS = 500ms).
advanceTimeBy(1_000L)
val transportCountAfterDisconnect = createdTransports.size
// Force a fresh network-available cycle (false → true). The initial listener subscription
// already consumed the default true emission during connect(), so toggling is required to
// deliver a NEW true emission that would trigger startTransportLocked().
networkAvailability.value = false
testDispatcher.scheduler.runCurrent()
networkAvailability.value = true
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
transportCountAfterDisconnect,
createdTransports.size,
"network-available emission after disconnect must NOT restart transport (connectionRequested gate)",
)
assertFalse(
service.connectionState.value == ConnectionState.Connected,
"State must remain Disconnected after post-disconnect network recovery emission",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
// ─── connectionRequested gate: setDeviceAddress(null)/("n") deselect ────────────────────
/**
* Regression: after [SharedRadioInterfaceService.setDeviceAddress] with `null`/`"n"` (deselect), the
* `connectionRequested` gate MUST be cleared so subsequent BLE state emissions cannot restart the transport.
* Without the gate-clear in setDeviceAddress(), BT recovery (user toggled BT off→on) would silently resurrect a
* transport for a device the user explicitly deselected — leaving the app "connected" to an unselected device with
* no orchestrator collector.
*/
@Test
fun `BLE state recovery does not restart transport after setDeviceAddress deselect`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
// Explicit device deselect: setDeviceAddress(null)/("n") must clear the connectionRequested
// gate BEFORE stopTransportLocked() so a racing state-listener emission cannot re-arm it.
service.setDeviceAddress("n")
// Drain the polite-disconnect frame (production waits POLITE_DISCONNECT_DRAIN_MS = 500ms).
advanceTimeBy(1_000L)
val transportCountAfterDeselect = createdTransports.size
// Force a fresh BLE state cycle (disabled → enabled). The initial listener subscription
// already consumed the default enabled=true emission during connect(), so toggling is
// required to deliver a NEW enabled=true emission that would trigger startTransportLocked().
bluetoothRepository.setBluetoothEnabled(false)
testDispatcher.scheduler.runCurrent()
bluetoothRepository.setBluetoothEnabled(true)
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
transportCountAfterDeselect,
createdTransports.size,
"BT-enabled emission after deselect must NOT restart transport (connectionRequested gate cleared)",
)
assertFalse(
service.connectionState.value == ConnectionState.Connected,
"State must NOT be Connected after post-deselect BT recovery emission",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Network/TCP counterpart to `BLE state recovery does not restart transport after setDeviceAddress deselect`.
*
* After [SharedRadioInterfaceService.setDeviceAddress] with `null`/`"n"`, a network-available emission MUST NOT
* resurrect the transport. Without the gate-clear in setDeviceAddress(), network recovery (Wi-Fi toggled off→on,
* network handoff) would silently restart a transport for a device the user explicitly deselected.
*/
@Test
fun `network available recovery does not restart transport after setDeviceAddress deselect`() =
runTest(testDispatcher) {
clock = 0L
val networkAvailability = MutableStateFlow<Boolean>(true)
val service = createConnectedService("t192.168.1.100", networkAvailability = networkAvailability)
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
// Explicit device deselect: setDeviceAddress(null)/("n") must clear the connectionRequested
// gate BEFORE stopTransportLocked() so a racing network-listener emission cannot re-arm it.
service.setDeviceAddress("n")
// Drain the polite-disconnect frame (production waits POLITE_DISCONNECT_DRAIN_MS = 500ms).
advanceTimeBy(1_000L)
val transportCountAfterDeselect = createdTransports.size
// Force a fresh network-available cycle (false → true). The initial listener subscription
// already consumed the default true emission during connect(), so toggling is required to
// deliver a NEW true emission that would trigger startTransportLocked().
networkAvailability.value = false
testDispatcher.scheduler.runCurrent()
networkAvailability.value = true
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
transportCountAfterDeselect,
createdTransports.size,
"network-available emission after deselect must NOT restart transport (connectionRequested gate cleared)",
)
assertFalse(
service.connectionState.value == ConnectionState.Connected,
"State must NOT be Connected after post-deselect network recovery emission",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
}