fix(data): make MeshConnectionManagerImpl.onConnectionChanged atomic (#5076)

This commit is contained in:
James Rich
2026-04-11 19:49:09 -05:00
committed by GitHub
parent 62264b10c6
commit 5e44cbd3a9
2 changed files with 110 additions and 3 deletions

View File

@@ -25,6 +25,8 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
@@ -83,6 +85,12 @@ class MeshConnectionManagerImpl(
private val appWidgetUpdater: AppWidgetUpdater,
@Named("ServiceScope") private val scope: CoroutineScope,
) : MeshConnectionManager {
/**
* Serializes [onConnectionChanged] to prevent TOCTOU races when multiple coroutines emit state transitions
* concurrently (e.g. flow collector vs. sleep-timeout coroutine).
*/
private val connectionMutex = Mutex()
private var sleepTimeout: Job? = null
private var locationRequestsJob: Job? = null
private var handshakeTimeout: Job? = null
@@ -139,14 +147,14 @@ class MeshConnectionManagerImpl(
onConnectionChanged(effectiveState)
}
private fun onConnectionChanged(c: ConnectionState) {
private suspend fun onConnectionChanged(c: ConnectionState) = connectionMutex.withLock {
val current = serviceRepository.connectionState.value
if (current == c) return
if (current == c) return@withLock
// If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting)
if (c is ConnectionState.Connected && current is ConnectionState.Connecting) {
Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" }
return
return@withLock
}
Logger.i { "onConnectionChanged: $current -> $c" }

View File

@@ -27,6 +27,7 @@ import dev.mokkery.verify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
@@ -310,4 +311,102 @@ class MeshConnectionManagerImplTest {
"Should transition to Disconnected after capped timeout (300s), not the raw 3630s",
)
}
@Test
fun `rapid state transitions are serialized by connectionMutex`() = runTest(testDispatcher) {
// Power saving enabled so DeviceSleep is preserved (not mapped to Disconnected)
val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true))
every { radioConfigRepository.localConfigFlow } returns flowOf(config)
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
every { packetHandler.stopPacketQueue() } returns Unit
every { locationManager.stop() } returns Unit
every { mqttManager.stop() } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
// Record every state transition so we can verify ordering
val observed = mutableListOf<ConnectionState>()
every { serviceRepository.setConnectionState(any()) } calls
{ call ->
val state = call.arg<ConnectionState>(0)
observed.add(state)
connectionStateFlow.value = state
}
manager = createManager(backgroundScope)
advanceUntilIdle()
// Rapid-fire: Connected -> DeviceSleep -> Disconnected without yielding between them.
// Without the Mutex, the intermediate DeviceSleep could be missed or applied out of order.
radioConnectionState.value = ConnectionState.Connected
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Disconnected
advanceUntilIdle()
// Verify final state
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Final state should be Disconnected after rapid transitions",
)
// Verify that all intermediate states were observed in correct order.
// Connected triggers handleConnected() which sets Connecting (handshake start),
// then DeviceSleep, then Disconnected.
assertEquals(
listOf(ConnectionState.Connecting, ConnectionState.DeviceSleep, ConnectionState.Disconnected),
observed,
"State transitions should be serialized in order: Connecting -> DeviceSleep -> Disconnected",
)
}
@Test
fun `concurrent sleep-timeout and radio state change are serialized`() {
val standardDispatcher = StandardTestDispatcher()
runTest(standardDispatcher) {
// Power saving enabled with a short ls_secs so the sleep timeout fires quickly
val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 1))
every { radioConfigRepository.localConfigFlow } returns flowOf(config)
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
every { packetHandler.stopPacketQueue() } returns Unit
every { locationManager.stop() } returns Unit
every { mqttManager.stop() } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
val observed = mutableListOf<ConnectionState>()
every { serviceRepository.setConnectionState(any()) } calls
{ call ->
val state = call.arg<ConnectionState>(0)
observed.add(state)
connectionStateFlow.value = state
}
manager = createManager(backgroundScope)
advanceUntilIdle()
// Transition to Connected -> DeviceSleep to start the sleep timer
radioConnectionState.value = ConnectionState.Connected
advanceUntilIdle()
radioConnectionState.value = ConnectionState.DeviceSleep
advanceUntilIdle()
observed.clear()
// Before the sleep timeout fires, emit Connected from the radio (simulating device
// waking up). Then let the timeout fire. The mutex ensures they don't race.
radioConnectionState.value = ConnectionState.Connected
// Advance past the sleep timeout (ls_secs=1 + 30s base = 31s)
advanceTimeBy(32_000L)
advanceUntilIdle()
// The Connected transition should have cancelled the sleep timeout, so we should
// end up in Connecting (from handleConnected), NOT Disconnected (from timeout).
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Connected should cancel the sleep timeout; final state should be Connecting",
)
}
}
}