diff --git a/.skills/compose-ui/strings-index.txt b/.skills/compose-ui/strings-index.txt index bf12fdb7c..2af559b4c 100644 --- a/.skills/compose-ui/strings-index.txt +++ b/.skills/compose-ui/strings-index.txt @@ -513,6 +513,7 @@ environment_metrics_update_interval_seconds environment_metrics_use_fahrenheit error error_duty_cycle +error_recovery_exhausted establish_session establishing_session ethernet_config @@ -1133,6 +1134,7 @@ rebroadcast_mode_known_only_desc rebroadcast_mode_local_only_desc rebroadcast_mode_none_desc recent_network_devices +reconnecting red refresh refresh_metadata @@ -1411,6 +1413,23 @@ traceroute_route_towards_dest traceroute_showing_nodes track_and_share_locations track_point +### TRAFFIC ### +traffic_management +traffic_management_config +traffic_management_drop_unknown_enabled +traffic_management_enabled +traffic_management_exhaust_hop_position +traffic_management_exhaust_hop_telemetry +traffic_management_nodeinfo_direct_response +traffic_management_nodeinfo_direct_response_max_hops +traffic_management_position_dedup +traffic_management_position_min_interval +traffic_management_position_precision +traffic_management_rate_limit_enabled +traffic_management_rate_limit_max_packets +traffic_management_rate_limit_window +traffic_management_router_preserve_hops +traffic_management_unknown_packet_threshold transmit_over_lora transport_ble transport_tcp diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt index 602c7964d..6d0bd29ea 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt @@ -18,11 +18,13 @@ package org.meshtastic.core.data.manager import co.touchlab.kermit.Logger import kotlinx.atomicfu.atomic +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import org.koin.core.annotation.Named import org.koin.core.annotation.Single import org.meshtastic.core.common.util.handledLaunch +import org.meshtastic.core.common.util.safeCatching import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DeviceVersion import org.meshtastic.core.repository.CommandSender @@ -77,10 +79,14 @@ class MeshConfigFlowManagerImpl( * Stage 1: receiving device config, module config, channels, and metadata. * * [rawMyNodeInfo] arrives first (my_info packet); [metadata] may arrive shortly after. Both are consumed - * together by [buildMyNodeInfo] at Stage 1 completion. + * together by [buildMyNodeInfo] at Stage 1 completion. Some firmware/network paths can deliver NodeInfo before + * the Stage 2 request; keep those packets so the later node-list phase can still make progress. */ - data class ReceivingConfig(val rawMyNodeInfo: ProtoMyNodeInfo, val metadata: DeviceMetadata? = null) : - HandshakeState() + data class ReceivingConfig( + val rawMyNodeInfo: ProtoMyNodeInfo, + val metadata: DeviceMetadata? = null, + val earlyNodes: List = emptyList(), + ) : HandshakeState() /** * Stage 2: receiving node-info packets from the firmware. @@ -95,13 +101,18 @@ class MeshConfigFlowManagerImpl( data class Complete(val myNodeInfo: SharedMyNodeInfo) : HandshakeState() } - private var handshakeState: HandshakeState = HandshakeState.Idle + private val handshakeState = atomic(HandshakeState.Idle) override val newNodeCount: Int - get() = (handshakeState as? HandshakeState.ReceivingNodeInfo)?.nodes?.size ?: 0 + get() = + when (val state = handshakeState.value) { + is HandshakeState.ReceivingConfig -> state.earlyNodes.size + is HandshakeState.ReceivingNodeInfo -> state.nodes.size + else -> 0 + } override fun handleConfigComplete(configCompleteId: Int) { - val state = handshakeState + val state = handshakeState.value when (configCompleteId) { HandshakeConstants.CONFIG_NONCE -> { if (state !is HandshakeState.ReceivingConfig) { @@ -129,7 +140,7 @@ class MeshConfigFlowManagerImpl( val finalizedInfo = buildMyNodeInfo(state.rawMyNodeInfo, state.metadata) if (finalizedInfo == null) { Logger.w { "Stage 1 failed: could not build MyNodeInfo, retrying Stage 1" } - handshakeState = HandshakeState.Idle + handshakeState.value = HandshakeState.Idle scope.handledLaunch { delay(wantConfigDelay) connectionManager.value.startConfigOnly() @@ -149,9 +160,10 @@ class MeshConfigFlowManagerImpl( } } - handshakeState = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo) - Logger.i { "myNodeInfo committed (nodeNum=${finalizedInfo.myNodeNum})" } + handshakeState.value = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo, nodes = state.earlyNodes) + Logger.i { "myNodeInfo committed" } connectionManager.value.onRadioConfigLoaded() + serviceStateWriter.setConnectionProgress("Loading node list") scope.handledLaunch { delay(wantConfigDelay) @@ -160,6 +172,7 @@ class MeshConfigFlowManagerImpl( Logger.i { "Requesting NodeInfo (Stage 2)" } connectionManager.value.startNodeInfoOnly() } + connectionManager.value.onHandshakeProgress() } private fun handleNodeInfoComplete(state: HandshakeState.ReceivingNodeInfo) { @@ -171,7 +184,15 @@ class MeshConfigFlowManagerImpl( // The async work below (DB writes, broadcasts) proceeds without the guard. // Because nodes is now immutable, no snapshot is needed — state.nodes IS the snapshot. // Any stall-guard retry that re-enters handleNodeInfo will see Complete state and be ignored. - handshakeState = HandshakeState.Complete(myNodeInfo = info) + handshakeState.value = HandshakeState.Complete(myNodeInfo = info) + + // Cancel the transport-aware fast-recovery watchdog SYNCHRONOUSLY, before the async DB + // install work below is launched. The firmware handshake has already completed at this + // point (NODE_INFO_NONCE received); a slow Room commit on a large mesh would otherwise + // falsely trip the 12s fast-recovery timeout before onNodeDbReady() gets a chance to + // cancel it. onNodeDbReady() still performs the same cancel as part of its larger post- + // NodeDB side-effect set, but it runs only after the DB install block finishes. + connectionManager.value.onHandshakeComplete() val entities = state.nodes.mapNotNull { nodeInfo -> @@ -184,20 +205,39 @@ class MeshConfigFlowManagerImpl( } scope.handledLaunch { - nodeRepository.installConfig(info, entities) - analytics.setDeviceAttributes(info.firmwareVersion ?: "unknown", info.model ?: "unknown") + try { + nodeRepository.installConfig(info, entities) + } catch (e: CancellationException) { + throw e + } catch (@Suppress("TooGenericExceptionCaught") e: Exception) { + Logger.e(e) { "Post-handshake NodeDB install failed; restarting transport to recover" } + nodeManager.setNodeDbReady(false) + nodeManager.setAllowNodeDbWrites(false) + connectionManager.value.recoverPostHandshakeFailure() + return@handledLaunch + } + nodeManager.setNodeDbReady(true) nodeManager.setAllowNodeDbWrites(true) serviceStateWriter.setConnectionState(ConnectionState.Connected) - connectionManager.value.onNodeDbReady() + + safeCatching { analytics.setDeviceAttributes(info.firmwareVersion ?: "unknown", info.model ?: "unknown") } + .onFailure { e -> Logger.w(e) { "Failed to set post-handshake analytics attributes" } } + safeCatching { connectionManager.value.onNodeDbReady() } + .onFailure { e -> Logger.e(e) { "Post-connected onNodeDbReady side effects failed" } } } + // Note: onHandshakeProgress() is intentionally NOT called here. By this point the + // handshake has reached HandshakeState.Complete and the synchronous onHandshakeComplete() + // call above has already cancelled the watchdog. Re-arming via onHandshakeProgress() + // would be both semantically wrong and wasted work. The remaining onHandshakeProgress + // sites cover all genuine progress. } override fun handleMyInfo(myInfo: ProtoMyNodeInfo) { - Logger.i { "MyNodeInfo received: ${myInfo.my_node_num}" } + Logger.i { "MyNodeInfo received" } // Transition to Stage 1, discarding any stale data from a prior interrupted handshake. - handshakeState = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo) + handshakeState.value = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo) nodeManager.setMyNodeNum(myInfo.my_node_num) nodeManager.setFirmwareEdition(myInfo.firmware_edition) applyEventFirmwareNotificationDefaults(myInfo.firmware_edition) @@ -220,35 +260,47 @@ class MeshConfigFlowManagerImpl( radioConfigRepository.clearDeviceUIConfig() radioConfigRepository.clearFileManifest() } + connectionManager.value.onHandshakeProgress() } override fun handleLocalMetadata(metadata: DeviceMetadata) { Logger.i { "Local Metadata received: ${metadata.firmware_version}" } - val state = handshakeState + val state = handshakeState.value if (state is HandshakeState.ReceivingConfig) { - handshakeState = state.copy(metadata = metadata) + handshakeState.value = state.copy(metadata = metadata) // Persist the metadata immediately — buildMyNodeInfo() reads it at Stage 1 complete, // but the DB write does not need to wait until then. if (metadata != DeviceMetadata()) { scope.handledLaunch { nodeRepository.insertMetadata(state.rawMyNodeInfo.my_node_num, metadata) } } + connectionManager.value.onHandshakeProgress() } else { Logger.w { "Ignoring metadata outside Stage 1 (state=$state)" } } } override fun handleNodeInfo(info: NodeInfo) { - val state = handshakeState - if (state is HandshakeState.ReceivingNodeInfo) { - handshakeState = state.copy(nodes = state.nodes + info) - } else { - Logger.w { "Ignoring NodeInfo outside Stage 2 (state=$state)" } + val state = handshakeState.value + when (state) { + is HandshakeState.ReceivingConfig -> { + Logger.d { "Buffering NodeInfo received during Stage 1" } + handshakeState.value = state.copy(earlyNodes = state.earlyNodes.withNodeInfo(info)) + connectionManager.value.onHandshakeProgress() + } + + is HandshakeState.ReceivingNodeInfo -> { + handshakeState.value = state.copy(nodes = state.nodes.withNodeInfo(info)) + connectionManager.value.onHandshakeProgress() + } + + else -> Logger.w { "Ignoring NodeInfo outside active handshake (state=$state)" } } } override fun handleFileInfo(info: FileInfo) { Logger.d { "FileInfo received: ${info.file_name} (${info.size_bytes} bytes)" } scope.handledLaunch { radioConfigRepository.addFileInfo(info) } + connectionManager.value.onHandshakeProgress() } override fun triggerWantConfig() { @@ -305,3 +357,12 @@ class MeshConfigFlowManagerImpl( } } } + +private fun List.withNodeInfo(info: NodeInfo): List { + val index = indexOfFirst { it.num == info.num } + return if (index >= 0) { + toMutableList().apply { this[index] = info } + } else { + this + info + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImpl.kt index 9945f9bd6..4917bacfa 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImpl.kt @@ -26,6 +26,7 @@ import org.koin.core.annotation.Named import org.koin.core.annotation.Single import org.meshtastic.core.common.util.handledLaunch import org.meshtastic.core.repository.MeshConfigHandler +import org.meshtastic.core.repository.MeshConnectionManager import org.meshtastic.core.repository.NodeManager import org.meshtastic.core.repository.RadioConfigRepository import org.meshtastic.core.repository.ServiceStateWriter @@ -41,6 +42,7 @@ class MeshConfigHandlerImpl( private val radioConfigRepository: RadioConfigRepository, private val serviceStateWriter: ServiceStateWriter, private val nodeManager: NodeManager, + private val connectionManager: Lazy, @Named("ServiceScope") private val scope: CoroutineScope, ) : MeshConfigHandler { @@ -59,6 +61,7 @@ class MeshConfigHandlerImpl( Logger.d { "Device config received: ${config.summarize()}" } scope.handledLaunch { radioConfigRepository.setLocalConfig(config) } serviceStateWriter.setConnectionProgress("Device config received") + connectionManager.value.onHandshakeProgress() } override fun handleModuleConfig(config: ModuleConfig) { @@ -69,6 +72,7 @@ class MeshConfigHandlerImpl( config.statusmessage?.let { sm -> nodeManager.myNodeNum.value?.let { num -> nodeManager.updateNodeStatus(num, sm.node_status) } } + connectionManager.value.onHandshakeProgress() } override fun handleChannel(channel: Channel) { @@ -83,11 +87,16 @@ class MeshConfigHandlerImpl( } else { serviceStateWriter.setConnectionProgress("Channels (${index + 1})") } + connectionManager.value.onHandshakeProgress() } override fun handleDeviceUIConfig(config: DeviceUIConfig) { Logger.d { "DeviceUI config received" } scope.handledLaunch { radioConfigRepository.setDeviceUIConfig(config) } + // deviceuiConfig arrives during Stage 1 immediately after my_info. It proves the transport + // is alive, so surface it as handshake progress — without this, a long gap before the next + // meaningful packet could falsely trip the fast-path watchdog on TCP/USB. + connectionManager.value.onHandshakeProgress() } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt index 0db2490db..66231231e 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt @@ -17,6 +17,8 @@ package org.meshtastic.core.data.manager import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import kotlinx.atomicfu.atomic import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -32,6 +34,7 @@ import org.koin.core.annotation.Single import org.meshtastic.core.common.util.handledLaunch import org.meshtastic.core.common.util.nowMillis import org.meshtastic.core.common.util.nowSeconds +import org.meshtastic.core.common.util.safeCatchingAll import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DeviceType import org.meshtastic.core.model.TelemetryType @@ -55,6 +58,9 @@ import org.meshtastic.core.repository.RadioInterfaceService import org.meshtastic.core.repository.ServiceRepository import org.meshtastic.core.repository.SessionManager import org.meshtastic.core.repository.UiPrefs +import org.meshtastic.core.resources.Res +import org.meshtastic.core.resources.error_recovery_exhausted +import org.meshtastic.core.resources.getStringSuspend import org.meshtastic.proto.AdminMessage import org.meshtastic.proto.Config import org.meshtastic.proto.Telemetry @@ -96,7 +102,30 @@ class MeshConnectionManagerImpl( private var preHandshakeJob: Job? = null private var sleepTimeout: Job? = null private var locationRequestsJob: Job? = null - private var handshakeTimeout: Job? = null + + private val handshakeTimeout = atomic(null) + + /** + * One-way latch set by [onHandshakeComplete] and cleared at the start of each fresh handshake in [handleConnected]. + * + * Prevents late-arriving handshake-progress packets (e.g. a FileInfo that lands between NODE_INFO_NONCE and the + * completion of the async Room DB install) from re-arming the fast watchdog that [onHandshakeComplete] already + * cancelled. Without this latch, those late packets would re-introduce the slow-DB false-trip on large meshes that + * [onHandshakeComplete] was added to prevent: the app state is still Connecting during that window, so the existing + * Connecting-state guard inside [onHandshakeProgress] is insufficient on its own. + */ + private val handshakeCompleteLatch = atomic(false) + + /** + * Consecutive handshake-recovery failure count for [runSiblingHandshakeRecovery]. + * + * Incremented atomically on each recovery attempt. Reset to 0 by [onHandshakeComplete] (a successful handshake + * breaks the failure streak) and also reset when the cap is reached and a sticky error is surfaced (so a manual + * user retry starts fresh). When this reaches [MAX_CONSECUTIVE_RECOVERY_FAILURES], recovery stops and the sticky + * error is surfaced instead of silently retrying indefinitely. + */ + private val consecutiveRecoveryFailures = atomic(0) + private var connectTimeMsec = 0L private var connectionRestored = false @@ -163,33 +192,40 @@ class MeshConnectionManagerImpl( onConnectionChanged(effectiveState) } - private suspend fun onConnectionChanged(c: ConnectionState) = connectionMutex.withLock { - val current = serviceRepository.connectionState.value - if (current == c) return@withLock + private suspend fun onConnectionChanged(c: ConnectionState, fromState: ConnectionState? = null): Boolean = + connectionMutex.withLock { + val current = serviceRepository.connectionState.value + if (fromState != null && current != fromState) { + Logger.d { "Skipping connection transition $current -> $c, expected current state $fromState" } + return@withLock false + } + if (current == c) return@withLock false - // If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting) - if (c is ConnectionState.Connected && current is ConnectionState.Connecting) { - Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" } - return@withLock + // If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting) + if (c is ConnectionState.Connected && current is ConnectionState.Connecting) { + Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" } + return@withLock false + } + + Logger.i { "onConnectionChanged: $current -> $c" } + + sleepTimeout?.cancel() + sleepTimeout = null + preHandshakeJob?.cancel() + preHandshakeJob = null + // Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot + // orphan a job in the gap between cancel and reassign. + handshakeTimeout.getAndSet(null)?.cancel() + + when (c) { + is ConnectionState.Connecting -> serviceRepository.setConnectionState(ConnectionState.Connecting) + is ConnectionState.Connected -> handleConnected() + is ConnectionState.DeviceSleep -> handleDeviceSleep() + is ConnectionState.Disconnected -> handleDisconnected() + } + true } - Logger.i { "onConnectionChanged: $current -> $c" } - - sleepTimeout?.cancel() - sleepTimeout = null - preHandshakeJob?.cancel() - preHandshakeJob = null - handshakeTimeout?.cancel() - handshakeTimeout = null - - when (c) { - is ConnectionState.Connecting -> serviceRepository.setConnectionState(ConnectionState.Connecting) - is ConnectionState.Connected -> handleConnected() - is ConnectionState.DeviceSleep -> handleDeviceSleep() - is ConnectionState.Disconnected -> handleDisconnected() - } - } - private fun handleConnected() { // Track whether this connection was restored from device sleep (vs. a fresh connect), // matching Apple's "connectionRestored" attribute for cross-platform DataDog parity. @@ -199,7 +235,13 @@ class MeshConnectionManagerImpl( serviceRepository.setConnectionState(ConnectionState.Connecting) } connectTimeMsec = nowMillis - + // A fresh handshake is starting — clear the completion latch so progress signals during + // this handshake can re-arm the fast watchdog. The latch is set by onHandshakeComplete() + // and only matters in the window between Stage 2 completion and the subsequent Connected + // transition; without this reset, a recovery-restarted handshake would have its progress + // signals ignored (latch left set from the prior failed cycle). This covers both initial + // user-initiated connects and transport-restart recovery siblings. + handshakeCompleteLatch.value = false // Send a wake-up heartbeat before the config request. The firmware may be in a // power-saving state where the NimBLE callback context needs warming up. The 100ms // delay ensures the heartbeat BLE write is enqueued before the want_config_id @@ -213,27 +255,156 @@ class MeshConnectionManagerImpl( } } - private fun startHandshakeStallGuard(stage: Int, timeout: Duration, action: () -> Unit) { - handshakeTimeout?.cancel() - handshakeTimeout = - scope.handledLaunch { - delay(timeout) - if (serviceRepository.connectionState.value is ConnectionState.Connecting) { - // Attempt one retry. Note: the firmware silently drops identical consecutive - // writes (per-connection dedup). If the first want_config_id was received and - // the stall is on our side, the retry will be dropped and the reconnect below - // will trigger instead — which is the right recovery in that case. - Logger.w { - "Handshake stall detected at Stage $stage — retrying, then reconnecting if still stalled" + private fun startHandshakeStallGuard(stage: Int, timeout: Duration) { + val fastTransport = isFastRecoveryTransport() + // On TCP/USB the firmware handshake completes in roughly 1s when healthy (logs show), + // while a wedged socket takes the full ~30s transport read timeout without any further + // progress. The aggressive 12s fast timeout recovers a stuck session quickly; BLE keeps + // the original generous budget because its GATT latency is high and variable. + val effectiveTimeout = if (fastTransport) FAST_HANDSHAKE_TIMEOUT else timeout + val transportLabel = if (fastTransport) "fast transport" else "BLE" + // Collapse cancel+reassign into one atomic swap so a concurrent re-arm cannot orphan a + // job in the gap between cancel and reassign. + handshakeTimeout + .getAndSet( + scope.handledLaunch { + delay(effectiveTimeout) + if (serviceRepository.connectionState.value !is ConnectionState.Connecting) { + return@handledLaunch } - action() - delay(HANDSHAKE_RETRY_TIMEOUT) - if (serviceRepository.connectionState.value is ConnectionState.Connecting) { - Logger.e { "Handshake still stalled after retry, forcing reconnect" } - onConnectionChanged(ConnectionState.Disconnected) + // A clean transport restart is the ONLY safe stall recovery on every transport. + // The previous BLE branch re-sent want_config mid-session via action() here; + // that re-send is now deliberately removed because firmware's handleStartConfig() + // has no in-flight guard, so a second want_config on the same session re-enters + // it and crashes the firmware (reproduced on T-Beam v2.7.25.104df5f in QA). The + // firmware per-write dedup that was supposed to drop the retry is single-slot + // (memcmp vs the previous write only), and interleaved heartbeats mean the re-sent + // nonce is not byte-identical to the prior write — so it slips past dedup and + // re-enters handleStartConfig(). A clean transport restart creates a fresh + // session and re-enters the handshake naturally, which is both safer and more + // deterministic than a same-session retry. + Logger.e { + "Handshake stall detected at Stage $stage on $transportLabel — " + + "requesting forced transport restart" } + runSiblingHandshakeRecovery() + }, + ) + ?.cancel() + } + + /** + * Launches the deterministic two-phase stall-recovery sibling used by [startHandshakeStallGuard] on every + * transport. + * + * Phase 1 flips the app-level state from Connecting to Disconnected first, guarded by the connection mutex so a + * just-completed handshake cannot be torn down after winning the race. Phase 2 then calls + * [RadioInterfaceService.restartTransport], whose emissions (DeviceSleep → Connected) now arrive from app-level + * Disconnected, bypass the redundant-Connecting guard in [onConnectionChanged], and re-enter [handleConnected] to + * restart the handshake cleanly. + * + * We MUST NOT call [onConnectionChanged] from the [handshakeTimeout] coroutine after launching the sibling: + * [onConnectionChanged] cancels handshakeTimeout (the very job running this code), and any work chained after the + * launch is not guaranteed to run. We MUST ALSO NOT leave the explicit Disconnected call in this coroutine after + * the sibling launch — otherwise the sibling's restart emissions (DeviceSleep, then Connected) can arrive while the + * app-level state is still Connecting, causing [onConnectionChanged]'s redundant-Connected-while-Connecting guard + * to ignore the fresh Connected emission. That leaves the app Disconnected while transport is Connected — the same + * split-brain this restart path is meant to break. + * + * By the time the sibling runs, handshakeTimeout has already completed naturally (it launched the sibling and + * returned), so the cancellation [onConnectionChanged] would attempt is a no-op on an already-completed job — and + * because the sibling is parented to `scope`, not to handshakeTimeout, it survives independently. + * + * Concurrent-recovery protection is layered across three independent mechanisms, each sufficient on its own: + * 1. The [connectionMutex] serializes the [onConnectionChanged]`(Disconnected, fromState=Connecting)` call inside + * the sibling. If a concurrent sibling already transitioned the app state to Disconnected, a second caller's + * `fromState=Connecting` precondition fails and [onConnectionChanged] returns `false`, so the `if (disconnected + * && ...)` gate skips `restartTransport()`. + * 2. The transport-level `isRestarting` CAS inside `SharedRadioInterfaceService.restartTransport()` provides + * authoritative dedup at the layer that actually tears down and re-creates the transport, independent of the + * app-level state machine above it. + * 3. The atomic [consecutiveRecoveryFailures]`getAndIncrement()` ensures each concurrent caller observes a unique + * `priorFailures` slot, so the give-up decision (and the backoff duration) is race-free even if two siblings + * race the same stall window. + * + * Backoff and give-up cap: each call atomically increments [consecutiveRecoveryFailures]. Prior failures trigger an + * exponential [delay] (2 s base, doubling each failure, capped at 30 s) before the transport restart so a node that + * keeps crashing under handshake re-entry is not re-driven in a tight loop. After + * [MAX_CONSECUTIVE_RECOVERY_FAILURES] consecutive failed recoveries, recovery stops and a sticky user-facing error + * is surfaced (Disconnected + error message) instead of silently retrying indefinitely; the user must manually + * re-select the node to retry, which resets the streak. The streak is also reset to 0 by [onHandshakeComplete] (a + * successful handshake). + */ + private fun runSiblingHandshakeRecovery() { + // Atomically claim a failure slot. getAndIncrement guarantees each concurrent caller + // observes a unique priorFailures value, so the give-up decision is race-free. + val priorFailures = consecutiveRecoveryFailures.getAndIncrement() + scope.handledLaunch { + // After MAX_CONSECUTIVE_RECOVERY_FAILURES consecutive failed recoveries, stop retrying + // and surface a sticky error. A node that keeps failing the handshake is likely + // crashing firmware under re-entry (reproduced on T-Beam v2.7.25.104df5f), and + // re-driving it indefinitely only makes things worse. Reset the counter here so a + // manual retry from the user starts a fresh streak. + if (priorFailures >= MAX_CONSECUTIVE_RECOVERY_FAILURES) { + Logger.e { + "Handshake recovery exhausted after $MAX_CONSECUTIVE_RECOVERY_FAILURES consecutive " + + "failures; surfacing sticky error" } + consecutiveRecoveryFailures.value = 0 + serviceRepository.setConnectionProgress("") + onConnectionChanged(ConnectionState.Disconnected, fromState = ConnectionState.Connecting) + // safeCatchingAll swallows Skiko ExceptionInInitializerError on headless JVM tests + // where compose-resources can't load native libs. Production resolves the localized + // string normally; tests fall back to empty and setErrorMessage is still called. + val errorMessage = + safeCatchingAll { getStringSuspend(Res.string.error_recovery_exhausted) }.getOrDefault("") + serviceRepository.setErrorMessage(errorMessage, Severity.Error) + return@handledLaunch } + // Exponential backoff before the next attempt: 2 s base, doubling for each prior + // failure, hard-capped at 30 s. Skipped on the first attempt (no prior failures). + // The delay is parented to `scope`, so it is cancelled cleanly if the user + // disconnects, navigates away, or the service shuts down (structured concurrency). + if (priorFailures > 0) { + val backoffSeconds = + (RECOVERY_BACKOFF_BASE_SECONDS shl (priorFailures - 1)).coerceAtMost(RECOVERY_BACKOFF_CAP_SECONDS) + Logger.w { + "Handshake recovery backoff: waiting ${backoffSeconds}s before retry (attempt ${priorFailures + 1})" + } + delay(backoffSeconds.seconds) + } + // Re-check state after backoff: the user may have disconnected during the delay. + if (serviceRepository.connectionState.value !is ConnectionState.Connecting) return@handledLaunch + // Surface the forced-recovery progress to the UI before the app-level Disconnected + // transition lands, so the user sees "Reconnecting…" rather than a stale + // "Loading node list" while the transport is being torn down and re-established. + // + // This progress is intentionally NOT cleared on the recovery's Disconnected window + // (i.e. NOT in handleDisconnected or this sibling). If recovery fails permanently, + // "Reconnecting…" may persist on the Disconnected screen until the user retries or + // navigates away. That leak is acceptable UX: the transport restart has been + // requested and may still be in flight (restartTransport is one-shot — if the fresh + // transport also fails, nothing here retries automatically; transport-level + // network-recovery listeners may independently re-bring-up the transport, and the + // user can manually retry). Clearing it here would race the deliberate UX signal: + // handleDisconnected runs synchronously after this call inside the same + // onConnectionChanged transition, so any clear there would clobber the signal before + // restartTransport runs. Stale progress is instead cleared at the first downstream + // setConnectionProgress call during the recovery handshake (e.g. "Device config + // received" or "Loading node list"), not by handleConnected itself — and at the + // ViewModel level the Connecting state already dominates progress (the CONNECTING + // status ignores the progress string), so the UI is correct regardless. + serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT) + val disconnected = onConnectionChanged(ConnectionState.Disconnected, fromState = ConnectionState.Connecting) + if (disconnected && serviceRepository.connectionState.value is ConnectionState.Disconnected) { + radioInterfaceService.restartTransport() + } + } + } + + override fun recoverPostHandshakeFailure() { + Logger.w { "Recovering from post-handshake failure by restarting transport" } + runSiblingHandshakeRecovery() } private fun tearDownConnection() { @@ -288,15 +459,13 @@ class MeshConnectionManagerImpl( } override fun startConfigOnly() { - val action = { packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.CONFIG_NONCE)) } - startHandshakeStallGuard(1, HANDSHAKE_TIMEOUT_STAGE1, action) - action() + startHandshakeStallGuard(1, HANDSHAKE_TIMEOUT_STAGE1) + packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.CONFIG_NONCE)) } override fun startNodeInfoOnly() { - val action = { packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.NODE_INFO_NONCE)) } - startHandshakeStallGuard(2, HANDSHAKE_TIMEOUT_STAGE2, action) - action() + startHandshakeStallGuard(2, HANDSHAKE_TIMEOUT_STAGE2) + packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.NODE_INFO_NONCE)) } override fun onRadioConfigLoaded() { @@ -313,11 +482,11 @@ class MeshConnectionManagerImpl( } override suspend fun onNodeDbReady() { - handshakeTimeout?.cancel() - handshakeTimeout = null + // Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot + // orphan a job in the gap between cancel and reassign. + handshakeTimeout.getAndSet(null)?.cancel() val myNodeNum = nodeManager.myNodeNum.value ?: 0 - // Set device time now that the full node picture is ready. Sending this during Stage 1 // (onRadioConfigLoaded) introduced GATT write contention with the Stage 2 node-info burst. commandSender.sendAdmin(myNodeNum) { AdminMessage(set_time_only = nowSeconds.toInt()) } @@ -352,6 +521,30 @@ class MeshConnectionManagerImpl( commandSender.requestTelemetry(commandSender.generatePacketId(), myNodeNum, TelemetryType.DEVICE.ordinal) } + /** + * Synchronously cancels the transport-aware handshake watchdog the moment Stage 2 completes (NODE_INFO_NONCE + * received). Does NOT replicate [onNodeDbReady]'s post-NodeDB side effects (analytics, MQTT start, history replay, + * telemetry requests) — those remain gated on [onNodeDbReady] at the end of the async DB install block. + * + * See [MeshConnectionManager.onHandshakeComplete] for the full rationale. + */ + override fun onHandshakeComplete() { + // Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot orphan a + // job in the gap between cancel and reassign. + handshakeTimeout.getAndSet(null)?.cancel() + // Set the completion latch so late-arriving progress packets (e.g. FileInfo that lands + // between NODE_INFO_NONCE and the async Room DB install completion) cannot re-arm the + // fast watchdog we just cancelled. Without this latch, those late packets would + // re-introduce the slow-DB false-trip on large meshes that this method was added to + // prevent — the app state is still Connecting during that window, so the Connecting- + // state guard inside onHandshakeProgress() is insufficient on its own. The latch is + // cleared at the start of the next fresh handshake in handleConnected(). + handshakeCompleteLatch.value = true + // A successful handshake breaks the recovery failure streak — reset the consecutive + // failure counter so the next stall starts from a fresh count. + consecutiveRecoveryFailures.value = 0 + } + private fun reportConnection() { val myNode = nodeManager.getMyNodeInfo() val radioModel = DataPair(KEY_RADIO_MODEL, myNode?.model ?: "unknown") @@ -378,6 +571,45 @@ class MeshConnectionManagerImpl( updateStatusNotification(t) } + /** + * True when the active transport is a TCP or USB serial connection — i.e. a transport whose firmware handshake + * reliably completes in roughly 1s when healthy and therefore benefits from aggressive silent-restart on stall. + * Uses the same [DeviceType.fromAddress] pattern as [reportConnection] for transport classification. BLE is + * excluded because its GATT latency budget is high and variable enough that the long-and-retry stall-guard budgets + * remain the right trade-off. + */ + private fun isFastRecoveryTransport(): Boolean = + radioInterfaceService.getDeviceAddress()?.let { DeviceType.fromAddress(it) } in FAST_RECOVERY_TYPES + + override fun onHandshakeProgress() { + // Arm only inside the fast-recovery envelope, while Connecting, before the completion latch + // has fired. BLE retains the long stall-guard budget because its GATT latency is variable. + val shouldArmFastWatchdog = + isFastRecoveryTransport() && + serviceRepository.connectionState.value is ConnectionState.Connecting && + !handshakeCompleteLatch.value + if (!shouldArmFastWatchdog) return + // Atomic swap: cancel any in-flight fast watchdog and re-arm it with the full fast + // timeout in a single operation. This keeps the watchdog quiet as long as meaningful + // progress keeps arriving within the window, while a true stall still fires on + // schedule. getAndSet prevents a concurrent re-arm from orphaning a job in the gap + // between cancel and reassign. + handshakeTimeout + .getAndSet( + scope.handledLaunch { + delay(FAST_HANDSHAKE_TIMEOUT) + if (serviceRepository.connectionState.value !is ConnectionState.Connecting) { + return@handledLaunch + } + Logger.e { + "Fast-handshake watchdog expired after progress stalled — requesting forced transport restart" + } + runSiblingHandshakeRecovery() + }, + ) + ?.cancel() + } + override fun updateStatusNotification(telemetry: Telemetry?) { serviceNotifications.updateServiceStateNotification( serviceRepository.connectionState.value, @@ -386,6 +618,10 @@ class MeshConnectionManagerImpl( } companion object { + // Hoisted constant — used on every meaningful handshake packet via + // isFastRecoveryTransport(); avoids allocating a fresh Set per packet. + private val FAST_RECOVERY_TYPES = setOf(DeviceType.TCP, DeviceType.USB) + private const val DEVICE_SLEEP_TIMEOUT_SECONDS = 30 // Maximum time (in seconds) to wait for a sleeping device before declaring it @@ -411,10 +647,54 @@ class MeshConnectionManagerImpl( */ private val HANDSHAKE_TIMEOUT_STAGE2 = 60.seconds - // Shorter window for the retry attempt: if the device genuinely didn't receive the - // first want_config_id the retry completes within a few seconds. Waiting another 30s - // before reconnecting just delays recovery unnecessarily. - private val HANDSHAKE_RETRY_TIMEOUT = 15.seconds + /** + * Transport-aware fast-recovery timeout for the handshake stall guard, applied only to TCP and USB serial + * transports. + * + * Production logs on TCP/USB show a healthy firmware handshake completes in roughly 1 second, while a wedged + * socket sits idle for the full transport-level read timeout (~30s) without any further progress. 12s sits + * comfortably above the healthy success envelope and well below the transport read timeout, so firing a silent + * [RadioInterfaceService.restartTransport] at 12s recovers a stuck TCP/USB session quickly without + * false-positiving on healthy connections. + * + * BLE is intentionally excluded — its GATT latency budget is variable enough that the existing + * [HANDSHAKE_TIMEOUT_STAGE1] (30s) and [HANDSHAKE_TIMEOUT_STAGE2] (60s) budgets remain the right trade-off. + * Both transports now recover exclusively via [runSiblingHandshakeRecovery] (transport restart); the previous + * BLE mid-session want_config retry has been removed (see [startHandshakeStallGuard] for the firmware crash + * rationale). + */ + private val FAST_HANDSHAKE_TIMEOUT = 12.seconds + + /** + * Maximum consecutive handshake-recovery attempts before surfacing a sticky error to the user. + * + * Each call to [runSiblingHandshakeRecovery] counts as one attempt. After this many consecutive attempts fail + * to lead to a successful handshake, recovery stops and a sticky Disconnected + error state is surfaced, + * requiring the user to manually re-select the node to retry. This prevents indefinite re-driving of a node + * whose firmware is crashing under handshake re-entry (reproduced on T-Beam v2.7.25.104df5f). + */ + private const val MAX_CONSECUTIVE_RECOVERY_FAILURES = 3 + + /** + * Base delay (seconds) for the exponential backoff applied between consecutive recovery attempts in + * [runSiblingHandshakeRecovery]. Doubled for each prior failure and hard-capped at + * [RECOVERY_BACKOFF_CAP_SECONDS]. + * + * 2 s base gives the firmware and transport a brief breather after a failed restart without adding perceptible + * latency to the first retry; doubling bounds the worst-case loop tightly under the 3-strike cap. + */ + private const val RECOVERY_BACKOFF_BASE_SECONDS = 2L + + /** + * Hard cap (seconds) for the exponential backoff between recovery attempts. + * + * Keeps the per-attempt delay bounded even if [MAX_CONSECUTIVE_RECOVERY_FAILURES] is raised in the future. 30 s + * matches the transport-level read timeout envelope, so a backed-off retry never waits longer than the + * underlying transport would have taken to fail on its own. + */ + // Cap unreachable while MAX_CONSECUTIVE_RECOVERY_FAILURES=3 (max computed delay is 4s); + // retained as defense-in-depth if MAX is raised in the future. + private const val RECOVERY_BACKOFF_CAP_SECONDS = 30L private const val EVENT_CONNECTED_SECONDS = "connected_seconds" private const val EVENT_MESH_DISCONNECT = "mesh_disconnect" diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt index 5ce2bc20a..66a2f1f00 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt @@ -20,6 +20,7 @@ import dev.mokkery.MockMode import dev.mokkery.answering.calls import dev.mokkery.answering.returns import dev.mokkery.every +import dev.mokkery.everySuspend import dev.mokkery.matcher.any import dev.mokkery.mock import dev.mokkery.verify @@ -29,9 +30,12 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import okio.ByteString.Companion.encodeUtf8 +import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.repository.CommandSender import org.meshtastic.core.repository.HandshakeConstants import org.meshtastic.core.repository.MeshConnectionManager @@ -55,6 +59,13 @@ import org.meshtastic.proto.MyNodeInfo as ProtoMyNodeInfo @OptIn(ExperimentalCoroutinesApi::class) class MeshConfigFlowManagerImplTest { + private companion object { + // Production issues two sequential delay(wantConfigDelay) calls (100ms each = 200ms) + // plus scheduler slack for the inter-stage heartbeat and startNodeInfoOnly(); bump in + // lockstep with wantConfigDelay if it changes. + const val STAGE_TRANSITION_ADVANCE_MS = 250L + } + private val nodeManager = mock(MockMode.autofill) private val connectionManager = mock(MockMode.autofill) private val nodeRepository = mock(MockMode.autofill) @@ -173,7 +184,8 @@ class MeshConfigFlowManagerImplTest { advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() verify { connectionManager.onRadioConfigLoaded() } verify { connectionManager.startNodeInfoOnly() } @@ -194,7 +206,8 @@ class MeshConfigFlowManagerImplTest { sentPackets.clear() // Clear any packets from prior phases manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() val heartbeats = sentPackets.filter { it.heartbeat != null } assertEquals(1, heartbeats.size, "Expected exactly one inter-stage heartbeat") @@ -215,7 +228,8 @@ class MeshConfigFlowManagerImplTest { advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() // Handshake should still progress despite old firmware verify { connectionManager.onRadioConfigLoaded() } @@ -234,6 +248,20 @@ class MeshConfigFlowManagerImplTest { verify { connectionManager.onRadioConfigLoaded() } } + @Test + fun `Stage 1 complete updates progress for node list loading`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + + verify { serviceRepository.setConnectionProgress("Loading node list") } + } + @Test fun `Stage 1 complete id ignored when not in ReceivingConfig state`() = testScope.runTest { // State is Idle — should be a no-op @@ -250,11 +278,13 @@ class MeshConfigFlowManagerImplTest { advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() // Now in ReceivingNodeInfo — a second Stage 1 complete should be ignored manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() } // ---------- handleNodeInfo ---------- @@ -267,7 +297,8 @@ class MeshConfigFlowManagerImplTest { manager.handleLocalMetadata(metadata) advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() // Now in ReceivingNodeInfo manager.handleNodeInfo(NodeInfo(num = 100)) @@ -276,6 +307,29 @@ class MeshConfigFlowManagerImplTest { assertEquals(2, manager.newNodeCount) } + @Test + fun `handleNodeInfo buffers nodes received during Stage 1`() = testScope.runTest { + val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100) + every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode) + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + + manager.handleNodeInfo(NodeInfo(num = 100)) + assertEquals(1, manager.newNodeCount) + + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + advanceUntilIdle() + + verify { nodeManager.installNodeInfo(NodeInfo(num = 100)) } + verifySuspend { connectionManager.onNodeDbReady() } + } + @Test fun `handleNodeInfo ignored outside Stage 2`() = testScope.runTest { // State is Idle @@ -297,7 +351,8 @@ class MeshConfigFlowManagerImplTest { manager.handleLocalMetadata(metadata) advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() manager.handleNodeInfo(NodeInfo(num = 100)) manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) @@ -323,7 +378,8 @@ class MeshConfigFlowManagerImplTest { manager.handleLocalMetadata(metadata) advanceUntilIdle() manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() // No handleNodeInfo calls — empty node list manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) @@ -333,6 +389,54 @@ class MeshConfigFlowManagerImplTest { verifySuspend { connectionManager.onNodeDbReady() } } + @Test + fun `Stage 2 complete keeps connected when post NodeDB side effects fail`() = testScope.runTest { + every { analytics.setDeviceAttributes(any(), any()) } calls { throw IllegalStateException("analytics") } + everySuspend { connectionManager.onNodeDbReady() } calls { throw IllegalStateException("side effects") } + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + advanceUntilIdle() + + verify { nodeManager.setNodeDbReady(true) } + verify { nodeManager.setAllowNodeDbWrites(true) } + verify { serviceRepository.setConnectionState(ConnectionState.Connected) } + verifySuspend { connectionManager.onNodeDbReady() } + verify(mode = VerifyMode.not) { nodeManager.setNodeDbReady(false) } + verify(mode = VerifyMode.not) { nodeManager.setAllowNodeDbWrites(false) } + verify(mode = VerifyMode.not) { connectionManager.recoverPostHandshakeFailure() } + } + + @Test + fun `Stage 2 complete disconnects when NodeDB install fails`() = testScope.runTest { + everySuspend { nodeRepository.installConfig(any(), any()) } calls { throw IllegalStateException("room") } + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + advanceUntilIdle() + + verify { connectionManager.onHandshakeComplete() } + verify { nodeManager.setNodeDbReady(false) } + verify { nodeManager.setAllowNodeDbWrites(false) } + verify { connectionManager.recoverPostHandshakeFailure() } + verify(mode = VerifyMode.not) { nodeManager.setNodeDbReady(true) } + verifySuspend(mode = VerifyMode.not) { connectionManager.onNodeDbReady() } + } + // ---------- Unknown config_complete_id ---------- @Test @@ -386,7 +490,8 @@ class MeshConfigFlowManagerImplTest { // Stage 1 complete manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) - advanceUntilIdle() + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() verify { connectionManager.onRadioConfigLoaded() } // Receive NodeInfo during Stage 2 @@ -468,4 +573,202 @@ class MeshConfigFlowManagerImplTest { verify(mode = VerifyMode.not) { notificationPrefs.setNodeEventsEnabled(any()) } verify(mode = VerifyMode.not) { notificationPrefs.setNodeEventsAutoDisabledForEvent(any()) } } + + // ---------- onHandshakeProgress ---------- + + @Test + fun `handleMyInfo calls onHandshakeProgress`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleLocalMetadata in ReceivingConfig calls onHandshakeProgress`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + + // handleMyInfo fires one call; handleLocalMetadata adds exactly one more in ReceivingConfig. + verify(mode = VerifyMode.exactly(2)) { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleLocalMetadata outside ReceivingConfig does not call onHandshakeProgress`() = testScope.runTest { + // State is Idle — metadata is ignored and must not reset the watchdog. + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + + verify(mode = VerifyMode.not) { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleFileInfo calls onHandshakeProgress`() = testScope.runTest { + val fileInfo = FileInfo(file_name = "firmware.bin", size_bytes = 1024) + manager.handleFileInfo(fileInfo) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleNodeInfo during Stage 1 calls onHandshakeProgress`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleNodeInfo(NodeInfo(num = 100)) + + // handleMyInfo fires one call; handleNodeInfo in ReceivingConfig adds exactly one more. + verify(mode = VerifyMode.exactly(2)) { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleNodeInfo during Stage 2 calls onHandshakeProgress`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + + // Now in ReceivingNodeInfo — a NodeInfo packet must reset the watchdog. + manager.handleNodeInfo(NodeInfo(num = 100)) + + // Prior calls: handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1) = 3. + // handleNodeInfo adds exactly one more. + verify(mode = VerifyMode.exactly(4)) { connectionManager.onHandshakeProgress() } + } + + @Test + fun `Stage 1 complete calls onHandshakeProgress`() = testScope.runTest { + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + + // handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1) = 3. + verify(mode = VerifyMode.exactly(3)) { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleNodeInfo outside active handshake does not call onHandshakeProgress`() = testScope.runTest { + // State is Idle — NodeInfo is ignored and must not reset the watchdog. + manager.handleNodeInfo(NodeInfo(num = 999)) + + verify(mode = VerifyMode.not) { connectionManager.onHandshakeProgress() } + } + + /** + * Regression guard: a full handshake must fire [MeshConnectionManager.onHandshakeProgress] exactly for meaningful + * handshake events only. Packet types not meaningful to the handshake — queueStatus, mqttClientProxyMessage, + * xmodemPacket, clientNotification, deviceuiConfig, and rebooted — are routed to sibling handlers + * (PacketHandlerImpl, MqttManagerImpl, FromRadioPacketHandlerImpl, MeshConfigHandlerImpl) and must never reset the + * watchdog from this manager. This test pins the exact count so any accidental wiring surfaces as a failure. + */ + @Test + fun `full handshake fires onHandshakeProgress exactly for meaningful events only`() = testScope.runTest { + val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100) + every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode) + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + manager.handleNodeInfo(NodeInfo(num = 100)) + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + advanceUntilIdle() + + // handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1) + // + handleNodeInfo(1) = 4. handleNodeInfoComplete intentionally does NOT call + // onHandshakeProgress: by that point the handshake is Complete and the synchronous + // onHandshakeComplete() call (verified in a separate test) cancels the watchdog. + verify(mode = VerifyMode.exactly(4)) { connectionManager.onHandshakeProgress() } + } + + /** + * Regression guard for the Stage 2 watchdog-cancellation race fixed by adding + * [MeshConnectionManager.onHandshakeComplete]. Stage 2 completion must synchronously fire the terminal callback + * exactly once so the transport-aware fast-recovery watchdog is cancelled before any async DB install work begins. + */ + @Test + fun `Stage 2 complete fires onHandshakeComplete exactly once`() = testScope.runTest { + val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100) + every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode) + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + manager.handleNodeInfo(NodeInfo(num = 100)) + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + advanceUntilIdle() + + verify(mode = VerifyMode.exactly(1)) { connectionManager.onHandshakeComplete() } + } + + /** + * Regression guard for the actual Stage 2 watchdog-cancellation race: the synchronous + * [MeshConnectionManager.onHandshakeComplete] call MUST land before the asynchronous NodeDB install work begins, so + * that a slow Room commit on a large mesh cannot trip the 12 s fast-recovery timeout after the firmware handshake + * has already succeeded. + * + * Under [StandardTestDispatcher] the async DB install coroutine does not run until the test dispatcher is advanced, + * so we can assert the call ordering deterministically without any suspension trick on + * [NodeRepository.installConfig]. + */ + @Test + fun `Stage 2 complete cancels watchdog synchronously before async DB install work`() = testScope.runTest { + val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100) + every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode) + + // Record the order in which the synchronous watchdog-cancellation callback and the + // async DB install entry point are observed. The order is the invariant under test. + val callOrder = mutableListOf() + every { connectionManager.onHandshakeComplete() } calls { callOrder.add("handshakeComplete") } + everySuspend { nodeRepository.installConfig(any(), any()) } calls { callOrder.add("installConfig") } + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS) + runCurrent() + manager.handleNodeInfo(NodeInfo(num = 100)) + + // Drive Stage 2 complete. handleNodeInfoComplete runs synchronously: state becomes + // Complete, onHandshakeComplete() fires (cancelling the watchdog), then the async DB + // install block is launched but not yet executed under StandardTestDispatcher. + manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE) + + // Synchronous post-condition: the watchdog was cancelled BEFORE the async DB install + // block was scheduled to run. If this ordering invariant ever regresses, a slow Room + // commit on a large mesh would falsely trip the 12s fast-recovery timeout. + assertEquals( + listOf("handshakeComplete"), + callOrder, + "onHandshakeComplete() must fire synchronously at Stage 2 complete, BEFORE any " + + "async DB install work begins — this is the race this test guards against", + ) + + // Now let the async DB install block run. + advanceUntilIdle() + assertEquals( + listOf("handshakeComplete", "installConfig"), + callOrder, + "installConfig must run AFTER onHandshakeComplete, ensuring the watchdog is " + + "already cancelled before any DB work could trip the fast-recovery timeout", + ) + } } diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImplTest.kt index 83fd4b70a..c11da84cc 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImplTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigHandlerImplTest.kt @@ -30,6 +30,7 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import org.meshtastic.core.model.MyNodeInfo +import org.meshtastic.core.repository.MeshConnectionManager import org.meshtastic.core.repository.NodeManager import org.meshtastic.core.repository.RadioConfigRepository import org.meshtastic.core.repository.ServiceRepository @@ -49,6 +50,7 @@ class MeshConfigHandlerImplTest { private val radioConfigRepository = mock(MockMode.autofill) private val serviceRepository = mock(MockMode.autofill) private val nodeManager = mock(MockMode.autofill) + private val connectionManager = mock(MockMode.autofill) private val localConfigFlow = MutableStateFlow(LocalConfig()) private val moduleConfigFlow = MutableStateFlow(LocalModuleConfig()) @@ -67,6 +69,7 @@ class MeshConfigHandlerImplTest { radioConfigRepository = radioConfigRepository, serviceStateWriter = serviceRepository, nodeManager = nodeManager, + connectionManager = lazy { this.connectionManager }, scope = scope, ) @@ -228,4 +231,47 @@ class MeshConfigHandlerImplTest { verifySuspend { radioConfigRepository.setDeviceUIConfig(config) } } + + // ---------- onHandshakeProgress ---------- + + @Test + fun `handleDeviceConfig calls onHandshakeProgress`() = runTest(testDispatcher) { + handler = createHandler(backgroundScope) + val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT)) + handler.handleDeviceConfig(config) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleModuleConfig calls onHandshakeProgress`() = runTest(testDispatcher) { + handler = createHandler(backgroundScope) + val config = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true)) + handler.handleModuleConfig(config) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleChannel calls onHandshakeProgress`() = runTest(testDispatcher) { + handler = createHandler(backgroundScope) + every { nodeManager.getMyNodeInfo() } returns null + + val channel = Channel(index = 0) + handler.handleChannel(channel) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } + + @Test + fun `handleDeviceUIConfig calls onHandshakeProgress`() = runTest(testDispatcher) { + handler = createHandler(backgroundScope) + handler.handleDeviceUIConfig(DeviceUIConfig()) + advanceUntilIdle() + + verify { connectionManager.onHandshakeProgress() } + } } diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt index 541c4cead..93e53a023 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt @@ -16,6 +16,7 @@ */ package org.meshtastic.core.data.manager +import co.touchlab.kermit.Severity import dev.mokkery.MockMode import dev.mokkery.answering.calls import dev.mokkery.answering.returns @@ -24,14 +25,17 @@ import dev.mokkery.everySuspend import dev.mokkery.matcher.any import dev.mokkery.mock import dev.mokkery.verify +import dev.mokkery.verify.VerifyMode.Companion.exactly import dev.mokkery.verifySuspend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestDispatcher import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.DataPacket @@ -61,27 +65,28 @@ import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertTrue @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) class MeshConnectionManagerImplTest { - private val radioInterfaceService = mock(MockMode.autofill) - private val serviceRepository = mock(MockMode.autofill) + private lateinit var radioInterfaceService: RadioInterfaceService + private lateinit var serviceRepository: ServiceRepository - private val serviceNotifications = mock(MockMode.autofill) - private val uiPrefs = mock(MockMode.autofill) - private val packetHandler = mock(MockMode.autofill) - private val nodeRepository = FakeNodeRepository() - private val locationManager = mock(MockMode.autofill) - private val mqttManager = mock(MockMode.autofill) - private val historyManager = mock(MockMode.autofill) - private val radioConfigRepository = mock(MockMode.autofill) - private val commandSender = mock(MockMode.autofill) - private val sessionManager = mock(MockMode.autofill) - private val nodeManager = mock(MockMode.autofill) - private val analytics = mock(MockMode.autofill) - private val packetRepository = mock(MockMode.autofill) - private val workerManager = mock(MockMode.autofill) - private val appWidgetUpdater = mock(MockMode.autofill) + private lateinit var serviceNotifications: MeshNotificationManager + private lateinit var uiPrefs: UiPrefs + private lateinit var packetHandler: PacketHandler + private lateinit var nodeRepository: FakeNodeRepository + private lateinit var locationManager: MeshLocationManager + private lateinit var mqttManager: MqttManager + private lateinit var historyManager: HistoryManager + private lateinit var radioConfigRepository: RadioConfigRepository + private lateinit var commandSender: CommandSender + private lateinit var sessionManager: SessionManager + private lateinit var nodeManager: NodeManager + private lateinit var analytics: PlatformAnalytics + private lateinit var packetRepository: PacketRepository + private lateinit var workerManager: MeshWorkerManager + private lateinit var appWidgetUpdater: AppWidgetUpdater private val dataPacket = DataPacket(id = 456, time = 0L, to = "0", from = "0", bytes = null, dataType = 0) @@ -90,12 +95,36 @@ class MeshConnectionManagerImplTest { private val localConfigFlow = MutableStateFlow(LocalConfig()) private val moduleConfigFlow = MutableStateFlow(LocalModuleConfig()) - private val testDispatcher = UnconfinedTestDispatcher() + private lateinit var testDispatcher: TestDispatcher private lateinit var manager: MeshConnectionManagerImpl @BeforeTest fun setUp() { + radioInterfaceService = mock(MockMode.autofill) + serviceRepository = mock(MockMode.autofill) + serviceNotifications = mock(MockMode.autofill) + uiPrefs = mock(MockMode.autofill) + packetHandler = mock(MockMode.autofill) + nodeRepository = FakeNodeRepository() + locationManager = mock(MockMode.autofill) + mqttManager = mock(MockMode.autofill) + historyManager = mock(MockMode.autofill) + radioConfigRepository = mock(MockMode.autofill) + commandSender = mock(MockMode.autofill) + sessionManager = mock(MockMode.autofill) + nodeManager = mock(MockMode.autofill) + analytics = mock(MockMode.autofill) + packetRepository = mock(MockMode.autofill) + workerManager = mock(MockMode.autofill) + appWidgetUpdater = mock(MockMode.autofill) + + testDispatcher = UnconfinedTestDispatcher() + radioConnectionState.value = ConnectionState.Disconnected + connectionStateFlow.value = ConnectionState.Disconnected + localConfigFlow.value = LocalConfig() + moduleConfigFlow.value = LocalModuleConfig() + every { radioInterfaceService.connectionState } returns radioConnectionState every { radioConfigRepository.localConfigFlow } returns localConfigFlow every { radioConfigRepository.moduleConfigFlow } returns moduleConfigFlow @@ -135,6 +164,12 @@ class MeshConnectionManagerImplTest { scope, ) + private fun restartTransportCallCounter(): () -> Int { + var restartCalls = 0 + everySuspend { radioInterfaceService.restartTransport() } calls { restartCalls += 1 } + return { restartCalls } + } + @AfterTest fun tearDown() = Unit @Test @@ -386,8 +421,9 @@ class MeshConnectionManagerImplTest { fun `concurrent sleep-timeout and radio state change are serialized`() { val standardDispatcher = StandardTestDispatcher() runTest(standardDispatcher) { - // Power saving enabled with a short ls_secs so the sleep timeout fires quickly - val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 1)) + // Power saving enabled with ls_secs=0 so the sleep timeout boundary is just before the + // Stage 1 handshake watchdog. That keeps this test isolated to sleep-timeout behavior. + val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 0)) every { radioConfigRepository.localConfigFlow } returns flowOf(config) every { nodeManager.nodeDBbyNodeNum } returns emptyMap() @@ -400,22 +436,24 @@ class MeshConnectionManagerImplTest { } manager = createManager(backgroundScope) - advanceUntilIdle() + runCurrent() // Transition to Connected -> DeviceSleep to start the sleep timer radioConnectionState.value = ConnectionState.Connected - advanceUntilIdle() + runCurrent() radioConnectionState.value = ConnectionState.DeviceSleep - advanceUntilIdle() + runCurrent() observed.clear() // Before the sleep timeout fires, emit Connected from the radio (simulating device // waking up). Then let the timeout fire. The mutex ensures they don't race. radioConnectionState.value = ConnectionState.Connected - // Advance past the sleep timeout (ls_secs=1 + 30s base = 31s) - advanceTimeBy(32_000L) - advanceUntilIdle() + runCurrent() + // Advance past the sleep timeout (ls_secs=0 + 30s base), but stop before the Stage 1 + // handshake watchdog fires at roughly 30.1s after the wake-up Connected signal. + advanceTimeBy(30_050L) + runCurrent() // The Connected transition should have cancelled the sleep timeout, so we should // end up in Connecting (from handleConnected), NOT Disconnected (from timeout). @@ -426,4 +464,713 @@ class MeshConnectionManagerImplTest { ) } } + + @Test + fun `Stage 1 config stall triggers transport restart and ends Disconnected`() = runTest(testDispatcher) { + manager = createManager(backgroundScope) + // Disconnected -> Connected: handleConnected() sets Connecting, sends pre-handshake + // heartbeat, and (after PRE_HANDSHAKE_SETTLE_MS=100ms) calls startConfigOnly() which + // arms the Stage 1 stall guard (HANDSHAKE_TIMEOUT_STAGE1 = 30s). + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Pre-condition: Stage 1 is in flight — manager is Connecting and a ToRadio has been sent + // (heartbeat + want_config_id). Use at-least-one here so the test isn't brittle on the + // exact packet count. + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should be Connecting after radio Connected", + ) + verify { packetHandler.sendToRadio(any()) } + + // Advance past HANDSHAKE_TIMEOUT_STAGE1 (30s) WITHOUT any config arrival. The stall + // fires the recovery sibling unconditionally on every transport — there is no longer a + // want_config retry delay before restart. The production code runs BOTH transitions + // inside one sibling recovery job — onConnectionChanged(Disconnected) FIRST, then + // restartTransport() — so the fresh Connected emission from restartTransport arrives + // with app-level state already Disconnected and is not ignored by the redundant- + // Connecting guard in onConnectionChanged. The advanceTimeBy keeps extra slack so the + // test stays robust against small timeout tweaks. + advanceTimeBy(46_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Stage 1 stall should end in Disconnected after restart is requested", + ) + } + + @Test + fun `Handshake stall recovery orders app disconnect before transport restart emissions`() = + runTest(testDispatcher) { + // This test locks in the ordering invariant of the stall recovery + // sibling: onConnectionChanged(Disconnected) runs FIRST, then restartTransport(). + // We deliberately do NOT stub restartTransport() — the default mock no-op leaves it + // as a pure boundary call so the sibling's two phases can be observed independently. + // + // After the stall fires and the sibling completes, we MANUALLY replay the + // transport-level emissions that the real restartTransport() would produce: + // - DeviceSleep (onDisconnect(isPermanent=false) on the old transport) + // - Connected (the new transport's onConnect callback) + // Under the FIXED ordering, the fresh Connected arrives with app state already + // Disconnected, bypasses the redundant-Connecting guard in onConnectionChanged, + // and re-enters handleConnected → state returns to Connecting. + // Under the BROKEN (old) ordering — restartTransport() BEFORE Disconnected — the + // fresh Connected would arrive while app state was still Connecting, the redundant- + // Connecting guard would drop it, and the state would never return to Connecting. + // + // Restructured to be deterministic on JVM CI: rather than relying on a stubbed + // restartTransport() lambda whose StateFlow side-effect emissions race with the + // flow collector under Mokkery, the test body itself drives the emissions in order. + manager = createManager(backgroundScope) + // Disconnected -> Connected: handleConnected() sets Connecting, sends pre-handshake + // heartbeat, and (after PRE_HANDSHAKE_SETTLE_MS=100ms) calls startConfigOnly() which + // arms the Stage 1 stall guard (HANDSHAKE_TIMEOUT_STAGE1 = 30s). + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Pre-condition: Stage 1 is in flight. + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should be Connecting after radio Connected", + ) + + // Advance past HANDSHAKE_TIMEOUT_STAGE1 (30s) WITHOUT any config arrival. The stall + // fires the recovery sibling: onConnectionChanged(Disconnected) FIRST, then + // restartTransport() (default mock no-op, so nothing re-arms a stall guard and + // advanceUntilIdle is safe here). + advanceTimeBy(46_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Sibling must run onConnectionChanged(Disconnected) BEFORE restartTransport() — " + + "proves the app-level Disconnected transition landed", + ) + + // Manually replay the transport-level restart signals that the real restartTransport() + // would emit. DeviceSleep corresponds to onDisconnect(isPermanent=false) on the old + // transport; Connected corresponds to the new transport's onConnect callback. With + // UnconfinedTestDispatcher each emission is collected synchronously inline, so no + // advanceUntilIdle() is needed between them — and none is safe AFTER the Connected + // emission, because handleConnected re-arms a fresh Stage 1 stall guard and + // advanceUntilIdle would advance virtual time past it (and every subsequent re-arm), + // looping the recovery and obscuring the single-shot ordering this test locks in. + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Fresh Connected emission must re-enter handleConnected (NOT be ignored by the " + + "redundant-Connecting guard) because the app-level Disconnected transition " + + "already ran BEFORE restartTransport's transport cycle — this is the ordering " + + "invariant under test", + ) + } + + @Test + fun `Stage 2 node-info stall triggers transport restart`() = runTest(testDispatcher) { + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Pre-handshake settle completes; Stage 1 stall guard armed. + advanceTimeBy(200) + advanceUntilIdle() + + // Drive the connection into Stage 2. In production this is done by the config-flow + // manager once Stage 1 config arrives; here we invoke it directly. startNodeInfoOnly() + // cancels the Stage 1 stall guard and arms Stage 2 (HANDSHAKE_TIMEOUT_STAGE2 = 60s). + manager.startNodeInfoOnly() + advanceUntilIdle() + + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should still be Connecting entering Stage 2", + ) + + // Advance past HANDSHAKE_TIMEOUT_STAGE2 (60s) WITHOUT invoking onNodeDbReady(). The + // stall must fire the recovery sibling unconditionally. + advanceTimeBy(76_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Stage 2 stall should end in Disconnected after restart is requested", + ) + } + + @Test + fun `Handshake completing before stall timeout does not trigger transport restart`() = runTest(testDispatcher) { + // Stubs required by onNodeDbReady() (full handshake completion path). + everySuspend { commandSender.requestTelemetry(any(), any(), any()) } returns Unit + every { nodeManager.myNodeNum } returns MutableStateFlow(123) + every { mqttManager.startProxy(any(), any()) } returns Unit + everySuspend { historyManager.requestHistoryReplay(any(), any(), any(), any()) } returns Unit + every { nodeManager.getMyNodeInfo() } returns null + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Pre-handshake settle completes; Stage 1 stall guard armed. + advanceTimeBy(200) + advanceUntilIdle() + + // Simulate the full handshake completing (config arrives + NodeDB becomes ready). + // onNodeDbReady() cancels handshakeTimeout, so the stall recovery sibling can + // never run even if virtual time later crosses the stage windows. + manager.onNodeDbReady() + advanceUntilIdle() + + // Advance well past BOTH stage windows (Stage 1: 30s, Stage 2: 60s). + advanceTimeBy(120_000L) + advanceUntilIdle() + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + } + + @Test + fun `TCP Stage 1 stall fires restartTransport at 12s without retry`() = runTest(testDispatcher) { + // Address starting with 't' → DeviceType.TCP → fast recovery transport. + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + val sentPackets = mutableListOf() + every { packetHandler.sendToRadio(any()) } calls + { call -> + sentPackets.add(call.arg(0)) + } + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Pre-handshake settle (100ms): heartbeat + Stage 1 want_config_id sent, fast watchdog armed. + advanceTimeBy(200) + advanceUntilIdle() + + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should be Connecting after radio Connected", + ) + + // Sanity: heartbeat + Stage 1 want_config_id have been sent. + val packetsBeforeStall = sentPackets.size + + // Advance past FAST_HANDSHAKE_TIMEOUT (12s) WITHOUT any progress signal. The fast + // branch must fire the recovery sibling directly — no same-session want_config re-send. + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Fast stall should end in Disconnected after restart is requested", + ) + // The recovery sibling must not re-send want_config_id on the same session: no additional + // packet may be sent between the initial arming and the restartTransport call (a same- + // session re-send re-enters firmware handleStartConfig() and crashes the device). + assertEquals( + packetsBeforeStall, + sentPackets.size, + "Fast transport stall must NOT trigger a retry send of want_config_id", + ) + } + + @Test + fun `TCP Stage 1 meaningful progress resets watchdog without false restart`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Trickle inbound progress signals — each lands well inside the 12s fast window, so + // each one re-arms the watchdog and no restart may fire across multiple resets. + repeat(5) { + advanceTimeBy(8_000L) + manager.onHandshakeProgress() + advanceUntilIdle() + } + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Steady trickle of progress must keep the manager Connecting", + ) + } + + @Test + fun `TCP Stage 1 watchdog fires after progress stops`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Two progress signals land inside the window. + advanceTimeBy(8_000L) + manager.onHandshakeProgress() + advanceUntilIdle() + advanceTimeBy(8_000L) + manager.onHandshakeProgress() + advanceUntilIdle() + + // Progress stops; advance past the fast timeout from the last re-arm. + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Watchdog must fire once progress stops for longer than the fast timeout", + ) + } + + @Test + fun `TCP Stage 2 stall fires restartTransport at 12s`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Drive the connection into Stage 2. In production this is done by the config-flow + // manager once Stage 1 config arrives; here we invoke it directly. startNodeInfoOnly() + // cancels the Stage 1 watchdog and arms Stage 2 with the fast timeout (12s on TCP). + manager.startNodeInfoOnly() + advanceUntilIdle() + + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should still be Connecting entering Stage 2", + ) + + // Advance past the fast timeout WITHOUT invoking onNodeDbReady(). + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Stage 2 fast stall should end in Disconnected after restart is requested", + ) + } + + @Test + fun `TCP Stage 2 NodeInfo progress resets watchdog`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + manager.startNodeInfoOnly() + advanceUntilIdle() + + // Stage 2 node-info burst packets arrive as a trickle; each resets the fast watchdog. + repeat(3) { + advanceTimeBy(7_000L) + manager.onHandshakeProgress() + advanceUntilIdle() + } + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Stage 2 progress must keep the manager Connecting", + ) + } + + @Test + fun `TCP fast recovery preserves Disconnected-before-restartTransport ordering`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + + val observed = mutableListOf() + var progressBeforeRestart: String? = null + var restartTransportCalls = 0 + every { serviceRepository.setConnectionState(any()) } calls + { call -> + val state = call.arg(0) + observed.add(state) + connectionStateFlow.value = state + } + every { serviceRepository.setConnectionProgress(any()) } calls + { call -> + // Capture the most recent progress string at the moment restartTransport runs. + // This locks in that the "Reconnecting…" UX hook fires BEFORE the app-level + // Disconnected transition. + progressBeforeRestart = call.arg(0) + } + everySuspend { radioInterfaceService.restartTransport() } calls + { + restartTransportCalls++ + // At the moment restartTransport runs, the app-level state MUST already be + // Disconnected — otherwise the fresh transport Connected emission would be + // dropped by the redundant-Connecting guard and we would re-introduce the + // split-brain this recovery path exists to break. + assertEquals( + ConnectionState.Disconnected, + connectionStateFlow.value, + "restartTransport must run AFTER app-level Disconnected transition", + ) + assertEquals( + ServiceRepository.RECONNECTING_PROGRESS_TEXT, + progressBeforeRestart, + "setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT) must run before restartTransport", + ) + } + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Fire fast stall. + advanceTimeBy(13_000L) + advanceUntilIdle() + + assertEquals(1, restartTransportCalls) + // Sanity: Connecting → Disconnected was the observed app-level transition path. + assertEquals(ConnectionState.Disconnected, observed.last(), "Last app-level state must be Disconnected") + } + + @Test + fun `post-handshake failure recovery disconnects before restarting transport`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + + val observed = mutableListOf() + var progressBeforeRestart: String? = null + var restartTransportCalls = 0 + every { serviceRepository.setConnectionState(any()) } calls + { call -> + val state = call.arg(0) + observed.add(state) + connectionStateFlow.value = state + } + every { serviceRepository.setConnectionProgress(any()) } calls + { call -> + progressBeforeRestart = call.arg(0) + } + everySuspend { radioInterfaceService.restartTransport() } calls + { + restartTransportCalls++ + assertEquals( + ConnectionState.Disconnected, + connectionStateFlow.value, + "post-handshake recovery must disconnect app state before restarting transport", + ) + assertEquals( + ServiceRepository.RECONNECTING_PROGRESS_TEXT, + progressBeforeRestart, + "post-handshake recovery should surface reconnecting progress before restartTransport", + ) + } + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + + manager.onHandshakeComplete() + manager.recoverPostHandshakeFailure() + advanceUntilIdle() + + assertEquals(1, restartTransportCalls) + assertEquals(ConnectionState.Disconnected, observed.last(), "Last app-level state must be Disconnected") + } + + @Test + fun `BLE transport unaffected by onHandshakeProgress regression`() = runTest(testDispatcher) { + // Explicit BLE address (starts with 'x') — must NOT engage the fast path. + every { radioInterfaceService.getDeviceAddress() } returns "xAA:BB:CC:DD:EE:FF" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Spam progress signals — BLE must ignore them, so the 30s Stage 1 budget is unchanged. + repeat(10) { + manager.onHandshakeProgress() + advanceUntilIdle() + } + + // Advance to 13s — would fire a fast watchdog if BLE were incorrectly included. + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "BLE must ignore onHandshakeProgress and stay Connecting through 13s", + ) + + // Now advance past the full BLE Stage 1 budget (30s). We already advanced 13s, so 33s + // more crosses the 30s Stage 1 stall threshold the recovery sibling fires at. (Extra + // slack is harmless — the BLE stage has no retry delay before recovery.) + advanceTimeBy(33_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + } + + @Test + fun `onHandshakeProgress is a no-op when state is not Connecting`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + // Manager starts in Disconnected (initial state, no radio Connected signal yet). + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Precondition: manager must start Disconnected", + ) + + // Calling onHandshakeProgress while not Connecting must not arm any watchdog. + manager.onHandshakeProgress() + advanceUntilIdle() + advanceTimeBy(60_000L) + advanceUntilIdle() + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + } + + @Test + fun `USB serial transport engages fast path like TCP`() = runTest(testDispatcher) { + // Address starting with 's' → DeviceType.USB → fast recovery transport. + every { radioInterfaceService.getDeviceAddress() } returns "s/dev/ttyUSB0" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Fast timeout (12s) fires for USB just like TCP. + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "USB fast stall should end in Disconnected after restart is requested", + ) + } + + @Test + fun `onHandshakeComplete cancels armed Stage 2 fast watchdog`() = runTest(testDispatcher) { + // TCP address → DeviceType.TCP → fast recovery transport, 12s fast watchdog. + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Pre-handshake settle (100ms) completes; Stage 1 fast watchdog armed. + advanceTimeBy(200) + advanceUntilIdle() + + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Manager should be Connecting after radio Connected", + ) + + // Drive into Stage 2. startNodeInfoOnly() cancels the Stage 1 watchdog and arms Stage 2 + // with the 12s fast timeout. In production this is invoked by MeshConfigFlowManagerImpl + // after Stage 1 completes; here we drive it directly to isolate the watchdog path. + manager.startNodeInfoOnly() + advanceUntilIdle() + + // Synchronously cancel the watchdog, exactly as MeshConfigFlowManagerImpl now does the + // instant NODE_INFO_NONCE arrives — BEFORE the async DB install work begins. + manager.onHandshakeComplete() + + // Advance past the 12s fast timeout. With the watchdog cancelled, the recovery + // sibling must NOT fire: no restartTransport, no state transition. + advanceTimeBy(13_000L) + advanceUntilIdle() + + verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() } + assertEquals( + ConnectionState.Connecting, + serviceRepository.connectionState.value, + "Watchdog was cancelled by onHandshakeComplete — no restart, no state transition", + ) + } + + @Test + fun `Second recovery attempt applies exponential backoff delay before restart`() = runTest(testDispatcher) { + // TCP address → fast path (12s stall window). Smallest reliable cycle for backoff testing. + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + val restartTransportCalls = restartTransportCallCounter() + + manager = createManager(backgroundScope) + // Initial Connected → Connecting, pre-handshake armed. + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #1 fires (priorFailures=0 → no backoff). restartTransport called exactly once. + advanceTimeBy(13_000L) + advanceUntilIdle() + assertEquals(1, restartTransportCalls()) + + // Replay the transport restart cycle so handleConnected re-arms the stall guard. DeviceSleep + // is a no-op here (power-saving off); Connected re-enters handleConnected → new pre-handshake. + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #2 fires (priorFailures=1 → 2 s backoff before restartTransport). The sibling has + // launched but is suspended in delay(2.seconds). advanceUntilIdle cannot make progress + // because the only pending task is the timed delay. + advanceTimeBy(13_000L) + advanceUntilIdle() + // Prove the backoff is in effect: restartTransport must NOT have been called yet for the + // second stall. If the backoff were skipped, this would already be exactly(2). + assertEquals(1, restartTransportCalls()) + + // Advancing past the 2 s backoff resumes the sibling and triggers the second restart. + advanceTimeBy(2_000L) + advanceUntilIdle() + assertEquals(2, restartTransportCalls()) + } + + @Test + fun `Recovery exhaustion after three consecutive failures surfaces sticky error and stops recovery`() = + runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + + // Capture every setErrorMessage invocation so we can verify the sticky error surfaced + // with Severity.Error without depending on the exact localized text. + val errorMessages = mutableListOf>() + every { serviceRepository.setErrorMessage(any(), any()) } calls + { call -> + errorMessages += call.arg(0) to call.arg(1) + } + val restartTransportCalls = restartTransportCallCounter() + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #1 (priorFailures=0): no delay → restart. Replay Connected. + advanceTimeBy(13_000L) + advanceUntilIdle() + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #2 (priorFailures=1): 2 s backoff → restart. Replay Connected. + advanceTimeBy(13_000L) + advanceUntilIdle() + advanceTimeBy(2_000L) + advanceUntilIdle() + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #3 (priorFailures=2): 4 s backoff → restart. Replay Connected. + advanceTimeBy(13_000L) + advanceUntilIdle() + advanceTimeBy(4_000L) + advanceUntilIdle() + assertEquals(3, restartTransportCalls()) + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #4 (priorFailures=3 = MAX_CONSECUTIVE_RECOVERY_FAILURES): sticky error path. + // The sibling must NOT call restartTransport; instead it resets the counter, clears the + // progress text, transitions to Disconnected, and surfaces the sticky error. + advanceTimeBy(13_000L) + advanceUntilIdle() + + assertEquals(3, restartTransportCalls()) + assertEquals( + ConnectionState.Disconnected, + serviceRepository.connectionState.value, + "Recovery exhaustion must leave the manager in Disconnected", + ) + // The sticky error is surfaced via setConnectionProgress("") (cleared because the user + // must manually retry) plus setErrorMessage(..., Severity.Error). We assert at least one + // setErrorMessage call landed with Severity.Error — this is the user-visible signal that + // recovery gave up. (Exact text comes from Res.string.error_recovery_exhausted and is + // resolved by the production code via getStringSuspend.) + assertTrue( + errorMessages.any { (_, severity) -> severity == Severity.Error }, + "Recovery exhaustion must surface a sticky ERROR-severity message; got: $errorMessages", + ) + // Counter is reset to 0 by the sticky-error branch so a manual user retry starts fresh. + // We can't read the private atomic directly, but the reset is exercised by the next-stall + // behavior; its correctness is locked in by `onHandshakeComplete resets counter` and by + // the symmetric reset-on-exhaust code path in runSiblingHandshakeRecovery(). + } + + @Test + fun `Successful handshake resets the consecutive recovery failure counter`() = runTest(testDispatcher) { + every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42" + val restartTransportCalls = restartTransportCallCounter() + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #1 (priorFailures=0): no delay → restart. + advanceTimeBy(13_000L) + advanceUntilIdle() + assertEquals(1, restartTransportCalls()) + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // Stall #2 (priorFailures=1): 2 s backoff → restart. Counter now at 2. + advanceTimeBy(13_000L) + advanceUntilIdle() + advanceTimeBy(2_000L) + advanceUntilIdle() + assertEquals(2, restartTransportCalls()) + radioConnectionState.value = ConnectionState.DeviceSleep + radioConnectionState.value = ConnectionState.Connected + advanceTimeBy(200) + advanceUntilIdle() + + // The next stall WOULD see priorFailures=2 (4 s backoff) if the counter were not reset. + // Call onHandshakeComplete() — this is the production signal that the handshake succeeded + // (e.g. NODE_INFO_NONCE arrived). It cancels the armed stall guard and resets the counter. + manager.onHandshakeComplete() + advanceUntilIdle() + + // Re-arm a fresh Stage 1 stall guard directly. We use startConfigOnly() (the same entry + // point handleConnected calls) instead of replaying a Connected emission because the + // current state is still Connecting — a redundant Connected emission would be rejected by + // the redundant-Connected-while-Connecting guard in onConnectionChanged, so handleConnected + // would never re-run. The counter is now 0 thanks to the reset above. + manager.startConfigOnly() + advanceUntilIdle() + + // Stall #3 fires. Because the counter was reset, priorFailures=0 → NO backoff. The restart + // must land immediately, without the 2 s or 4 s delay that an un-reset counter would impose. + advanceTimeBy(13_000L) + advanceUntilIdle() + assertEquals(3, restartTransportCalls()) + // If the counter had NOT been reset, priorFailures would be 2 here and the sibling would be + // suspended in delay(4.seconds); restartTransport would still be exactly(2). Reaching + // exactly(3) immediately after the stall proves the counter was reset to 0. + } } diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MeshConnectionManager.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MeshConnectionManager.kt index a39053954..2d75b3957 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MeshConnectionManager.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MeshConnectionManager.kt @@ -32,6 +32,38 @@ interface MeshConnectionManager { /** Called when the node database is ready and fully populated. */ suspend fun onNodeDbReady() + /** + * Synchronously cancels the transport-aware handshake watchdog the moment the firmware signals Stage 2 completion + * (NODE_INFO_NONCE received). + * + * This MUST be invoked before the asynchronous NodeDB install work begins so that a slow DB commit on a large mesh + * cannot trip the 12 s fast-recovery timeout after the firmware handshake has already succeeded. The watchdog + * cancellation it performs is a strict subset of [onNodeDbReady]; the remaining post-NodeDB side effects + * (analytics, MQTT start, history replay, telemetry requests) stay gated on [onNodeDbReady] at the end of the DB + * install block. + */ + fun onHandshakeComplete() + + /** + * Recovers from a failure after firmware handshake completion but before NodeDB install has completed. + * + * At this point the handshake watchdog has already been cancelled by [onHandshakeComplete], so recovery must + * explicitly move the app-level state out of Connecting and restart the raw transport in the same ordering used by + * handshake stall recovery. + */ + fun recoverPostHandshakeFailure() + + /** + * Called when meaningful handshake progress is observed on the wire (e.g. an inbound packet related to the + * in-flight config or node-info exchange). + * + * On fast transports (TCP, USB serial) this re-arms the transport-aware handshake watchdog so a steady trickle of + * progress does not trip the aggressive fast-recovery timeout while a true stall still fires on schedule. On BLE + * this is a no-op: BLE keeps the original long-and-retry stall-guard budget because GATT latency is high and + * variable. + */ + fun onHandshakeProgress() + /** Updates the telemetry information for the local node. */ fun updateTelemetry(t: Telemetry) diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt index 51d855494..754dd38d2 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioInterfaceService.kt @@ -101,6 +101,35 @@ interface RadioInterfaceService : RadioTransportCallback { */ suspend fun disconnect() + /** + * Silent in-place transport restart for handshake stalls: tears down the active transport and re-establishes it in + * place, without touching the connection-request gate or the selected device address. + * + * Both transport families use this recovery path but with different trigger timings: TCP/USB reach it through the + * fast-path watchdog (~12s after the last meaningful handshake packet), while BLE reaches it after the + * retry-exhausted path (~30s/60s watchdog plus a ~15s retry window). In both cases the symptom is identical: the + * transport itself may still be physically [ConnectionState.Connected] (e.g. a TCP socket whose radio firmware has + * stopped responding to `want_config_id`, or a BLE link whose GATT peer has stalled), so flipping app-level state + * to [ConnectionState.Disconnected] alone leaves a split-brain: transport Connected + `connectionRequested=true` + + * a live `RadioTransport` handle, which then blocks same-node reconnect via [setDeviceAddress]'s fast-path. This + * method breaks that deadlock by cycling the transport in place. + * + * Contract: + * - Preserves the selected device address (does not modify [currentDeviceAddressFlow]). + * - Preserves the `connectionRequested` gate; **MUST NOT** clear it. Safe to call concurrently with an explicit + * [disconnect] — the internal gate check makes it a no-op in that case. + * - Safe to call when no transport is running — implementations must no-op. + * - Does **NOT** bypass selected-device validation; the replacement transport is built from the same bonded address + * via the normal start path. + * - Emits ordinary transport-level transitions through the existing [RadioTransportCallback] surface, so observers + * see the transient [ConnectionState.DeviceSleep] state followed by [ConnectionState.Connected] when the + * replacement transport connects. (No [ConnectionState.Connecting] is emitted at the transport layer — that is an + * app-level state set by [MeshConnectionManager], not a transport callback.) + * + * Suspends until the teardown/restart cycle completes. + */ + suspend fun restartTransport() + /** Returns the current device address. */ fun getDeviceAddress(): String? diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt index 1d0e9b5d6..cd8b54f18 100644 --- a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/ServiceRepository.kt @@ -163,4 +163,17 @@ interface ServiceRepository : /** Clears the current neighbor info response. */ override fun clearNeighborInfoResponse() + + companion object { + /** + * The cross-module contract for the WiFi/TCP handshake-watchdog recovery progress signal. + * + * [MeshConnectionManager] writes this exact literal to [connectionProgress] immediately before its recovery + * sibling transitions the transport to [ConnectionState.Disconnected], and `ConnectionsViewModel` (and its + * tests) compare incoming progress against it to surface `RECONNECTING` UI state instead of a final-feeling + * `NOT_CONNECTED`. The literal text and the U+2026 HORIZONTAL ELLIPSIS character MUST match exactly across both + * writers and readers — centralizing it here removes the prior cross-module string-contract hazard. + */ + const val RECONNECTING_PROGRESS_TEXT = "Reconnecting\u2026" + } } diff --git a/core/resources/src/commonMain/composeResources/values/strings.xml b/core/resources/src/commonMain/composeResources/values/strings.xml index 294fc9a0b..d8ef3d42e 100644 --- a/core/resources/src/commonMain/composeResources/values/strings.xml +++ b/core/resources/src/commonMain/composeResources/values/strings.xml @@ -537,6 +537,7 @@ Environment metrics use Fahrenheit Error Duty Cycle limit reached. Cannot send messages right now, please try again later. + Could not establish a stable connection after repeated attempts. Please re-select the node to retry. Connect & administer Establishing remote session… Ethernet Options @@ -1172,6 +1173,7 @@ Ignores observed messages from foreign meshes that are open or those which it cannot decrypt. Only rebroadcasts message on the nodes local primary / secondary channels. Only permitted for SENSOR, TRACKER and TAK_TRACKER roles, this will inhibit all rebroadcasts, not unlike CLIENT_MUTE role. Recent Network Devices + Reconnecting… Red Refresh Refresh metadata @@ -1456,6 +1458,23 @@ Showing %1$d/%2$d nodes Track and Share Locations track point + + Traffic Management + Traffic Management Configuration + Drop Unknown Packets + Module Enabled + Local-only Position (Relays) + Local-only Telemetry (Relays) + NodeInfo Direct Response + Max Hops for Direct Response + Position Deduplication + Min Position Interval (secs) + Position Precision (bits) + Rate Limiting + Max Packets in Window + Rate Limit Window (secs) + Preserve Router Hops + Unknown Packet Threshold Transmit over LoRa BLE TCP diff --git a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt index 96571d3d7..98bb4b079 100644 --- a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt +++ b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt @@ -270,6 +270,80 @@ class SharedRadioInterfaceService( } } + override suspend fun restartTransport() { + // CAS BEFORE the mutex, mirroring checkLiveness()'s coordination structure: both + // restart paths CAS synchronously, one wins, one loses immediately. Performing the + // CAS inside transportMutex.withLock races checkLiveness's outer + // `finally { isRestarting = false }` (which runs AFTER mutex release): a queued + // restartTransport that resumes from mutex.wait can observe isRestarting == false, + // win the CAS, and produce an extra transport cycle (3 instead of 2) under the JVM's + // real dispatcher. The loser here observes isRestarting == true and defers to the + // in-flight cycle. startTransportLocked() is idempotent w.r.t. an existing transport, + // but the CAS also prevents a double stop/stop race on the teardown side. + if (!isRestarting.compareAndSet(expect = false, update = true)) { + Logger.d { "restartTransport: skipped, concurrent restart in progress" } + return + } + try { + transportMutex.withLock { + // Silent recovery for app-level handshake stalls. The transport may still be physically + // up (TCP socket alive, firmware unresponsive to want_config_id), so cycling it in place + // WITHOUT clearing connectionRequested avoids the split-brain where setDeviceAddress's + // fast-path would otherwise block same-node reconnect. The caller (MeshConnectionManager) + // is responsible for the app-level Disconnected flip; this method only cycles the + // transport and emits the transport-level DeviceSleep -> Connected transitions via + // callbacks (no Connecting — that is an app-level state, not a transport emission). + if (!connectionRequested) { + Logger.d { "restartTransport: skipped-not-requested" } + return@withLock + } + if (getBondedDeviceAddress() == null) { + Logger.d { "restartTransport: skipped-no-address" } + return@withLock + } + // Honor the documented "safe no-op when no transport running" contract: environmental + // stops (network unavailable, BLE disabled) intentionally preserve + // connectionRequested=true so the recovery listeners above can re-bring-up the + // transport later. A stale restart job running after such a stop must NOT bypass that + // recovery path by creating a transport directly via startTransportLocked(). + if (radioTransport == null) { + Logger.d { "restartTransport: skipped-no-transport" } + return@withLock + } + Logger.w { "restartTransport: restarting transport for ${getDeviceAddress()?.anonymize}" } + // Mirror checkLiveness()'s coordination contract: emit a transport-level + // Connected -> DeviceSleep transition before the stop/start cycle so + // transport-level observers see a full DeviceSleep -> Connected cycle when + // the fresh transport's onConnect() fires and can re-trigger their onConnected + // logic. The caller (runSiblingHandshakeRecovery) flips the app-level state to + // Disconnected before invoking restartTransport(), so this emission exists for + // transport-level coordination, not for app-level StateFlow dedupe. Silent: + // transient DeviceSleep, not a permanent Disconnected. + onDisconnect(isPermanent = false) + // notifyPermanent=false below (no user-facing Disconnected modal — the app-level + // state machine drives that separately) and sendPoliteDisconnect=false (firmware + // is unresponsive, writing a goodbye frame into a dead link only delays teardown). + ignoreExceptionSuspend { stopTransportLocked(notifyPermanent = false, sendPoliteDisconnect = false) } + // Defense-in-depth mirroring the liveness recovery gate; today all + // connectionRequested mutators (connect/disconnect/setDeviceAddress) hold + // transportMutex so this re-check is unreachable, but it guards against future + // refactors that mutate the gate without serialization. + if (!connectionRequested) { + Logger.d { "restartTransport: aborted, disconnect requested during stop" } + return@withLock + } + // startTransportLocked() re-validates the selected address (no-op if null) and emits + // Connected through the transport callbacks (via the new transport's onConnect) once + // the fresh transport comes up — there is no Connecting emission at the transport + // layer (that is an app-level state owned by MeshConnectionManager). + startTransportLocked() + Logger.i { "restartTransport: completed" } + } + } finally { + isRestarting.value = false + } + } + override fun isMockTransport(): Boolean = transportFactory.isMockTransport() override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = diff --git a/core/service/src/commonTest/kotlin/org/meshtastic/core/service/SharedRadioInterfaceServiceLivenessTest.kt b/core/service/src/commonTest/kotlin/org/meshtastic/core/service/SharedRadioInterfaceServiceLivenessTest.kt index 96c98b996..68f6f450c 100644 --- a/core/service/src/commonTest/kotlin/org/meshtastic/core/service/SharedRadioInterfaceServiceLivenessTest.kt +++ b/core/service/src/commonTest/kotlin/org/meshtastic/core/service/SharedRadioInterfaceServiceLivenessTest.kt @@ -785,4 +785,443 @@ class SharedRadioInterfaceServiceLivenessTest { advanceTimeBy(1_000L) } } + + // ─── restartTransport gate contract ───────────────────────────────────────────────────── + // + // [SharedRadioInterfaceService.restartTransport] is the app-level handshake-stall recovery + // path. It mirrors BLE liveness silent recovery: stopTransportLocked(notifyPermanent=false, + // sendPoliteDisconnect=false) then startTransportLocked(). The isRestarting CAS runs + // synchronously BEFORE transportMutex (mirroring checkLiveness), then the remaining gates run + // inside the mutex. The gate contract has four early-return gates that MUST all hold for the + // cycle to fire, evaluated in this exact order: + // 1. isRestarting.compareAndSet(false, true) succeeds (reuses the liveness CAS; synchronous, + // BEFORE acquiring transportMutex — both restart paths serialize on this CAS first) + // 2. connectionRequested == true (cleared by disconnect() / setDeviceAddress(null)/("n"); + // checked inside transportMutex) + // 3. getBondedDeviceAddress() != null (re-validates the selected address; inside transportMutex) + // 4. radioTransport != null (defends against a stale restart after an environmental stop; + // inside transportMutex) + // The cycle must also be address-preserving and silent (no permanent Disconnected, no + // user-facing error). The tests below lock each leg of that contract. + + /** + * Happy path: [SharedRadioInterfaceService.restartTransport] after a normal [connect] with a selected address stops + * the running transport and creates a fresh one. Mirrors the BLE liveness "timeout closes old transport and creates + * fresh one" assertion shape. + */ + @Test + fun `restartTransport after connect closes old transport and creates fresh one`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + val initialTransport = createdTransports.first() + + // Lock the sendPoliteDisconnect=false contract: clear any bytes recorded during the + // initial connect so sentData reflects only writes performed during restartTransport. + initialTransport.sentData.clear() + + service.restartTransport() + // sendPoliteDisconnect = false → no 500ms drain inside the cycle. Under + // UnconfinedTestDispatcher the whole stop/start runs inline; runCurrent is + // belt-and-suspenders. The trailing disconnect() covers its own polite delay. + testDispatcher.scheduler.runCurrent() + + assertEquals(2, createdTransports.size, "restartTransport should create exactly one fresh transport") + assertTrue(initialTransport.closeCalled, "Old transport must be closed by restartTransport") + assertEquals(1, initialTransport.closeCount, "Old transport closed exactly once (no double-close)") + assertTrue( + initialTransport.sentData.isEmpty(), + "restartTransport must NOT write any bytes to the old transport (sendPoliteDisconnect=false)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] MUST be a no-op after an explicit + * [SharedRadioInterfaceService.disconnect]. disconnect() clears `connectionRequested` BEFORE stopTransportLocked(), + * and restartTransport() consults that gate as its first check. Without the gate, a racing handshake-induced + * restart would silently resurrect a transport the user tore down. + */ + @Test + fun `restartTransport is a no-op after explicit disconnect`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + + service.disconnect() + advanceTimeBy(1_000L) + val transportCountAfterDisconnect = createdTransports.size + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + assertEquals( + transportCountAfterDisconnect, + createdTransports.size, + "restartTransport must not create a transport after disconnect (connectionRequested gate)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] MUST be a no-op after + * [SharedRadioInterfaceService.setDeviceAddress] with `null`. The deselect clears `connectionRequested` AND leaves + * getBondedDeviceAddress() == null (invalid address); either gate alone is sufficient to skip the restart, but both + * must hold to defend against a stale listener emission re-arming the transport. + */ + @Test + fun `restartTransport is a no-op after setDeviceAddress null`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + + service.setDeviceAddress(null) + advanceTimeBy(1_000L) + val transportCountAfterDeselect = createdTransports.size + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + assertEquals( + transportCountAfterDeselect, + createdTransports.size, + "restartTransport must not create a transport after setDeviceAddress(null)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Counterpart to the null-deselect test: `"n"` is the UI sentinel for "no device" and is sanitized to `null` inside + * [SharedRadioInterfaceService.setDeviceAddress]. The same gate must skip restartTransport for both forms. + */ + @Test + fun `restartTransport is a no-op after setDeviceAddress n`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + + service.setDeviceAddress("n") + advanceTimeBy(1_000L) + val transportCountAfterDeselect = createdTransports.size + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + assertEquals( + transportCountAfterDeselect, + createdTransports.size, + "restartTransport must not create a transport after setDeviceAddress(\"n\")", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] cycles the transport in place WITHOUT writing a new + * devAddr or clearing currentDeviceAddressFlow. The caller (MeshConnectionManager) is the sole owner of address + * changes; restartTransport must never silently re-bind to a different device or evict the selection. + */ + @Test + fun `restartTransport preserves selected device address`() = runTest(testDispatcher) { + clock = 0L + val address = "xAA:BB:CC:DD:EE:FF" + val service = createConnectedService(address) + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + val addrBefore = service.getDeviceAddress() + val prefsAddrBefore = radioPrefs.devAddr.value + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + assertEquals( + addrBefore, + service.getDeviceAddress(), + "getDeviceAddress must not change across restartTransport", + ) + assertEquals(address, service.getDeviceAddress(), "Selected device address preserved") + assertEquals( + prefsAddrBefore, + radioPrefs.devAddr.value, + "radioPrefs.devAddr must not be rewritten by restartTransport", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] mirrors BLE liveness silent recovery. It MUST emit a + * transient DeviceSleep (via onDisconnect(isPermanent = false)) BEFORE the stop/start cycle so the subsequent + * onConnect() produces a real Connected transition (StateFlow is idempotent on same-value — without the DeviceSleep + * flip, Connected -> Connected is a no-op and MeshConnectionManager stays stuck on its app-level Disconnected + * state). It MUST NOT emit a permanent user-facing Disconnected state — the caller drives app-level state + * transitions separately, and surfacing a permanent disconnect for a self-healing cycle would pop a confusing modal + * for a transient condition. + */ + @Test + fun `restartTransport does not emit permanent Disconnected`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + val stateEmissions = mutableListOf() + val collectJob = backgroundScope.launch { service.connectionState.collect { stateEmissions.add(it) } } + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + collectJob.cancel() + + // The DeviceSleep emission is the intended transport-level transition that lets + // MeshConnectionManager observe a real Connected transition on the fresh transport. + assertTrue( + ConnectionState.DeviceSleep in stateEmissions, + "restartTransport must emit transient DeviceSleep so the post-restart onConnect() re-triggers handleConnected() (emitted: $stateEmissions)", + ) + assertFalse( + ConnectionState.Disconnected in stateEmissions, + "restartTransport must not emit permanent Disconnected state (emitted: $stateEmissions)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] must not emit a user-facing connection error. + * _connectionError is a no-replay SharedFlow, so the collector must subscribe BEFORE the restart; under + * UnconfinedTestDispatcher the launch runs eagerly up to its first suspension (awaiting SharedFlow emission). A + * silent-recovery cycle surfacing an error modal is the same UX bug as a liveness recovery surfacing one. + */ + @Test + fun `restartTransport does not emit user-facing connection error`() = runTest(testDispatcher) { + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF") + try { + val errors = mutableListOf() + val collectJob = backgroundScope.launch { service.connectionError.collect { errors.add(it) } } + + service.restartTransport() + testDispatcher.scheduler.runCurrent() + + collectJob.cancel() + + assertTrue( + errors.isEmpty(), + "restartTransport must not emit user-facing connection error (got: $errors)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression: [SharedRadioInterfaceService.restartTransport] reuses the BLE liveness `isRestarting` CAS so a + * concurrent liveness silent-recovery cycle and a handshake-induced restart cannot stack. The loser of the CAS + * observes `isRestarting == true` and defers to the in-flight cycle, preventing a double stop/start race on the + * transport. + * + * restartTransport performs the isRestarting CAS synchronously BEFORE acquiring transportMutex (mirroring + * checkLiveness). Under this pattern, a concurrent liveness restart and a handshake-induced restart serialize on + * the CAS first — the loser returns immediately without touching the mutex. This is deterministic on all + * dispatchers, not just UnconfinedTestDispatcher. + * + * Deterministic in-flight overlap: a [GatedFakeRadioTransport] suspends the liveness restart genuinely inside + * stopTransportLocked → close() (awaiting closeGate). While the liveness coroutine holds `isRestarting == true` + * inside its restart cycle, a concurrent `restartTransport()` call CAS-fails on `isRestarting` and returns + * immediately without ever acquiring `transportMutex`. A stacking bug (CAS ignored, or CAS performed inside the + * mutex) would produce 3 transports. + */ + @Test + fun `restartTransport coordinates with in-flight liveness restart via isRestarting`() = runTest(testDispatcher) { + val gatedTransports = mutableListOf() + val closeGate = CompletableDeferred() + // Publish the gate to activeCloseGate so tearDown can release it even if an assertion + // throws before we reach the inner try/finally — otherwise disconnect() below would + // hang forever on the gated close(). + activeCloseGate = closeGate + val transportProvider: () -> RadioTransport = { + GatedFakeRadioTransport(closeGate).also { gatedTransports.add(it) } + } + + clock = 0L + val service = createConnectedService("xAA:BB:CC:DD:EE:FF", transportProvider) + try { + assertEquals(1, gatedTransports.size, "Initial connect should create one transport") + val initialTransport = gatedTransports.first() + + // Past the 60s threshold → first checkLiveness CAS-sets isRestarting=true and + // launches a restart coroutine whose close() suspends on closeGate. Under + // UnconfinedTestDispatcher the launched coroutine runs eagerly up to the + // suspension point: by the time checkLiveness() returns, mutex is held and + // isRestarting == true. + clock = 65_000L + service.checkLiveness() + + // Issue restartTransport() while the liveness restart is in-flight. Under the refactored + // CAS-before-mutex pattern, restartTransport's CAS fails immediately (isRestarting is already + // true from checkLiveness's synchronous CAS). It returns without acquiring the mutex. + val restartJob = backgroundScope.launch { service.restartTransport() } + testDispatcher.scheduler.runCurrent() + + try { + // Pre-release: liveness still in-flight (close() suspended on closeGate), + // but restartTransport already returned via CAS-fail on isRestarting (it never + // touched transportMutex). No fresh transport from restartTransport, no stacking. + assertEquals( + 1, + gatedTransports.size, + "No fresh transport created while liveness restart is in-flight", + ) + assertTrue(initialTransport.closeCalled, "Liveness restart must have entered close()") + assertEquals( + 0, + initialTransport.closeCompletedCount, + "Liveness restart close() must still be suspended (gate not yet released)", + ) + assertTrue( + restartJob.isCompleted, + "restartTransport should CAS-fail immediately when liveness already holds isRestarting, " + + "not queue on transportMutex", + ) + } finally { + // Release the gate unconditionally so the suspended liveness restart can + // complete. tearDown also releases activeCloseGate, but completing here is + // required for the post-finally assertions to observe the resumed cycle. + closeGate.complete(Unit) + } + + // Release the gate: liveness restart completes (close + startTransport). + // restartTransport already returned via CAS-fail, so no second cycle. Exactly 2 transports. + testDispatcher.scheduler.runCurrent() + advanceTimeBy(1_000L) + restartJob.join() + + // Exactly 2 transports: 1 initial + 1 from liveness restart. restartTransport + // was a no-op via the isRestarting CAS. A stacking bug would produce 3. + assertEquals( + 2, + gatedTransports.size, + "restartTransport must not stack another cycle on an in-flight liveness restart", + ) + assertEquals(1, initialTransport.closeCount, "Initial transport closed exactly once") + assertEquals( + 1, + initialTransport.closeCompletedCount, + "Initial transport close completed exactly once after gate release", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } + + /** + * Regression for the `radioTransport == null` gate added to [SharedRadioInterfaceService.restartTransport] (the + * mutex-protected early-return, checked after the `isRestarting` CAS but before the `onDisconnect(isPermanent = + * false)` call). + * + * Scenario: a TCP transport has been torn down by an environmental stop (`networkAvailable = false`) but + * `connectionRequested` is intentionally preserved so the network recovery listener can re-bring-up the transport + * when connectivity returns. A stale handshake-stall restart fired in that window MUST be a no-op: + * - It MUST NOT create a fresh transport via `startTransportLocked()` (that would bypass the recovery listener, + * which owns the re-bring-up). + * - It MUST NOT emit `DeviceSleep` via `onDisconnect(isPermanent = false)` (the gate returns before that call), so + * the recovery path's later transitions stay meaningful. + * + * Counter-assertion: the recovery listener itself MUST still function after the gate fires — toggling + * `networkAvailable` back to `true` MUST create the second transport, proving the gate did not break the documented + * re-bring-up path. + */ + @Test + fun `restartTransport is a no-op when transport is environmentally stopped but connection remains requested`() = + runTest(testDispatcher) { + clock = 0L + val networkAvailability = MutableStateFlow(true) + val service = createConnectedService("t192.168.1.100", networkAvailability = networkAvailability) + try { + assertEquals(1, createdTransports.size, "Initial connect should create one transport") + val initialTransport = createdTransports.first() + + // Subscribe BEFORE the environmental stop so we capture every state transition through + // the stop, the gated restartTransport, and the recovery. _connectionState is a StateFlow + // (replays current value to late collectors), so the launch's first emission is Connected. + val stateEmissions = mutableListOf() + val collectJob = backgroundScope.launch { service.connectionState.collect { stateEmissions.add(it) } } + testDispatcher.scheduler.runCurrent() + + // Environmental stop: network drops while a TCP transport is running. The network + // listener (see initStateListeners) calls stopTransportLocked() with defaults + // (notifyPermanent=true → onDisconnect(isPermanent=true) → Disconnected). The transport + // is closed and radioTransport becomes null, but connectionRequested STAYS true so the + // recovery listener can re-bring-up later. + networkAvailability.value = false + testDispatcher.scheduler.runCurrent() + // Drain the polite-disconnect frame (POLITE_DISCONNECT_DRAIN_MS = 500ms). + advanceTimeBy(1_000L) + + assertTrue(initialTransport.closeCalled, "Environmental stop must close the running TCP transport") + assertEquals( + 1, + createdTransports.size, + "Environmental stop must not create a new transport (only teardown)", + ) + + // Stale handshake-stall restart fired in the window where radioTransport == null but + // connectionRequested is still true. The new gate must short-circuit AFTER the + // isRestarting CAS (inside transportMutex) but BEFORE the onDisconnect(isPermanent=false) DeviceSleep + // emission. + service.restartTransport() + testDispatcher.scheduler.runCurrent() + advanceTimeBy(1_000L) + + assertEquals( + 1, + createdTransports.size, + "restartTransport must be a no-op when radioTransport == null (gate fired)", + ) + assertFalse( + ConnectionState.DeviceSleep in stateEmissions, + "Gated restartTransport must NOT emit DeviceSleep " + + "(returned before onDisconnect(isPermanent=false)); emitted: $stateEmissions", + ) + + // Environmental recovery: networkAvailable flips back to true. connectionRequested is + // still true (neither the environmental stop nor the gated restart cleared it), so the + // recovery listener MUST call startTransportLocked() and create the second transport. + // This proves the new gate did not break the documented re-bring-up path. + networkAvailability.value = true + testDispatcher.scheduler.runCurrent() + advanceTimeBy(1_000L) + + collectJob.cancel() + + assertEquals( + 2, + createdTransports.size, + "Network recovery must re-bring-up the transport after the gated no-op restart (gate still set)", + ) + } finally { + service.disconnect() + advanceTimeBy(1_000L) + } + } } diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt index f7837e436..d707ffb9c 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioInterfaceService.kt @@ -64,6 +64,8 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main val sentToRadio = mutableListOf() var connectCalled = false + var restartTransportCalled: Boolean = false + private set override fun isMockTransport(): Boolean = true @@ -79,6 +81,10 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main connectCalled = false } + override suspend fun restartTransport() { + restartTransportCalled = true + } + override fun getDeviceAddress(): String? = _currentDeviceAddressFlow.value override fun setDeviceAddress(deviceAddr: String?): Boolean { diff --git a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModel.kt b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModel.kt index af2b74d49..aae290fa7 100644 --- a/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModel.kt +++ b/core/ui/src/commonMain/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModel.kt @@ -27,16 +27,17 @@ import org.koin.core.annotation.KoinViewModel import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.model.MyNodeInfo import org.meshtastic.core.model.Node -import org.meshtastic.core.repository.ConnectionStateProvider import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.core.repository.ServiceRepository import org.meshtastic.core.repository.UiPrefs import org.meshtastic.proto.Config import org.meshtastic.proto.LocalConfig /** - * Derived, UI-friendly summary of the device connection state. Combines [ConnectionStateProvider.connectionState] with - * "region unset" to surface the MUST_SET_REGION case that otherwise needs a separate boolean flag in the UI layer. + * Derived, UI-friendly summary of the device connection state. Combines [ServiceRepository.connectionState] with + * "region unset" and the [ServiceRepository.RECONNECTING_PROGRESS_TEXT] handshake-recovery signal to surface cases + * (MUST_SET_REGION, RECONNECTING) that otherwise need separate boolean flags in the UI layer. */ enum class ConnectionStatus { /** No device has been selected or we are otherwise disconnected. */ @@ -45,6 +46,12 @@ enum class ConnectionStatus { /** A device has been selected and we are working through bonding/handshake. */ CONNECTING, + /** + * Transport is recovering from a WiFi/TCP handshake stall (the watchdog tore the link down and is bringing it back + * up). Distinct from [NOT_CONNECTED] so the UI can show an in-progress recovery instead of a final failure. + */ + RECONNECTING, + /** Connected with node info available. */ CONNECTED, @@ -58,7 +65,7 @@ enum class ConnectionStatus { @KoinViewModel class ConnectionsViewModel( radioConfigRepository: RadioConfigRepository, - connectionStateProvider: ConnectionStateProvider, + serviceRepository: ServiceRepository, nodeRepository: NodeRepository, private val uiPrefs: UiPrefs, ) : ViewModel() { @@ -66,7 +73,7 @@ class ConnectionsViewModel( val localConfig: StateFlow = radioConfigRepository.localConfigFlow.stateInWhileSubscribed(initialValue = LocalConfig()) - val connectionState = connectionStateProvider.connectionState + val connectionState = serviceRepository.connectionState val myNodeInfo: StateFlow = nodeRepository.myNodeInfo @@ -95,18 +102,29 @@ class ConnectionsViewModel( .stateInWhileSubscribed(initialValue = false) /** - * Single source of truth for the UI's "connection status" pill/banner. Derived from [connectionState] and - * [regionUnset]; kept here rather than in the composable so the mapping is observable and testable. + * Single source of truth for the UI's "connection status" pill/banner. Derived from [connectionState], + * [ServiceRepository.connectionProgress], and [regionUnset]; kept here rather than in the composable so the mapping + * is observable and testable. + * + * The [ConnectionStatus.RECONNECTING] case is signalled by the WiFi/TCP handshake watchdog writing + * [ServiceRepository.RECONNECTING_PROGRESS_TEXT] to [ServiceRepository.connectionProgress] immediately before its + * recovery sibling transitions to [ConnectionState.Disconnected]. See + * [ServiceRepository.RECONNECTING_PROGRESS_TEXT] for the cross-track contract. */ val connectionStatus: StateFlow = - combine(connectionState, regionUnset) { state, unset -> + combine(connectionState, regionUnset, serviceRepository.connectionProgress) { state, unset, progress -> when (state) { is ConnectionState.Connected -> if (unset) ConnectionStatus.MUST_SET_REGION else ConnectionStatus.CONNECTED ConnectionState.Connecting -> ConnectionStatus.CONNECTING - ConnectionState.Disconnected -> ConnectionStatus.NOT_CONNECTED + ConnectionState.Disconnected -> + if (progress == ServiceRepository.RECONNECTING_PROGRESS_TEXT) { + ConnectionStatus.RECONNECTING + } else { + ConnectionStatus.NOT_CONNECTED + } ConnectionState.DeviceSleep -> ConnectionStatus.CONNECTED_SLEEPING } diff --git a/core/ui/src/commonTest/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModelTest.kt b/core/ui/src/commonTest/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModelTest.kt index ac4b8391a..2c1c121c9 100644 --- a/core/ui/src/commonTest/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModelTest.kt +++ b/core/ui/src/commonTest/kotlin/org/meshtastic/core/ui/viewmodel/ConnectionsViewModelTest.kt @@ -16,6 +16,7 @@ */ package org.meshtastic.core.ui.viewmodel +import app.cash.turbine.test import dev.mokkery.MockMode import dev.mokkery.answering.returns import dev.mokkery.every @@ -27,8 +28,11 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.resetMain +import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.setMain +import org.meshtastic.core.model.ConnectionState import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.core.repository.ServiceRepository import org.meshtastic.core.repository.UiPrefs import org.meshtastic.core.testing.FakeNodeRepository import org.meshtastic.core.testing.FakeServiceRepository @@ -59,7 +63,7 @@ class ConnectionsViewModelTest { viewModel = ConnectionsViewModel( radioConfigRepository = radioConfigRepository, - connectionStateProvider = serviceRepository, + serviceRepository = serviceRepository, nodeRepository = nodeRepository, uiPrefs = uiPrefs, ) @@ -84,4 +88,67 @@ class ConnectionsViewModelTest { assertEquals(true, viewModel.hasShownNotPairedWarning.value) verify { uiPrefs.setHasShownNotPairedWarning(true) } } + + @Test + fun `Disconnected with Reconnecting progress maps to RECONNECTING`() = runTest { + viewModel.connectionStatus.test { + // Initial value from stateInWhileSubscribed. + assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem()) + + // Track A contract: progress is set BEFORE the Disconnected transition. + serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT) + assertEquals(ConnectionStatus.RECONNECTING, awaitItem()) + + // State catch-up: the Disconnected transition is a no-op because FakeServiceRepository + // starts Disconnected, and distinctUntilChanged suppresses the re-emission. + serviceRepository.setConnectionState(ConnectionState.Disconnected) + expectNoEvents() + + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `Connecting state maps to CONNECTING regardless of progress text`() = runTest { + viewModel.connectionStatus.test { + assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem()) + + // Progress set first would transiently surface RECONNECTING while state is still Disconnected. + serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT) + assertEquals(ConnectionStatus.RECONNECTING, awaitItem()) + + // The Connecting transition overrides progress: Connecting always maps to CONNECTING. + serviceRepository.setConnectionState(ConnectionState.Connecting) + assertEquals(ConnectionStatus.CONNECTING, awaitItem()) + + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `Disconnected without Reconnecting progress stays NOT_CONNECTED`() = runTest { + viewModel.connectionStatus.test { + assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem()) + + serviceRepository.setConnectionProgress("Downloading Node DB...") + serviceRepository.setConnectionState(ConnectionState.Disconnected) + // No emission: state and progress resolve to NOT_CONNECTED, equal to the current value. + expectNoEvents() + + cancelAndIgnoreRemainingEvents() + } + } + + /** + * Cross-track contract pin: Track A (MeshConnectionManagerImpl.runSiblingHandshakeRecovery) writes the literal + * "Reconnecting…" (with U+2026) to ServiceRepository.connectionProgress. This constant is what Track C compares + * against. If either side changes (e.g., localization, ASCII normalization), the UI would silently fall back to + * NOT_CONNECTED instead of RECONNECTING. This test pins the canonical constant in [ServiceRepository]; combined + * with the existing ordering test in MeshConnectionManagerImplTest that asserts the same literal is set, this + * transitively enforces the contract via the shared constant. + */ + @Test + fun `RECONNECTING_PROGRESS_TEXT pins the cross-track literal value`() { + assertEquals("Reconnecting\u2026", ServiceRepository.RECONNECTING_PROGRESS_TEXT) + } } diff --git a/desktopApp/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt b/desktopApp/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt index f8b96faba..0b576d77e 100644 --- a/desktopApp/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt +++ b/desktopApp/src/main/kotlin/org/meshtastic/desktop/stub/NoopStubs.kt @@ -90,6 +90,10 @@ class NoopRadioInterfaceService : RadioInterfaceService { logWarn("NoopRadioInterfaceService.disconnect()") } + override suspend fun restartTransport() { + logWarn("NoopRadioInterfaceService.restartTransport()") + } + override fun getDeviceAddress(): String? = null override fun setDeviceAddress(deviceAddr: String?): Boolean = false diff --git a/feature/connections/src/commonMain/kotlin/org/meshtastic/feature/connections/ui/components/ConnectingDeviceInfo.kt b/feature/connections/src/commonMain/kotlin/org/meshtastic/feature/connections/ui/components/ConnectingDeviceInfo.kt index e27939314..a3aff424b 100644 --- a/feature/connections/src/commonMain/kotlin/org/meshtastic/feature/connections/ui/components/ConnectingDeviceInfo.kt +++ b/feature/connections/src/commonMain/kotlin/org/meshtastic/feature/connections/ui/components/ConnectingDeviceInfo.kt @@ -39,6 +39,7 @@ import org.meshtastic.core.resources.connected_sleeping import org.meshtastic.core.resources.connecting import org.meshtastic.core.resources.must_set_region import org.meshtastic.core.resources.not_connected +import org.meshtastic.core.resources.reconnecting import org.meshtastic.core.ui.viewmodel.ConnectionStatus /** @@ -59,6 +60,7 @@ fun ConnectingDeviceInfo( ConnectionStatus.CONNECTED -> stringResource(Res.string.connected) ConnectionStatus.MUST_SET_REGION -> stringResource(Res.string.must_set_region) ConnectionStatus.CONNECTING -> connectionProgress ?: stringResource(Res.string.connecting) + ConnectionStatus.RECONNECTING -> stringResource(Res.string.reconnecting) ConnectionStatus.CONNECTED_SLEEPING -> stringResource(Res.string.connected_sleeping) ConnectionStatus.NOT_CONNECTED -> stringResource(Res.string.not_connected) }