From 5e44cbd3a902f4152905e1d9558db0e92efe193b Mon Sep 17 00:00:00 2001
From: James Rich <2199651+jamesarich@users.noreply.github.com>
Date: Sat, 11 Apr 2026 19:49:09 -0500
Subject: [PATCH] fix(data): make MeshConnectionManagerImpl.onConnectionChanged atomic (#5076)

---
 .../data/manager/MeshConnectionManagerImpl.kt | 14 ++-
 .../manager/MeshConnectionManagerImplTest.kt  | 99 +++++++++++++++++++
 2 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt
index dbf07fdaf..d64753bbf 100644
--- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt
+++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt
@@ -25,6 +25,8 @@ import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.flow.launchIn
 import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import org.koin.core.annotation.Named
 import org.koin.core.annotation.Single
 import org.meshtastic.core.common.util.handledLaunch
@@ -83,6 +85,12 @@ class MeshConnectionManagerImpl(
     private val appWidgetUpdater: AppWidgetUpdater,
     @Named("ServiceScope") private val scope: CoroutineScope,
 ) : MeshConnectionManager {
+    /**
+     * Serializes [onConnectionChanged] to prevent TOCTOU races when multiple coroutines emit state transitions
+     * concurrently (e.g. flow collector vs. sleep-timeout coroutine).
+     */
+    private val connectionMutex = Mutex()
+
     private var sleepTimeout: Job? = null
     private var locationRequestsJob: Job? = null
     private var handshakeTimeout: Job? = null
@@ -139,14 +147,14 @@ class MeshConnectionManagerImpl(
         onConnectionChanged(effectiveState)
     }
 
-    private fun onConnectionChanged(c: ConnectionState) {
+    private suspend fun onConnectionChanged(c: ConnectionState) = connectionMutex.withLock {
         val current = serviceRepository.connectionState.value
-        if (current == c) return
+        if (current == c) return@withLock
 
         // If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting)
         if (c is ConnectionState.Connected && current is ConnectionState.Connecting) {
             Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" }
-            return
+            return@withLock
         }
 
         Logger.i { "onConnectionChanged: $current -> $c" }
diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt
index 36ee37f2e..55adf8b57 100644
--- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt
+++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt
@@ -27,6 +27,7 @@ import dev.mokkery.verify
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.test.StandardTestDispatcher
 import kotlinx.coroutines.test.UnconfinedTestDispatcher
 import kotlinx.coroutines.test.advanceTimeBy
 import kotlinx.coroutines.test.advanceUntilIdle
@@ -310,4 +311,102 @@ class MeshConnectionManagerImplTest {
             "Should transition to Disconnected after capped timeout (300s), not the raw 3630s",
         )
     }
+
+    @Test
+    fun `rapid state transitions are serialized by connectionMutex`() = runTest(testDispatcher) {
+        // Power saving enabled so DeviceSleep is preserved (not mapped to Disconnected)
+        val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true))
+        every { radioConfigRepository.localConfigFlow } returns flowOf(config)
+        every { packetHandler.sendToRadio(any()) } returns Unit
+        every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
+        every { packetHandler.stopPacketQueue() } returns Unit
+        every { locationManager.stop() } returns Unit
+        every { mqttManager.stop() } returns Unit
+        every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
+
+        // Record every state transition so we can verify ordering
+        val observed = mutableListOf<ConnectionState>()
+        every { serviceRepository.setConnectionState(any()) } calls
+            { call ->
+                val state = call.arg<ConnectionState>(0)
+                observed.add(state)
+                connectionStateFlow.value = state
+            }
+
+        manager = createManager(backgroundScope)
+        advanceUntilIdle()
+
+        // Rapid-fire: Connected -> DeviceSleep -> Disconnected without yielding between them.
+        // Without the Mutex, the intermediate DeviceSleep could be missed or applied out of order.
+        radioConnectionState.value = ConnectionState.Connected
+        radioConnectionState.value = ConnectionState.DeviceSleep
+        radioConnectionState.value = ConnectionState.Disconnected
+        advanceUntilIdle()
+
+        // Verify final state
+        assertEquals(
+            ConnectionState.Disconnected,
+            serviceRepository.connectionState.value,
+            "Final state should be Disconnected after rapid transitions",
+        )
+
+        // Verify that all intermediate states were observed in correct order.
+        // Connected triggers handleConnected() which sets Connecting (handshake start),
+        // then DeviceSleep, then Disconnected.
+        assertEquals(
+            listOf(ConnectionState.Connecting, ConnectionState.DeviceSleep, ConnectionState.Disconnected),
+            observed,
+            "State transitions should be serialized in order: Connecting -> DeviceSleep -> Disconnected",
+        )
+    }
+
+    @Test
+    fun `concurrent sleep-timeout and radio state change are serialized`() {
+        val standardDispatcher = StandardTestDispatcher()
+        runTest(standardDispatcher) {
+            // Power saving enabled with a short ls_secs so the sleep timeout fires quickly
+            val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 1))
+            every { radioConfigRepository.localConfigFlow } returns flowOf(config)
+            every { packetHandler.sendToRadio(any()) } returns Unit
+            every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
+            every { packetHandler.stopPacketQueue() } returns Unit
+            every { locationManager.stop() } returns Unit
+            every { mqttManager.stop() } returns Unit
+            every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
+
+            val observed = mutableListOf<ConnectionState>()
+            every { serviceRepository.setConnectionState(any()) } calls
+                { call ->
+                    val state = call.arg<ConnectionState>(0)
+                    observed.add(state)
+                    connectionStateFlow.value = state
+                }
+
+            manager = createManager(backgroundScope)
+            advanceUntilIdle()
+
+            // Transition to Connected -> DeviceSleep to start the sleep timer
+            radioConnectionState.value = ConnectionState.Connected
+            advanceUntilIdle()
+            radioConnectionState.value = ConnectionState.DeviceSleep
+            advanceUntilIdle()
+
+            observed.clear()
+
+            // Before the sleep timeout fires, emit Connected from the radio (simulating device
+            // waking up). Then let the timeout fire. The mutex ensures they don't race.
+            radioConnectionState.value = ConnectionState.Connected
+            // Advance past the sleep timeout (ls_secs=1 + 30s base = 31s)
+            advanceTimeBy(32_000L)
+            advanceUntilIdle()
+
+            // The Connected transition should have cancelled the sleep timeout, so we should
+            // end up in Connecting (from handleConnected), NOT Disconnected (from timeout).
+            assertEquals(
+                ConnectionState.Connecting,
+                serviceRepository.connectionState.value,
+                "Connected should cancel the sleep timeout; final state should be Connecting",
+            )
+        }
+    }
 }