fix(service): recover stalled WiFi/TCP handshakes by cycling active transport (#5856)

This commit is contained in:
Jeremiah K
2026-06-20 14:03:48 -05:00
committed by GitHub
parent 63dbbddb0f
commit 3ca87fa032
18 changed files with 2291 additions and 123 deletions

View File

@@ -513,6 +513,7 @@ environment_metrics_update_interval_seconds
environment_metrics_use_fahrenheit
error
error_duty_cycle
error_recovery_exhausted
establish_session
establishing_session
ethernet_config
@@ -1133,6 +1134,7 @@ rebroadcast_mode_known_only_desc
rebroadcast_mode_local_only_desc
rebroadcast_mode_none_desc
recent_network_devices
reconnecting
red
refresh
refresh_metadata
@@ -1411,6 +1413,23 @@ traceroute_route_towards_dest
traceroute_showing_nodes
track_and_share_locations
track_point
### TRAFFIC ###
traffic_management
traffic_management_config
traffic_management_drop_unknown_enabled
traffic_management_enabled
traffic_management_exhaust_hop_position
traffic_management_exhaust_hop_telemetry
traffic_management_nodeinfo_direct_response
traffic_management_nodeinfo_direct_response_max_hops
traffic_management_position_dedup
traffic_management_position_min_interval
traffic_management_position_precision
traffic_management_rate_limit_enabled
traffic_management_rate_limit_max_packets
traffic_management_rate_limit_window
traffic_management_router_preserve_hops
traffic_management_unknown_packet_threshold
transmit_over_lora
transport_ble
transport_tcp

View File

@@ -18,11 +18,13 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DeviceVersion
import org.meshtastic.core.repository.CommandSender
@@ -77,10 +79,14 @@ class MeshConfigFlowManagerImpl(
* Stage 1: receiving device config, module config, channels, and metadata.
*
* [rawMyNodeInfo] arrives first (my_info packet); [metadata] may arrive shortly after. Both are consumed
* together by [buildMyNodeInfo] at Stage 1 completion.
* together by [buildMyNodeInfo] at Stage 1 completion. Some firmware/network paths can deliver NodeInfo before
* the Stage 2 request; keep those packets so the later node-list phase can still make progress.
*/
data class ReceivingConfig(val rawMyNodeInfo: ProtoMyNodeInfo, val metadata: DeviceMetadata? = null) :
HandshakeState()
data class ReceivingConfig(
val rawMyNodeInfo: ProtoMyNodeInfo,
val metadata: DeviceMetadata? = null,
val earlyNodes: List<NodeInfo> = emptyList(),
) : HandshakeState()
/**
* Stage 2: receiving node-info packets from the firmware.
@@ -95,13 +101,18 @@ class MeshConfigFlowManagerImpl(
data class Complete(val myNodeInfo: SharedMyNodeInfo) : HandshakeState()
}
private var handshakeState: HandshakeState = HandshakeState.Idle
private val handshakeState = atomic<HandshakeState>(HandshakeState.Idle)
override val newNodeCount: Int
get() = (handshakeState as? HandshakeState.ReceivingNodeInfo)?.nodes?.size ?: 0
get() =
when (val state = handshakeState.value) {
is HandshakeState.ReceivingConfig -> state.earlyNodes.size
is HandshakeState.ReceivingNodeInfo -> state.nodes.size
else -> 0
}
override fun handleConfigComplete(configCompleteId: Int) {
val state = handshakeState
val state = handshakeState.value
when (configCompleteId) {
HandshakeConstants.CONFIG_NONCE -> {
if (state !is HandshakeState.ReceivingConfig) {
@@ -129,7 +140,7 @@ class MeshConfigFlowManagerImpl(
val finalizedInfo = buildMyNodeInfo(state.rawMyNodeInfo, state.metadata)
if (finalizedInfo == null) {
Logger.w { "Stage 1 failed: could not build MyNodeInfo, retrying Stage 1" }
handshakeState = HandshakeState.Idle
handshakeState.value = HandshakeState.Idle
scope.handledLaunch {
delay(wantConfigDelay)
connectionManager.value.startConfigOnly()
@@ -149,9 +160,10 @@ class MeshConfigFlowManagerImpl(
}
}
handshakeState = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo)
Logger.i { "myNodeInfo committed (nodeNum=${finalizedInfo.myNodeNum})" }
handshakeState.value = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo, nodes = state.earlyNodes)
Logger.i { "myNodeInfo committed" }
connectionManager.value.onRadioConfigLoaded()
serviceStateWriter.setConnectionProgress("Loading node list")
scope.handledLaunch {
delay(wantConfigDelay)
@@ -160,6 +172,7 @@ class MeshConfigFlowManagerImpl(
Logger.i { "Requesting NodeInfo (Stage 2)" }
connectionManager.value.startNodeInfoOnly()
}
connectionManager.value.onHandshakeProgress()
}
private fun handleNodeInfoComplete(state: HandshakeState.ReceivingNodeInfo) {
@@ -171,7 +184,15 @@ class MeshConfigFlowManagerImpl(
// The async work below (DB writes, broadcasts) proceeds without the guard.
// Because nodes is now immutable, no snapshot is needed — state.nodes IS the snapshot.
// Any stall-guard retry that re-enters handleNodeInfo will see Complete state and be ignored.
handshakeState = HandshakeState.Complete(myNodeInfo = info)
handshakeState.value = HandshakeState.Complete(myNodeInfo = info)
// Cancel the transport-aware fast-recovery watchdog SYNCHRONOUSLY, before the async DB
// install work below is launched. The firmware handshake has already completed at this
// point (NODE_INFO_NONCE received); a slow Room commit on a large mesh would otherwise
// falsely trip the 12s fast-recovery timeout before onNodeDbReady() gets a chance to
// cancel it. onNodeDbReady() still performs the same cancel as part of its larger post-
// NodeDB side-effect set, but it runs only after the DB install block finishes.
connectionManager.value.onHandshakeComplete()
val entities =
state.nodes.mapNotNull { nodeInfo ->
@@ -184,20 +205,39 @@ class MeshConfigFlowManagerImpl(
}
scope.handledLaunch {
nodeRepository.installConfig(info, entities)
analytics.setDeviceAttributes(info.firmwareVersion ?: "unknown", info.model ?: "unknown")
try {
nodeRepository.installConfig(info, entities)
} catch (e: CancellationException) {
throw e
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.e(e) { "Post-handshake NodeDB install failed; restarting transport to recover" }
nodeManager.setNodeDbReady(false)
nodeManager.setAllowNodeDbWrites(false)
connectionManager.value.recoverPostHandshakeFailure()
return@handledLaunch
}
nodeManager.setNodeDbReady(true)
nodeManager.setAllowNodeDbWrites(true)
serviceStateWriter.setConnectionState(ConnectionState.Connected)
connectionManager.value.onNodeDbReady()
safeCatching { analytics.setDeviceAttributes(info.firmwareVersion ?: "unknown", info.model ?: "unknown") }
.onFailure { e -> Logger.w(e) { "Failed to set post-handshake analytics attributes" } }
safeCatching { connectionManager.value.onNodeDbReady() }
.onFailure { e -> Logger.e(e) { "Post-connected onNodeDbReady side effects failed" } }
}
// Note: onHandshakeProgress() is intentionally NOT called here. By this point the
// handshake has reached HandshakeState.Complete and the synchronous onHandshakeComplete()
// call above has already cancelled the watchdog. Re-arming via onHandshakeProgress()
// would be both semantically wrong and wasted work. The remaining onHandshakeProgress
// sites cover all genuine progress.
}
override fun handleMyInfo(myInfo: ProtoMyNodeInfo) {
Logger.i { "MyNodeInfo received: ${myInfo.my_node_num}" }
Logger.i { "MyNodeInfo received" }
// Transition to Stage 1, discarding any stale data from a prior interrupted handshake.
handshakeState = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo)
handshakeState.value = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo)
nodeManager.setMyNodeNum(myInfo.my_node_num)
nodeManager.setFirmwareEdition(myInfo.firmware_edition)
applyEventFirmwareNotificationDefaults(myInfo.firmware_edition)
@@ -220,35 +260,47 @@ class MeshConfigFlowManagerImpl(
radioConfigRepository.clearDeviceUIConfig()
radioConfigRepository.clearFileManifest()
}
connectionManager.value.onHandshakeProgress()
}
override fun handleLocalMetadata(metadata: DeviceMetadata) {
Logger.i { "Local Metadata received: ${metadata.firmware_version}" }
val state = handshakeState
val state = handshakeState.value
if (state is HandshakeState.ReceivingConfig) {
handshakeState = state.copy(metadata = metadata)
handshakeState.value = state.copy(metadata = metadata)
// Persist the metadata immediately — buildMyNodeInfo() reads it at Stage 1 complete,
// but the DB write does not need to wait until then.
if (metadata != DeviceMetadata()) {
scope.handledLaunch { nodeRepository.insertMetadata(state.rawMyNodeInfo.my_node_num, metadata) }
}
connectionManager.value.onHandshakeProgress()
} else {
Logger.w { "Ignoring metadata outside Stage 1 (state=$state)" }
}
}
override fun handleNodeInfo(info: NodeInfo) {
val state = handshakeState
if (state is HandshakeState.ReceivingNodeInfo) {
handshakeState = state.copy(nodes = state.nodes + info)
} else {
Logger.w { "Ignoring NodeInfo outside Stage 2 (state=$state)" }
val state = handshakeState.value
when (state) {
is HandshakeState.ReceivingConfig -> {
Logger.d { "Buffering NodeInfo received during Stage 1" }
handshakeState.value = state.copy(earlyNodes = state.earlyNodes.withNodeInfo(info))
connectionManager.value.onHandshakeProgress()
}
is HandshakeState.ReceivingNodeInfo -> {
handshakeState.value = state.copy(nodes = state.nodes.withNodeInfo(info))
connectionManager.value.onHandshakeProgress()
}
else -> Logger.w { "Ignoring NodeInfo outside active handshake (state=$state)" }
}
}
override fun handleFileInfo(info: FileInfo) {
Logger.d { "FileInfo received: ${info.file_name} (${info.size_bytes} bytes)" }
scope.handledLaunch { radioConfigRepository.addFileInfo(info) }
connectionManager.value.onHandshakeProgress()
}
override fun triggerWantConfig() {
@@ -305,3 +357,12 @@ class MeshConfigFlowManagerImpl(
}
}
}
private fun List<NodeInfo>.withNodeInfo(info: NodeInfo): List<NodeInfo> {
val index = indexOfFirst { it.num == info.num }
return if (index >= 0) {
toMutableList().apply { this[index] = info }
} else {
this + info
}
}

View File

@@ -26,6 +26,7 @@ import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceStateWriter
@@ -41,6 +42,7 @@ class MeshConfigHandlerImpl(
private val radioConfigRepository: RadioConfigRepository,
private val serviceStateWriter: ServiceStateWriter,
private val nodeManager: NodeManager,
private val connectionManager: Lazy<MeshConnectionManager>,
@Named("ServiceScope") private val scope: CoroutineScope,
) : MeshConfigHandler {
@@ -59,6 +61,7 @@ class MeshConfigHandlerImpl(
Logger.d { "Device config received: ${config.summarize()}" }
scope.handledLaunch { radioConfigRepository.setLocalConfig(config) }
serviceStateWriter.setConnectionProgress("Device config received")
connectionManager.value.onHandshakeProgress()
}
override fun handleModuleConfig(config: ModuleConfig) {
@@ -69,6 +72,7 @@ class MeshConfigHandlerImpl(
config.statusmessage?.let { sm ->
nodeManager.myNodeNum.value?.let { num -> nodeManager.updateNodeStatus(num, sm.node_status) }
}
connectionManager.value.onHandshakeProgress()
}
override fun handleChannel(channel: Channel) {
@@ -83,11 +87,16 @@ class MeshConfigHandlerImpl(
} else {
serviceStateWriter.setConnectionProgress("Channels (${index + 1})")
}
connectionManager.value.onHandshakeProgress()
}
override fun handleDeviceUIConfig(config: DeviceUIConfig) {
Logger.d { "DeviceUI config received" }
scope.handledLaunch { radioConfigRepository.setDeviceUIConfig(config) }
// deviceuiConfig arrives during Stage 1 immediately after my_info. It proves the transport
// is alive, so surface it as handshake progress — without this, a long gap before the next
// meaningful packet could falsely trip the fast-path watchdog on TCP/USB.
connectionManager.value.onHandshakeProgress()
}
}

View File

@@ -17,6 +17,8 @@
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@@ -32,6 +34,7 @@ import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.common.util.safeCatchingAll
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DeviceType
import org.meshtastic.core.model.TelemetryType
@@ -55,6 +58,9 @@ import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.SessionManager
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.core.resources.Res
import org.meshtastic.core.resources.error_recovery_exhausted
import org.meshtastic.core.resources.getStringSuspend
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Config
import org.meshtastic.proto.Telemetry
@@ -96,7 +102,30 @@ class MeshConnectionManagerImpl(
private var preHandshakeJob: Job? = null
private var sleepTimeout: Job? = null
private var locationRequestsJob: Job? = null
private var handshakeTimeout: Job? = null
private val handshakeTimeout = atomic<Job?>(null)
/**
* One-way latch set by [onHandshakeComplete] and cleared at the start of each fresh handshake in [handleConnected].
*
* Prevents late-arriving handshake-progress packets (e.g. a FileInfo that lands between NODE_INFO_NONCE and the
* completion of the async Room DB install) from re-arming the fast watchdog that [onHandshakeComplete] already
* cancelled. Without this latch, those late packets would re-introduce the slow-DB false-trip on large meshes that
* [onHandshakeComplete] was added to prevent: the app state is still Connecting during that window, so the existing
* Connecting-state guard inside [onHandshakeProgress] is insufficient on its own.
*/
private val handshakeCompleteLatch = atomic(false)
/**
* Consecutive handshake-recovery failure count for [runSiblingHandshakeRecovery].
*
* Incremented atomically on each recovery attempt. Reset to 0 by [onHandshakeComplete] (a successful handshake
* breaks the failure streak) and also reset when the cap is reached and a sticky error is surfaced (so a manual
* user retry starts fresh). When this reaches [MAX_CONSECUTIVE_RECOVERY_FAILURES], recovery stops and the sticky
* error is surfaced instead of silently retrying indefinitely.
*/
private val consecutiveRecoveryFailures = atomic(0)
private var connectTimeMsec = 0L
private var connectionRestored = false
@@ -163,33 +192,40 @@ class MeshConnectionManagerImpl(
onConnectionChanged(effectiveState)
}
private suspend fun onConnectionChanged(c: ConnectionState) = connectionMutex.withLock {
val current = serviceRepository.connectionState.value
if (current == c) return@withLock
private suspend fun onConnectionChanged(c: ConnectionState, fromState: ConnectionState? = null): Boolean =
connectionMutex.withLock {
val current = serviceRepository.connectionState.value
if (fromState != null && current != fromState) {
Logger.d { "Skipping connection transition $current -> $c, expected current state $fromState" }
return@withLock false
}
if (current == c) return@withLock false
// If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting)
if (c is ConnectionState.Connected && current is ConnectionState.Connecting) {
Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" }
return@withLock
// If the transport reports 'Connected', but we are already in the middle of a handshake (Connecting)
if (c is ConnectionState.Connected && current is ConnectionState.Connecting) {
Logger.d { "Ignoring redundant transport connection signal while handshake is in progress" }
return@withLock false
}
Logger.i { "onConnectionChanged: $current -> $c" }
sleepTimeout?.cancel()
sleepTimeout = null
preHandshakeJob?.cancel()
preHandshakeJob = null
// Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot
// orphan a job in the gap between cancel and reassign.
handshakeTimeout.getAndSet(null)?.cancel()
when (c) {
is ConnectionState.Connecting -> serviceRepository.setConnectionState(ConnectionState.Connecting)
is ConnectionState.Connected -> handleConnected()
is ConnectionState.DeviceSleep -> handleDeviceSleep()
is ConnectionState.Disconnected -> handleDisconnected()
}
true
}
Logger.i { "onConnectionChanged: $current -> $c" }
sleepTimeout?.cancel()
sleepTimeout = null
preHandshakeJob?.cancel()
preHandshakeJob = null
handshakeTimeout?.cancel()
handshakeTimeout = null
when (c) {
is ConnectionState.Connecting -> serviceRepository.setConnectionState(ConnectionState.Connecting)
is ConnectionState.Connected -> handleConnected()
is ConnectionState.DeviceSleep -> handleDeviceSleep()
is ConnectionState.Disconnected -> handleDisconnected()
}
}
private fun handleConnected() {
// Track whether this connection was restored from device sleep (vs. a fresh connect),
// matching Apple's "connectionRestored" attribute for cross-platform DataDog parity.
@@ -199,7 +235,13 @@ class MeshConnectionManagerImpl(
serviceRepository.setConnectionState(ConnectionState.Connecting)
}
connectTimeMsec = nowMillis
// A fresh handshake is starting — clear the completion latch so progress signals during
// this handshake can re-arm the fast watchdog. The latch is set by onHandshakeComplete()
// and only matters in the window between Stage 2 completion and the subsequent Connected
// transition; without this reset, a recovery-restarted handshake would have its progress
// signals ignored (latch left set from the prior failed cycle). This covers both initial
// user-initiated connects and transport-restart recovery siblings.
handshakeCompleteLatch.value = false
// Send a wake-up heartbeat before the config request. The firmware may be in a
// power-saving state where the NimBLE callback context needs warming up. The 100ms
// delay ensures the heartbeat BLE write is enqueued before the want_config_id
@@ -213,27 +255,156 @@ class MeshConnectionManagerImpl(
}
}
private fun startHandshakeStallGuard(stage: Int, timeout: Duration, action: () -> Unit) {
handshakeTimeout?.cancel()
handshakeTimeout =
scope.handledLaunch {
delay(timeout)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
// Attempt one retry. Note: the firmware silently drops identical consecutive
// writes (per-connection dedup). If the first want_config_id was received and
// the stall is on our side, the retry will be dropped and the reconnect below
// will trigger instead — which is the right recovery in that case.
Logger.w {
"Handshake stall detected at Stage $stage — retrying, then reconnecting if still stalled"
private fun startHandshakeStallGuard(stage: Int, timeout: Duration) {
val fastTransport = isFastRecoveryTransport()
// On TCP/USB the firmware handshake completes in roughly 1s when healthy (logs show),
// while a wedged socket takes the full ~30s transport read timeout without any further
// progress. The aggressive 12s fast timeout recovers a stuck session quickly; BLE keeps
// the original generous budget because its GATT latency is high and variable.
val effectiveTimeout = if (fastTransport) FAST_HANDSHAKE_TIMEOUT else timeout
val transportLabel = if (fastTransport) "fast transport" else "BLE"
// Collapse cancel+reassign into one atomic swap so a concurrent re-arm cannot orphan a
// job in the gap between cancel and reassign.
handshakeTimeout
.getAndSet(
scope.handledLaunch {
delay(effectiveTimeout)
if (serviceRepository.connectionState.value !is ConnectionState.Connecting) {
return@handledLaunch
}
action()
delay(HANDSHAKE_RETRY_TIMEOUT)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
Logger.e { "Handshake still stalled after retry, forcing reconnect" }
onConnectionChanged(ConnectionState.Disconnected)
// A clean transport restart is the ONLY safe stall recovery on every transport.
// The previous BLE branch re-sent want_config mid-session via action() here;
// that re-send is now deliberately removed because firmware's handleStartConfig()
// has no in-flight guard, so a second want_config on the same session re-enters
// it and crashes the firmware (reproduced on T-Beam v2.7.25.104df5f in QA). The
// firmware per-write dedup that was supposed to drop the retry is single-slot
// (memcmp vs the previous write only), and interleaved heartbeats mean the re-sent
// nonce is not byte-identical to the prior write — so it slips past dedup and
// re-enters handleStartConfig(). A clean transport restart creates a fresh
// session and re-enters the handshake naturally, which is both safer and more
// deterministic than a same-session retry.
Logger.e {
"Handshake stall detected at Stage $stage on $transportLabel" +
"requesting forced transport restart"
}
runSiblingHandshakeRecovery()
},
)
?.cancel()
}
/**
* Launches the deterministic two-phase stall-recovery sibling used by [startHandshakeStallGuard] on every
* transport.
*
* Phase 1 flips the app-level state from Connecting to Disconnected first, guarded by the connection mutex so a
* just-completed handshake cannot be torn down after winning the race. Phase 2 then calls
* [RadioInterfaceService.restartTransport], whose emissions (DeviceSleep → Connected) now arrive from app-level
* Disconnected, bypass the redundant-Connecting guard in [onConnectionChanged], and re-enter [handleConnected] to
* restart the handshake cleanly.
*
* We MUST NOT call [onConnectionChanged] from the [handshakeTimeout] coroutine after launching the sibling:
* [onConnectionChanged] cancels handshakeTimeout (the very job running this code), and any work chained after the
* launch is not guaranteed to run. We MUST ALSO NOT leave the explicit Disconnected call in this coroutine after
* the sibling launch — otherwise the sibling's restart emissions (DeviceSleep, then Connected) can arrive while the
* app-level state is still Connecting, causing [onConnectionChanged]'s redundant-Connected-while-Connecting guard
* to ignore the fresh Connected emission. That leaves the app Disconnected while transport is Connected — the same
* split-brain this restart path is meant to break.
*
* By the time the sibling runs, handshakeTimeout has already completed naturally (it launched the sibling and
* returned), so the cancellation [onConnectionChanged] would attempt is a no-op on an already-completed job — and
* because the sibling is parented to `scope`, not to handshakeTimeout, it survives independently.
*
* Concurrent-recovery protection is layered across three independent mechanisms, each sufficient on its own:
* 1. The [connectionMutex] serializes the [onConnectionChanged]`(Disconnected, fromState=Connecting)` call inside
* the sibling. If a concurrent sibling already transitioned the app state to Disconnected, a second caller's
* `fromState=Connecting` precondition fails and [onConnectionChanged] returns `false`, so the `if (disconnected
* && ...)` gate skips `restartTransport()`.
* 2. The transport-level `isRestarting` CAS inside `SharedRadioInterfaceService.restartTransport()` provides
* authoritative dedup at the layer that actually tears down and re-creates the transport, independent of the
* app-level state machine above it.
* 3. The atomic [consecutiveRecoveryFailures]`getAndIncrement()` ensures each concurrent caller observes a unique
* `priorFailures` slot, so the give-up decision (and the backoff duration) is race-free even if two siblings
* race the same stall window.
*
* Backoff and give-up cap: each call atomically increments [consecutiveRecoveryFailures]. Prior failures trigger an
* exponential [delay] (2 s base, doubling each failure, capped at 30 s) before the transport restart so a node that
* keeps crashing under handshake re-entry is not re-driven in a tight loop. After
* [MAX_CONSECUTIVE_RECOVERY_FAILURES] consecutive failed recoveries, recovery stops and a sticky user-facing error
* is surfaced (Disconnected + error message) instead of silently retrying indefinitely; the user must manually
* re-select the node to retry, which resets the streak. The streak is also reset to 0 by [onHandshakeComplete] (a
* successful handshake).
*/
private fun runSiblingHandshakeRecovery() {
// Atomically claim a failure slot. getAndIncrement guarantees each concurrent caller
// observes a unique priorFailures value, so the give-up decision is race-free.
val priorFailures = consecutiveRecoveryFailures.getAndIncrement()
scope.handledLaunch {
// After MAX_CONSECUTIVE_RECOVERY_FAILURES consecutive failed recoveries, stop retrying
// and surface a sticky error. A node that keeps failing the handshake is likely
// crashing firmware under re-entry (reproduced on T-Beam v2.7.25.104df5f), and
// re-driving it indefinitely only makes things worse. Reset the counter here so a
// manual retry from the user starts a fresh streak.
if (priorFailures >= MAX_CONSECUTIVE_RECOVERY_FAILURES) {
Logger.e {
"Handshake recovery exhausted after $MAX_CONSECUTIVE_RECOVERY_FAILURES consecutive " +
"failures; surfacing sticky error"
}
consecutiveRecoveryFailures.value = 0
serviceRepository.setConnectionProgress("")
onConnectionChanged(ConnectionState.Disconnected, fromState = ConnectionState.Connecting)
// safeCatchingAll swallows Skiko ExceptionInInitializerError on headless JVM tests
// where compose-resources can't load native libs. Production resolves the localized
// string normally; tests fall back to empty and setErrorMessage is still called.
val errorMessage =
safeCatchingAll { getStringSuspend(Res.string.error_recovery_exhausted) }.getOrDefault("")
serviceRepository.setErrorMessage(errorMessage, Severity.Error)
return@handledLaunch
}
// Exponential backoff before the next attempt: 2 s base, doubling for each prior
// failure, hard-capped at 30 s. Skipped on the first attempt (no prior failures).
// The delay is parented to `scope`, so it is cancelled cleanly if the user
// disconnects, navigates away, or the service shuts down (structured concurrency).
if (priorFailures > 0) {
val backoffSeconds =
(RECOVERY_BACKOFF_BASE_SECONDS shl (priorFailures - 1)).coerceAtMost(RECOVERY_BACKOFF_CAP_SECONDS)
Logger.w {
"Handshake recovery backoff: waiting ${backoffSeconds}s before retry (attempt ${priorFailures + 1})"
}
delay(backoffSeconds.seconds)
}
// Re-check state after backoff: the user may have disconnected during the delay.
if (serviceRepository.connectionState.value !is ConnectionState.Connecting) return@handledLaunch
// Surface the forced-recovery progress to the UI before the app-level Disconnected
// transition lands, so the user sees "Reconnecting…" rather than a stale
// "Loading node list" while the transport is being torn down and re-established.
//
// This progress is intentionally NOT cleared on the recovery's Disconnected window
// (i.e. NOT in handleDisconnected or this sibling). If recovery fails permanently,
// "Reconnecting…" may persist on the Disconnected screen until the user retries or
// navigates away. That leak is acceptable UX: the transport restart has been
// requested and may still be in flight (restartTransport is one-shot — if the fresh
// transport also fails, nothing here retries automatically; transport-level
// network-recovery listeners may independently re-bring-up the transport, and the
// user can manually retry). Clearing it here would race the deliberate UX signal:
// handleDisconnected runs synchronously after this call inside the same
// onConnectionChanged transition, so any clear there would clobber the signal before
// restartTransport runs. Stale progress is instead cleared at the first downstream
// setConnectionProgress call during the recovery handshake (e.g. "Device config
// received" or "Loading node list"), not by handleConnected itself — and at the
// ViewModel level the Connecting state already dominates progress (the CONNECTING
// status ignores the progress string), so the UI is correct regardless.
serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT)
val disconnected = onConnectionChanged(ConnectionState.Disconnected, fromState = ConnectionState.Connecting)
if (disconnected && serviceRepository.connectionState.value is ConnectionState.Disconnected) {
radioInterfaceService.restartTransport()
}
}
}
override fun recoverPostHandshakeFailure() {
Logger.w { "Recovering from post-handshake failure by restarting transport" }
runSiblingHandshakeRecovery()
}
private fun tearDownConnection() {
@@ -288,15 +459,13 @@ class MeshConnectionManagerImpl(
}
override fun startConfigOnly() {
val action = { packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.CONFIG_NONCE)) }
startHandshakeStallGuard(1, HANDSHAKE_TIMEOUT_STAGE1, action)
action()
startHandshakeStallGuard(1, HANDSHAKE_TIMEOUT_STAGE1)
packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.CONFIG_NONCE))
}
override fun startNodeInfoOnly() {
val action = { packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.NODE_INFO_NONCE)) }
startHandshakeStallGuard(2, HANDSHAKE_TIMEOUT_STAGE2, action)
action()
startHandshakeStallGuard(2, HANDSHAKE_TIMEOUT_STAGE2)
packetHandler.sendToRadio(ToRadio(want_config_id = HandshakeConstants.NODE_INFO_NONCE))
}
override fun onRadioConfigLoaded() {
@@ -313,11 +482,11 @@ class MeshConnectionManagerImpl(
}
override suspend fun onNodeDbReady() {
handshakeTimeout?.cancel()
handshakeTimeout = null
// Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot
// orphan a job in the gap between cancel and reassign.
handshakeTimeout.getAndSet(null)?.cancel()
val myNodeNum = nodeManager.myNodeNum.value ?: 0
// Set device time now that the full node picture is ready. Sending this during Stage 1
// (onRadioConfigLoaded) introduced GATT write contention with the Stage 2 node-info burst.
commandSender.sendAdmin(myNodeNum) { AdminMessage(set_time_only = nowSeconds.toInt()) }
@@ -352,6 +521,30 @@ class MeshConnectionManagerImpl(
commandSender.requestTelemetry(commandSender.generatePacketId(), myNodeNum, TelemetryType.DEVICE.ordinal)
}
/**
* Synchronously cancels the transport-aware handshake watchdog the moment Stage 2 completes (NODE_INFO_NONCE
* received). Does NOT replicate [onNodeDbReady]'s post-NodeDB side effects (analytics, MQTT start, history replay,
* telemetry requests) — those remain gated on [onNodeDbReady] at the end of the async DB install block.
*
* See [MeshConnectionManager.onHandshakeComplete] for the full rationale.
*/
override fun onHandshakeComplete() {
// Collapse cancel+clear into one atomic swap so a concurrent re-arm cannot orphan a
// job in the gap between cancel and reassign.
handshakeTimeout.getAndSet(null)?.cancel()
// Set the completion latch so late-arriving progress packets (e.g. FileInfo that lands
// between NODE_INFO_NONCE and the async Room DB install completion) cannot re-arm the
// fast watchdog we just cancelled. Without this latch, those late packets would
// re-introduce the slow-DB false-trip on large meshes that this method was added to
// prevent — the app state is still Connecting during that window, so the Connecting-
// state guard inside onHandshakeProgress() is insufficient on its own. The latch is
// cleared at the start of the next fresh handshake in handleConnected().
handshakeCompleteLatch.value = true
// A successful handshake breaks the recovery failure streak — reset the consecutive
// failure counter so the next stall starts from a fresh count.
consecutiveRecoveryFailures.value = 0
}
private fun reportConnection() {
val myNode = nodeManager.getMyNodeInfo()
val radioModel = DataPair(KEY_RADIO_MODEL, myNode?.model ?: "unknown")
@@ -378,6 +571,45 @@ class MeshConnectionManagerImpl(
updateStatusNotification(t)
}
/**
* True when the active transport is a TCP or USB serial connection — i.e. a transport whose firmware handshake
* reliably completes in roughly 1s when healthy and therefore benefits from aggressive silent-restart on stall.
* Uses the same [DeviceType.fromAddress] pattern as [reportConnection] for transport classification. BLE is
* excluded because its GATT latency budget is high and variable enough that the long-and-retry stall-guard budgets
* remain the right trade-off.
*/
private fun isFastRecoveryTransport(): Boolean =
radioInterfaceService.getDeviceAddress()?.let { DeviceType.fromAddress(it) } in FAST_RECOVERY_TYPES
override fun onHandshakeProgress() {
// Arm only inside the fast-recovery envelope, while Connecting, before the completion latch
// has fired. BLE retains the long stall-guard budget because its GATT latency is variable.
val shouldArmFastWatchdog =
isFastRecoveryTransport() &&
serviceRepository.connectionState.value is ConnectionState.Connecting &&
!handshakeCompleteLatch.value
if (!shouldArmFastWatchdog) return
// Atomic swap: cancel any in-flight fast watchdog and re-arm it with the full fast
// timeout in a single operation. This keeps the watchdog quiet as long as meaningful
// progress keeps arriving within the window, while a true stall still fires on
// schedule. getAndSet prevents a concurrent re-arm from orphaning a job in the gap
// between cancel and reassign.
handshakeTimeout
.getAndSet(
scope.handledLaunch {
delay(FAST_HANDSHAKE_TIMEOUT)
if (serviceRepository.connectionState.value !is ConnectionState.Connecting) {
return@handledLaunch
}
Logger.e {
"Fast-handshake watchdog expired after progress stalled — requesting forced transport restart"
}
runSiblingHandshakeRecovery()
},
)
?.cancel()
}
override fun updateStatusNotification(telemetry: Telemetry?) {
serviceNotifications.updateServiceStateNotification(
serviceRepository.connectionState.value,
@@ -386,6 +618,10 @@ class MeshConnectionManagerImpl(
}
companion object {
// Hoisted constant — used on every meaningful handshake packet via
// isFastRecoveryTransport(); avoids allocating a fresh Set per packet.
private val FAST_RECOVERY_TYPES = setOf(DeviceType.TCP, DeviceType.USB)
private const val DEVICE_SLEEP_TIMEOUT_SECONDS = 30
// Maximum time (in seconds) to wait for a sleeping device before declaring it
@@ -411,10 +647,54 @@ class MeshConnectionManagerImpl(
*/
private val HANDSHAKE_TIMEOUT_STAGE2 = 60.seconds
// Shorter window for the retry attempt: if the device genuinely didn't receive the
// first want_config_id the retry completes within a few seconds. Waiting another 30s
// before reconnecting just delays recovery unnecessarily.
private val HANDSHAKE_RETRY_TIMEOUT = 15.seconds
/**
* Transport-aware fast-recovery timeout for the handshake stall guard, applied only to TCP and USB serial
* transports.
*
* Production logs on TCP/USB show a healthy firmware handshake completes in roughly 1 second, while a wedged
* socket sits idle for the full transport-level read timeout (~30s) without any further progress. 12s sits
* comfortably above the healthy success envelope and well below the transport read timeout, so firing a silent
* [RadioInterfaceService.restartTransport] at 12s recovers a stuck TCP/USB session quickly without
* false-positiving on healthy connections.
*
* BLE is intentionally excluded — its GATT latency budget is variable enough that the existing
* [HANDSHAKE_TIMEOUT_STAGE1] (30s) and [HANDSHAKE_TIMEOUT_STAGE2] (60s) budgets remain the right trade-off.
* Both transports now recover exclusively via [runSiblingHandshakeRecovery] (transport restart); the previous
* BLE mid-session want_config retry has been removed (see [startHandshakeStallGuard] for the firmware crash
* rationale).
*/
private val FAST_HANDSHAKE_TIMEOUT = 12.seconds
/**
* Maximum consecutive handshake-recovery attempts before surfacing a sticky error to the user.
*
* Each call to [runSiblingHandshakeRecovery] counts as one attempt. After this many consecutive attempts fail
* to lead to a successful handshake, recovery stops and a sticky Disconnected + error state is surfaced,
* requiring the user to manually re-select the node to retry. This prevents indefinite re-driving of a node
* whose firmware is crashing under handshake re-entry (reproduced on T-Beam v2.7.25.104df5f).
*/
private const val MAX_CONSECUTIVE_RECOVERY_FAILURES = 3
/**
* Base delay (seconds) for the exponential backoff applied between consecutive recovery attempts in
* [runSiblingHandshakeRecovery]. Doubled for each prior failure and hard-capped at
* [RECOVERY_BACKOFF_CAP_SECONDS].
*
* 2 s base gives the firmware and transport a brief breather after a failed restart without adding perceptible
* latency to the first retry; doubling bounds the worst-case loop tightly under the 3-strike cap.
*/
private const val RECOVERY_BACKOFF_BASE_SECONDS = 2L
/**
* Hard cap (seconds) for the exponential backoff between recovery attempts.
*
* Keeps the per-attempt delay bounded even if [MAX_CONSECUTIVE_RECOVERY_FAILURES] is raised in the future. 30 s
* matches the transport-level read timeout envelope, so a backed-off retry never waits longer than the
* underlying transport would have taken to fail on its own.
*/
// Cap unreachable while MAX_CONSECUTIVE_RECOVERY_FAILURES=3 (max computed delay is 4s);
// retained as defense-in-depth if MAX is raised in the future.
private const val RECOVERY_BACKOFF_CAP_SECONDS = 30L
private const val EVENT_CONNECTED_SECONDS = "connected_seconds"
private const val EVENT_MESH_DISCONNECT = "mesh_disconnect"

View File

@@ -20,6 +20,7 @@ import dev.mokkery.MockMode
import dev.mokkery.answering.calls
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.everySuspend
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
@@ -29,9 +30,12 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import okio.ByteString.Companion.encodeUtf8
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.HandshakeConstants
import org.meshtastic.core.repository.MeshConnectionManager
@@ -55,6 +59,13 @@ import org.meshtastic.proto.MyNodeInfo as ProtoMyNodeInfo
@OptIn(ExperimentalCoroutinesApi::class)
class MeshConfigFlowManagerImplTest {
private companion object {
// Production issues two sequential delay(wantConfigDelay) calls (100ms each = 200ms)
// plus scheduler slack for the inter-stage heartbeat and startNodeInfoOnly(); bump in
// lockstep with wantConfigDelay if it changes.
const val STAGE_TRANSITION_ADVANCE_MS = 250L
}
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val connectionManager = mock<MeshConnectionManager>(MockMode.autofill)
private val nodeRepository = mock<NodeRepository>(MockMode.autofill)
@@ -173,7 +184,8 @@ class MeshConfigFlowManagerImplTest {
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
verify { connectionManager.onRadioConfigLoaded() }
verify { connectionManager.startNodeInfoOnly() }
@@ -194,7 +206,8 @@ class MeshConfigFlowManagerImplTest {
sentPackets.clear() // Clear any packets from prior phases
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
val heartbeats = sentPackets.filter { it.heartbeat != null }
assertEquals(1, heartbeats.size, "Expected exactly one inter-stage heartbeat")
@@ -215,7 +228,8 @@ class MeshConfigFlowManagerImplTest {
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// Handshake should still progress despite old firmware
verify { connectionManager.onRadioConfigLoaded() }
@@ -234,6 +248,20 @@ class MeshConfigFlowManagerImplTest {
verify { connectionManager.onRadioConfigLoaded() }
}
@Test
fun `Stage 1 complete updates progress for node list loading`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
verify { serviceRepository.setConnectionProgress("Loading node list") }
}
@Test
fun `Stage 1 complete id ignored when not in ReceivingConfig state`() = testScope.runTest {
// State is Idle — should be a no-op
@@ -250,11 +278,13 @@ class MeshConfigFlowManagerImplTest {
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// Now in ReceivingNodeInfo — a second Stage 1 complete should be ignored
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
}
// ---------- handleNodeInfo ----------
@@ -267,7 +297,8 @@ class MeshConfigFlowManagerImplTest {
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// Now in ReceivingNodeInfo
manager.handleNodeInfo(NodeInfo(num = 100))
@@ -276,6 +307,29 @@ class MeshConfigFlowManagerImplTest {
assertEquals(2, manager.newNodeCount)
}
@Test
fun `handleNodeInfo buffers nodes received during Stage 1`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleNodeInfo(NodeInfo(num = 100))
assertEquals(1, manager.newNodeCount)
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { nodeManager.installNodeInfo(NodeInfo(num = 100)) }
verifySuspend { connectionManager.onNodeDbReady() }
}
@Test
fun `handleNodeInfo ignored outside Stage 2`() = testScope.runTest {
// State is Idle
@@ -297,7 +351,8 @@ class MeshConfigFlowManagerImplTest {
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleNodeInfo(NodeInfo(num = 100))
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
@@ -323,7 +378,8 @@ class MeshConfigFlowManagerImplTest {
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// No handleNodeInfo calls — empty node list
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
@@ -333,6 +389,54 @@ class MeshConfigFlowManagerImplTest {
verifySuspend { connectionManager.onNodeDbReady() }
}
@Test
fun `Stage 2 complete keeps connected when post NodeDB side effects fail`() = testScope.runTest {
every { analytics.setDeviceAttributes(any(), any()) } calls { throw IllegalStateException("analytics") }
everySuspend { connectionManager.onNodeDbReady() } calls { throw IllegalStateException("side effects") }
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { nodeManager.setNodeDbReady(true) }
verify { nodeManager.setAllowNodeDbWrites(true) }
verify { serviceRepository.setConnectionState(ConnectionState.Connected) }
verifySuspend { connectionManager.onNodeDbReady() }
verify(mode = VerifyMode.not) { nodeManager.setNodeDbReady(false) }
verify(mode = VerifyMode.not) { nodeManager.setAllowNodeDbWrites(false) }
verify(mode = VerifyMode.not) { connectionManager.recoverPostHandshakeFailure() }
}
@Test
fun `Stage 2 complete disconnects when NodeDB install fails`() = testScope.runTest {
everySuspend { nodeRepository.installConfig(any(), any()) } calls { throw IllegalStateException("room") }
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { connectionManager.onHandshakeComplete() }
verify { nodeManager.setNodeDbReady(false) }
verify { nodeManager.setAllowNodeDbWrites(false) }
verify { connectionManager.recoverPostHandshakeFailure() }
verify(mode = VerifyMode.not) { nodeManager.setNodeDbReady(true) }
verifySuspend(mode = VerifyMode.not) { connectionManager.onNodeDbReady() }
}
// ---------- Unknown config_complete_id ----------
@Test
@@ -386,7 +490,8 @@ class MeshConfigFlowManagerImplTest {
// Stage 1 complete
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
verify { connectionManager.onRadioConfigLoaded() }
// Receive NodeInfo during Stage 2
@@ -468,4 +573,202 @@ class MeshConfigFlowManagerImplTest {
verify(mode = VerifyMode.not) { notificationPrefs.setNodeEventsEnabled(any()) }
verify(mode = VerifyMode.not) { notificationPrefs.setNodeEventsAutoDisabledForEvent(any()) }
}
// ---------- onHandshakeProgress ----------
@Test
fun `handleMyInfo calls onHandshakeProgress`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleLocalMetadata in ReceivingConfig calls onHandshakeProgress`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
// handleMyInfo fires one call; handleLocalMetadata adds exactly one more in ReceivingConfig.
verify(mode = VerifyMode.exactly(2)) { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleLocalMetadata outside ReceivingConfig does not call onHandshakeProgress`() = testScope.runTest {
// State is Idle — metadata is ignored and must not reset the watchdog.
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
verify(mode = VerifyMode.not) { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleFileInfo calls onHandshakeProgress`() = testScope.runTest {
val fileInfo = FileInfo(file_name = "firmware.bin", size_bytes = 1024)
manager.handleFileInfo(fileInfo)
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleNodeInfo during Stage 1 calls onHandshakeProgress`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleNodeInfo(NodeInfo(num = 100))
// handleMyInfo fires one call; handleNodeInfo in ReceivingConfig adds exactly one more.
verify(mode = VerifyMode.exactly(2)) { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleNodeInfo during Stage 2 calls onHandshakeProgress`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// Now in ReceivingNodeInfo — a NodeInfo packet must reset the watchdog.
manager.handleNodeInfo(NodeInfo(num = 100))
// Prior calls: handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1) = 3.
// handleNodeInfo adds exactly one more.
verify(mode = VerifyMode.exactly(4)) { connectionManager.onHandshakeProgress() }
}
@Test
fun `Stage 1 complete calls onHandshakeProgress`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
// handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1) = 3.
verify(mode = VerifyMode.exactly(3)) { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleNodeInfo outside active handshake does not call onHandshakeProgress`() = testScope.runTest {
// State is Idle — NodeInfo is ignored and must not reset the watchdog.
manager.handleNodeInfo(NodeInfo(num = 999))
verify(mode = VerifyMode.not) { connectionManager.onHandshakeProgress() }
}
/**
* Regression guard: a full handshake must fire [MeshConnectionManager.onHandshakeProgress] exactly for meaningful
* handshake events only. Packet types not meaningful to the handshake — queueStatus, mqttClientProxyMessage,
* xmodemPacket, clientNotification, deviceuiConfig, and rebooted — are routed to sibling handlers
* (PacketHandlerImpl, MqttManagerImpl, FromRadioPacketHandlerImpl, MeshConfigHandlerImpl) and must never reset the
* watchdog from this manager. This test pins the exact count so any accidental wiring surfaces as a failure.
*/
@Test
fun `full handshake fires onHandshakeProgress exactly for meaningful events only`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleNodeInfo(NodeInfo(num = 100))
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
// handleMyInfo(1) + handleLocalMetadata(1) + handleConfigOnlyComplete(1)
// + handleNodeInfo(1) = 4. handleNodeInfoComplete intentionally does NOT call
// onHandshakeProgress: by that point the handshake is Complete and the synchronous
// onHandshakeComplete() call (verified in a separate test) cancels the watchdog.
verify(mode = VerifyMode.exactly(4)) { connectionManager.onHandshakeProgress() }
}
/**
* Regression guard for the Stage 2 watchdog-cancellation race fixed by adding
* [MeshConnectionManager.onHandshakeComplete]. Stage 2 completion must synchronously fire the terminal callback
* exactly once so the transport-aware fast-recovery watchdog is cancelled before any async DB install work begins.
*/
@Test
fun `Stage 2 complete fires onHandshakeComplete exactly once`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleNodeInfo(NodeInfo(num = 100))
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify(mode = VerifyMode.exactly(1)) { connectionManager.onHandshakeComplete() }
}
/**
* Regression guard for the actual Stage 2 watchdog-cancellation race: the synchronous
* [MeshConnectionManager.onHandshakeComplete] call MUST land before the asynchronous NodeDB install work begins, so
* that a slow Room commit on a large mesh cannot trip the 12 s fast-recovery timeout after the firmware handshake
* has already succeeded.
*
* Under [StandardTestDispatcher] the async DB install coroutine does not run until the test dispatcher is advanced,
* so we can assert the call ordering deterministically without any suspension trick on
* [NodeRepository.installConfig].
*/
@Test
fun `Stage 2 complete cancels watchdog synchronously before async DB install work`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
// Record the order in which the synchronous watchdog-cancellation callback and the
// async DB install entry point are observed. The order is the invariant under test.
val callOrder = mutableListOf<String>()
every { connectionManager.onHandshakeComplete() } calls { callOrder.add("handshakeComplete") }
everySuspend { nodeRepository.installConfig(any(), any()) } calls { callOrder.add("installConfig") }
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceTimeBy(STAGE_TRANSITION_ADVANCE_MS)
runCurrent()
manager.handleNodeInfo(NodeInfo(num = 100))
// Drive Stage 2 complete. handleNodeInfoComplete runs synchronously: state becomes
// Complete, onHandshakeComplete() fires (cancelling the watchdog), then the async DB
// install block is launched but not yet executed under StandardTestDispatcher.
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
// Synchronous post-condition: the watchdog was cancelled BEFORE the async DB install
// block was scheduled to run. If this ordering invariant ever regresses, a slow Room
// commit on a large mesh would falsely trip the 12s fast-recovery timeout.
assertEquals(
listOf("handshakeComplete"),
callOrder,
"onHandshakeComplete() must fire synchronously at Stage 2 complete, BEFORE any " +
"async DB install work begins — this is the race this test guards against",
)
// Now let the async DB install block run.
advanceUntilIdle()
assertEquals(
listOf("handshakeComplete", "installConfig"),
callOrder,
"installConfig must run AFTER onHandshakeComplete, ensuring the watchdog is " +
"already cancelled before any DB work could trip the fast-recovery timeout",
)
}
}

View File

@@ -30,6 +30,7 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.MyNodeInfo
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceRepository
@@ -49,6 +50,7 @@ class MeshConfigHandlerImplTest {
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val serviceRepository = mock<ServiceRepository>(MockMode.autofill)
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val connectionManager = mock<MeshConnectionManager>(MockMode.autofill)
private val localConfigFlow = MutableStateFlow(LocalConfig())
private val moduleConfigFlow = MutableStateFlow(LocalModuleConfig())
@@ -67,6 +69,7 @@ class MeshConfigHandlerImplTest {
radioConfigRepository = radioConfigRepository,
serviceStateWriter = serviceRepository,
nodeManager = nodeManager,
connectionManager = lazy { this.connectionManager },
scope = scope,
)
@@ -228,4 +231,47 @@ class MeshConfigHandlerImplTest {
verifySuspend { radioConfigRepository.setDeviceUIConfig(config) }
}
// ---------- onHandshakeProgress ----------
@Test
fun `handleDeviceConfig calls onHandshakeProgress`() = runTest(testDispatcher) {
handler = createHandler(backgroundScope)
val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
handler.handleDeviceConfig(config)
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleModuleConfig calls onHandshakeProgress`() = runTest(testDispatcher) {
handler = createHandler(backgroundScope)
val config = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
handler.handleModuleConfig(config)
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleChannel calls onHandshakeProgress`() = runTest(testDispatcher) {
handler = createHandler(backgroundScope)
every { nodeManager.getMyNodeInfo() } returns null
val channel = Channel(index = 0)
handler.handleChannel(channel)
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
@Test
fun `handleDeviceUIConfig calls onHandshakeProgress`() = runTest(testDispatcher) {
handler = createHandler(backgroundScope)
handler.handleDeviceUIConfig(DeviceUIConfig())
advanceUntilIdle()
verify { connectionManager.onHandshakeProgress() }
}
}

View File

@@ -16,6 +16,7 @@
*/
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Severity
import dev.mokkery.MockMode
import dev.mokkery.answering.calls
import dev.mokkery.answering.returns
@@ -24,14 +25,17 @@ import dev.mokkery.everySuspend
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verify.VerifyMode.Companion.exactly
import dev.mokkery.verifySuspend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DataPacket
@@ -61,27 +65,28 @@ import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
class MeshConnectionManagerImplTest {
private val radioInterfaceService = mock<RadioInterfaceService>(MockMode.autofill)
private val serviceRepository = mock<ServiceRepository>(MockMode.autofill)
private lateinit var radioInterfaceService: RadioInterfaceService
private lateinit var serviceRepository: ServiceRepository
private val serviceNotifications = mock<MeshNotificationManager>(MockMode.autofill)
private val uiPrefs = mock<UiPrefs>(MockMode.autofill)
private val packetHandler = mock<PacketHandler>(MockMode.autofill)
private val nodeRepository = FakeNodeRepository()
private val locationManager = mock<MeshLocationManager>(MockMode.autofill)
private val mqttManager = mock<MqttManager>(MockMode.autofill)
private val historyManager = mock<HistoryManager>(MockMode.autofill)
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val commandSender = mock<CommandSender>(MockMode.autofill)
private val sessionManager = mock<SessionManager>(MockMode.autofill)
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val analytics = mock<PlatformAnalytics>(MockMode.autofill)
private val packetRepository = mock<PacketRepository>(MockMode.autofill)
private val workerManager = mock<MeshWorkerManager>(MockMode.autofill)
private val appWidgetUpdater = mock<AppWidgetUpdater>(MockMode.autofill)
private lateinit var serviceNotifications: MeshNotificationManager
private lateinit var uiPrefs: UiPrefs
private lateinit var packetHandler: PacketHandler
private lateinit var nodeRepository: FakeNodeRepository
private lateinit var locationManager: MeshLocationManager
private lateinit var mqttManager: MqttManager
private lateinit var historyManager: HistoryManager
private lateinit var radioConfigRepository: RadioConfigRepository
private lateinit var commandSender: CommandSender
private lateinit var sessionManager: SessionManager
private lateinit var nodeManager: NodeManager
private lateinit var analytics: PlatformAnalytics
private lateinit var packetRepository: PacketRepository
private lateinit var workerManager: MeshWorkerManager
private lateinit var appWidgetUpdater: AppWidgetUpdater
private val dataPacket = DataPacket(id = 456, time = 0L, to = "0", from = "0", bytes = null, dataType = 0)
@@ -90,12 +95,36 @@ class MeshConnectionManagerImplTest {
private val localConfigFlow = MutableStateFlow(LocalConfig())
private val moduleConfigFlow = MutableStateFlow(LocalModuleConfig())
private val testDispatcher = UnconfinedTestDispatcher()
private lateinit var testDispatcher: TestDispatcher
private lateinit var manager: MeshConnectionManagerImpl
@BeforeTest
fun setUp() {
radioInterfaceService = mock(MockMode.autofill)
serviceRepository = mock(MockMode.autofill)
serviceNotifications = mock(MockMode.autofill)
uiPrefs = mock(MockMode.autofill)
packetHandler = mock(MockMode.autofill)
nodeRepository = FakeNodeRepository()
locationManager = mock(MockMode.autofill)
mqttManager = mock(MockMode.autofill)
historyManager = mock(MockMode.autofill)
radioConfigRepository = mock(MockMode.autofill)
commandSender = mock(MockMode.autofill)
sessionManager = mock(MockMode.autofill)
nodeManager = mock(MockMode.autofill)
analytics = mock(MockMode.autofill)
packetRepository = mock(MockMode.autofill)
workerManager = mock(MockMode.autofill)
appWidgetUpdater = mock(MockMode.autofill)
testDispatcher = UnconfinedTestDispatcher()
radioConnectionState.value = ConnectionState.Disconnected
connectionStateFlow.value = ConnectionState.Disconnected
localConfigFlow.value = LocalConfig()
moduleConfigFlow.value = LocalModuleConfig()
every { radioInterfaceService.connectionState } returns radioConnectionState
every { radioConfigRepository.localConfigFlow } returns localConfigFlow
every { radioConfigRepository.moduleConfigFlow } returns moduleConfigFlow
@@ -135,6 +164,12 @@ class MeshConnectionManagerImplTest {
scope,
)
private fun restartTransportCallCounter(): () -> Int {
var restartCalls = 0
everySuspend { radioInterfaceService.restartTransport() } calls { restartCalls += 1 }
return { restartCalls }
}
@AfterTest fun tearDown() = Unit
@Test
@@ -386,8 +421,9 @@ class MeshConnectionManagerImplTest {
fun `concurrent sleep-timeout and radio state change are serialized`() {
val standardDispatcher = StandardTestDispatcher()
runTest(standardDispatcher) {
// Power saving enabled with a short ls_secs so the sleep timeout fires quickly
val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 1))
// Power saving enabled with ls_secs=0 so the sleep timeout boundary is just before the
// Stage 1 handshake watchdog. That keeps this test isolated to sleep-timeout behavior.
val config = LocalConfig(power = Config.PowerConfig(is_power_saving = true, ls_secs = 0))
every { radioConfigRepository.localConfigFlow } returns flowOf(config)
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
@@ -400,22 +436,24 @@ class MeshConnectionManagerImplTest {
}
manager = createManager(backgroundScope)
advanceUntilIdle()
runCurrent()
// Transition to Connected -> DeviceSleep to start the sleep timer
radioConnectionState.value = ConnectionState.Connected
advanceUntilIdle()
runCurrent()
radioConnectionState.value = ConnectionState.DeviceSleep
advanceUntilIdle()
runCurrent()
observed.clear()
// Before the sleep timeout fires, emit Connected from the radio (simulating device
// waking up). Then let the timeout fire. The mutex ensures they don't race.
radioConnectionState.value = ConnectionState.Connected
// Advance past the sleep timeout (ls_secs=1 + 30s base = 31s)
advanceTimeBy(32_000L)
advanceUntilIdle()
runCurrent()
// Advance past the sleep timeout (ls_secs=0 + 30s base), but stop before the Stage 1
// handshake watchdog fires at roughly 30.1s after the wake-up Connected signal.
advanceTimeBy(30_050L)
runCurrent()
// The Connected transition should have cancelled the sleep timeout, so we should
// end up in Connecting (from handleConnected), NOT Disconnected (from timeout).
@@ -426,4 +464,713 @@ class MeshConnectionManagerImplTest {
)
}
}
@Test
fun `Stage 1 config stall triggers transport restart and ends Disconnected`() = runTest(testDispatcher) {
manager = createManager(backgroundScope)
// Disconnected -> Connected: handleConnected() sets Connecting, sends pre-handshake
// heartbeat, and (after PRE_HANDSHAKE_SETTLE_MS=100ms) calls startConfigOnly() which
// arms the Stage 1 stall guard (HANDSHAKE_TIMEOUT_STAGE1 = 30s).
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Pre-condition: Stage 1 is in flight — manager is Connecting and a ToRadio has been sent
// (heartbeat + want_config_id). Use at-least-one here so the test isn't brittle on the
// exact packet count.
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should be Connecting after radio Connected",
)
verify { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) }
// Advance past HANDSHAKE_TIMEOUT_STAGE1 (30s) WITHOUT any config arrival. The stall
// fires the recovery sibling unconditionally on every transport — there is no longer a
// want_config retry delay before restart. The production code runs BOTH transitions
// inside one sibling recovery job — onConnectionChanged(Disconnected) FIRST, then
// restartTransport() — so the fresh Connected emission from restartTransport arrives
// with app-level state already Disconnected and is not ignored by the redundant-
// Connecting guard in onConnectionChanged. The advanceTimeBy keeps extra slack so the
// test stays robust against small timeout tweaks.
advanceTimeBy(46_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Stage 1 stall should end in Disconnected after restart is requested",
)
}
@Test
fun `Handshake stall recovery orders app disconnect before transport restart emissions`() =
runTest(testDispatcher) {
// This test locks in the ordering invariant of the stall recovery
// sibling: onConnectionChanged(Disconnected) runs FIRST, then restartTransport().
// We deliberately do NOT stub restartTransport() — the default mock no-op leaves it
// as a pure boundary call so the sibling's two phases can be observed independently.
//
// After the stall fires and the sibling completes, we MANUALLY replay the
// transport-level emissions that the real restartTransport() would produce:
// - DeviceSleep (onDisconnect(isPermanent=false) on the old transport)
// - Connected (the new transport's onConnect callback)
// Under the FIXED ordering, the fresh Connected arrives with app state already
// Disconnected, bypasses the redundant-Connecting guard in onConnectionChanged,
// and re-enters handleConnected → state returns to Connecting.
// Under the BROKEN (old) ordering — restartTransport() BEFORE Disconnected — the
// fresh Connected would arrive while app state was still Connecting, the redundant-
// Connecting guard would drop it, and the state would never return to Connecting.
//
// Restructured to be deterministic on JVM CI: rather than relying on a stubbed
// restartTransport() lambda whose StateFlow side-effect emissions race with the
// flow collector under Mokkery, the test body itself drives the emissions in order.
manager = createManager(backgroundScope)
// Disconnected -> Connected: handleConnected() sets Connecting, sends pre-handshake
// heartbeat, and (after PRE_HANDSHAKE_SETTLE_MS=100ms) calls startConfigOnly() which
// arms the Stage 1 stall guard (HANDSHAKE_TIMEOUT_STAGE1 = 30s).
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Pre-condition: Stage 1 is in flight.
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should be Connecting after radio Connected",
)
// Advance past HANDSHAKE_TIMEOUT_STAGE1 (30s) WITHOUT any config arrival. The stall
// fires the recovery sibling: onConnectionChanged(Disconnected) FIRST, then
// restartTransport() (default mock no-op, so nothing re-arms a stall guard and
// advanceUntilIdle is safe here).
advanceTimeBy(46_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Sibling must run onConnectionChanged(Disconnected) BEFORE restartTransport() — " +
"proves the app-level Disconnected transition landed",
)
// Manually replay the transport-level restart signals that the real restartTransport()
// would emit. DeviceSleep corresponds to onDisconnect(isPermanent=false) on the old
// transport; Connected corresponds to the new transport's onConnect callback. With
// UnconfinedTestDispatcher each emission is collected synchronously inline, so no
// advanceUntilIdle() is needed between them — and none is safe AFTER the Connected
// emission, because handleConnected re-arms a fresh Stage 1 stall guard and
// advanceUntilIdle would advance virtual time past it (and every subsequent re-arm),
// looping the recovery and obscuring the single-shot ordering this test locks in.
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Fresh Connected emission must re-enter handleConnected (NOT be ignored by the " +
"redundant-Connecting guard) because the app-level Disconnected transition " +
"already ran BEFORE restartTransport's transport cycle — this is the ordering " +
"invariant under test",
)
}
@Test
fun `Stage 2 node-info stall triggers transport restart`() = runTest(testDispatcher) {
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
// Pre-handshake settle completes; Stage 1 stall guard armed.
advanceTimeBy(200)
advanceUntilIdle()
// Drive the connection into Stage 2. In production this is done by the config-flow
// manager once Stage 1 config arrives; here we invoke it directly. startNodeInfoOnly()
// cancels the Stage 1 stall guard and arms Stage 2 (HANDSHAKE_TIMEOUT_STAGE2 = 60s).
manager.startNodeInfoOnly()
advanceUntilIdle()
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should still be Connecting entering Stage 2",
)
// Advance past HANDSHAKE_TIMEOUT_STAGE2 (60s) WITHOUT invoking onNodeDbReady(). The
// stall must fire the recovery sibling unconditionally.
advanceTimeBy(76_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Stage 2 stall should end in Disconnected after restart is requested",
)
}
@Test
fun `Handshake completing before stall timeout does not trigger transport restart`() = runTest(testDispatcher) {
// Stubs required by onNodeDbReady() (full handshake completion path).
everySuspend { commandSender.requestTelemetry(any(), any(), any()) } returns Unit
every { nodeManager.myNodeNum } returns MutableStateFlow(123)
every { mqttManager.startProxy(any(), any()) } returns Unit
everySuspend { historyManager.requestHistoryReplay(any(), any(), any(), any()) } returns Unit
every { nodeManager.getMyNodeInfo() } returns null
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
// Pre-handshake settle completes; Stage 1 stall guard armed.
advanceTimeBy(200)
advanceUntilIdle()
// Simulate the full handshake completing (config arrives + NodeDB becomes ready).
// onNodeDbReady() cancels handshakeTimeout, so the stall recovery sibling can
// never run even if virtual time later crosses the stage windows.
manager.onNodeDbReady()
advanceUntilIdle()
// Advance well past BOTH stage windows (Stage 1: 30s, Stage 2: 60s).
advanceTimeBy(120_000L)
advanceUntilIdle()
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
}
@Test
fun `TCP Stage 1 stall fires restartTransport at 12s without retry`() = runTest(testDispatcher) {
// Address starting with 't' → DeviceType.TCP → fast recovery transport.
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
val sentPackets = mutableListOf<org.meshtastic.proto.ToRadio>()
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } calls
{ call ->
sentPackets.add(call.arg(0))
}
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
// Pre-handshake settle (100ms): heartbeat + Stage 1 want_config_id sent, fast watchdog armed.
advanceTimeBy(200)
advanceUntilIdle()
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should be Connecting after radio Connected",
)
// Sanity: heartbeat + Stage 1 want_config_id have been sent.
val packetsBeforeStall = sentPackets.size
// Advance past FAST_HANDSHAKE_TIMEOUT (12s) WITHOUT any progress signal. The fast
// branch must fire the recovery sibling directly — no same-session want_config re-send.
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Fast stall should end in Disconnected after restart is requested",
)
// The recovery sibling must not re-send want_config_id on the same session: no additional
// packet may be sent between the initial arming and the restartTransport call (a same-
// session re-send re-enters firmware handleStartConfig() and crashes the device).
assertEquals(
packetsBeforeStall,
sentPackets.size,
"Fast transport stall must NOT trigger a retry send of want_config_id",
)
}
@Test
fun `TCP Stage 1 meaningful progress resets watchdog without false restart`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Trickle inbound progress signals — each lands well inside the 12s fast window, so
// each one re-arms the watchdog and no restart may fire across multiple resets.
repeat(5) {
advanceTimeBy(8_000L)
manager.onHandshakeProgress()
advanceUntilIdle()
}
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Steady trickle of progress must keep the manager Connecting",
)
}
@Test
fun `TCP Stage 1 watchdog fires after progress stops`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Two progress signals land inside the window.
advanceTimeBy(8_000L)
manager.onHandshakeProgress()
advanceUntilIdle()
advanceTimeBy(8_000L)
manager.onHandshakeProgress()
advanceUntilIdle()
// Progress stops; advance past the fast timeout from the last re-arm.
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Watchdog must fire once progress stops for longer than the fast timeout",
)
}
@Test
fun `TCP Stage 2 stall fires restartTransport at 12s`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Drive the connection into Stage 2. In production this is done by the config-flow
// manager once Stage 1 config arrives; here we invoke it directly. startNodeInfoOnly()
// cancels the Stage 1 watchdog and arms Stage 2 with the fast timeout (12s on TCP).
manager.startNodeInfoOnly()
advanceUntilIdle()
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should still be Connecting entering Stage 2",
)
// Advance past the fast timeout WITHOUT invoking onNodeDbReady().
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Stage 2 fast stall should end in Disconnected after restart is requested",
)
}
@Test
fun `TCP Stage 2 NodeInfo progress resets watchdog`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
manager.startNodeInfoOnly()
advanceUntilIdle()
// Stage 2 node-info burst packets arrive as a trickle; each resets the fast watchdog.
repeat(3) {
advanceTimeBy(7_000L)
manager.onHandshakeProgress()
advanceUntilIdle()
}
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Stage 2 progress must keep the manager Connecting",
)
}
@Test
fun `TCP fast recovery preserves Disconnected-before-restartTransport ordering`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
val observed = mutableListOf<ConnectionState>()
var progressBeforeRestart: String? = null
var restartTransportCalls = 0
every { serviceRepository.setConnectionState(any()) } calls
{ call ->
val state = call.arg<ConnectionState>(0)
observed.add(state)
connectionStateFlow.value = state
}
every { serviceRepository.setConnectionProgress(any()) } calls
{ call ->
// Capture the most recent progress string at the moment restartTransport runs.
// This locks in that the "Reconnecting…" UX hook fires BEFORE the app-level
// Disconnected transition.
progressBeforeRestart = call.arg<String>(0)
}
everySuspend { radioInterfaceService.restartTransport() } calls
{
restartTransportCalls++
// At the moment restartTransport runs, the app-level state MUST already be
// Disconnected — otherwise the fresh transport Connected emission would be
// dropped by the redundant-Connecting guard and we would re-introduce the
// split-brain this recovery path exists to break.
assertEquals(
ConnectionState.Disconnected,
connectionStateFlow.value,
"restartTransport must run AFTER app-level Disconnected transition",
)
assertEquals(
ServiceRepository.RECONNECTING_PROGRESS_TEXT,
progressBeforeRestart,
"setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT) must run before restartTransport",
)
}
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Fire fast stall.
advanceTimeBy(13_000L)
advanceUntilIdle()
assertEquals(1, restartTransportCalls)
// Sanity: Connecting → Disconnected was the observed app-level transition path.
assertEquals(ConnectionState.Disconnected, observed.last(), "Last app-level state must be Disconnected")
}
@Test
fun `post-handshake failure recovery disconnects before restarting transport`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
val observed = mutableListOf<ConnectionState>()
var progressBeforeRestart: String? = null
var restartTransportCalls = 0
every { serviceRepository.setConnectionState(any()) } calls
{ call ->
val state = call.arg<ConnectionState>(0)
observed.add(state)
connectionStateFlow.value = state
}
every { serviceRepository.setConnectionProgress(any()) } calls
{ call ->
progressBeforeRestart = call.arg<String>(0)
}
everySuspend { radioInterfaceService.restartTransport() } calls
{
restartTransportCalls++
assertEquals(
ConnectionState.Disconnected,
connectionStateFlow.value,
"post-handshake recovery must disconnect app state before restarting transport",
)
assertEquals(
ServiceRepository.RECONNECTING_PROGRESS_TEXT,
progressBeforeRestart,
"post-handshake recovery should surface reconnecting progress before restartTransport",
)
}
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
manager.onHandshakeComplete()
manager.recoverPostHandshakeFailure()
advanceUntilIdle()
assertEquals(1, restartTransportCalls)
assertEquals(ConnectionState.Disconnected, observed.last(), "Last app-level state must be Disconnected")
}
@Test
fun `BLE transport unaffected by onHandshakeProgress regression`() = runTest(testDispatcher) {
// Explicit BLE address (starts with 'x') — must NOT engage the fast path.
every { radioInterfaceService.getDeviceAddress() } returns "xAA:BB:CC:DD:EE:FF"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Spam progress signals — BLE must ignore them, so the 30s Stage 1 budget is unchanged.
repeat(10) {
manager.onHandshakeProgress()
advanceUntilIdle()
}
// Advance to 13s — would fire a fast watchdog if BLE were incorrectly included.
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"BLE must ignore onHandshakeProgress and stay Connecting through 13s",
)
// Now advance past the full BLE Stage 1 budget (30s). We already advanced 13s, so 33s
// more crosses the 30s Stage 1 stall threshold the recovery sibling fires at. (Extra
// slack is harmless — the BLE stage has no retry delay before recovery.)
advanceTimeBy(33_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
}
@Test
fun `onHandshakeProgress is a no-op when state is not Connecting`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
// Manager starts in Disconnected (initial state, no radio Connected signal yet).
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Precondition: manager must start Disconnected",
)
// Calling onHandshakeProgress while not Connecting must not arm any watchdog.
manager.onHandshakeProgress()
advanceUntilIdle()
advanceTimeBy(60_000L)
advanceUntilIdle()
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
}
@Test
fun `USB serial transport engages fast path like TCP`() = runTest(testDispatcher) {
// Address starting with 's' → DeviceType.USB → fast recovery transport.
every { radioInterfaceService.getDeviceAddress() } returns "s/dev/ttyUSB0"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Fast timeout (12s) fires for USB just like TCP.
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(1)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"USB fast stall should end in Disconnected after restart is requested",
)
}
@Test
fun `onHandshakeComplete cancels armed Stage 2 fast watchdog`() = runTest(testDispatcher) {
// TCP address → DeviceType.TCP → fast recovery transport, 12s fast watchdog.
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
// Pre-handshake settle (100ms) completes; Stage 1 fast watchdog armed.
advanceTimeBy(200)
advanceUntilIdle()
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Manager should be Connecting after radio Connected",
)
// Drive into Stage 2. startNodeInfoOnly() cancels the Stage 1 watchdog and arms Stage 2
// with the 12s fast timeout. In production this is invoked by MeshConfigFlowManagerImpl
// after Stage 1 completes; here we drive it directly to isolate the watchdog path.
manager.startNodeInfoOnly()
advanceUntilIdle()
// Synchronously cancel the watchdog, exactly as MeshConfigFlowManagerImpl now does the
// instant NODE_INFO_NONCE arrives — BEFORE the async DB install work begins.
manager.onHandshakeComplete()
// Advance past the 12s fast timeout. With the watchdog cancelled, the recovery
// sibling must NOT fire: no restartTransport, no state transition.
advanceTimeBy(13_000L)
advanceUntilIdle()
verifySuspend(exactly(0)) { radioInterfaceService.restartTransport() }
assertEquals(
ConnectionState.Connecting,
serviceRepository.connectionState.value,
"Watchdog was cancelled by onHandshakeComplete — no restart, no state transition",
)
}
@Test
fun `Second recovery attempt applies exponential backoff delay before restart`() = runTest(testDispatcher) {
// TCP address → fast path (12s stall window). Smallest reliable cycle for backoff testing.
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
val restartTransportCalls = restartTransportCallCounter()
manager = createManager(backgroundScope)
// Initial Connected → Connecting, pre-handshake armed.
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #1 fires (priorFailures=0 → no backoff). restartTransport called exactly once.
advanceTimeBy(13_000L)
advanceUntilIdle()
assertEquals(1, restartTransportCalls())
// Replay the transport restart cycle so handleConnected re-arms the stall guard. DeviceSleep
// is a no-op here (power-saving off); Connected re-enters handleConnected → new pre-handshake.
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #2 fires (priorFailures=1 → 2 s backoff before restartTransport). The sibling has
// launched but is suspended in delay(2.seconds). advanceUntilIdle cannot make progress
// because the only pending task is the timed delay.
advanceTimeBy(13_000L)
advanceUntilIdle()
// Prove the backoff is in effect: restartTransport must NOT have been called yet for the
// second stall. If the backoff were skipped, this would already be exactly(2).
assertEquals(1, restartTransportCalls())
// Advancing past the 2 s backoff resumes the sibling and triggers the second restart.
advanceTimeBy(2_000L)
advanceUntilIdle()
assertEquals(2, restartTransportCalls())
}
@Test
fun `Recovery exhaustion after three consecutive failures surfaces sticky error and stops recovery`() =
runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
// Capture every setErrorMessage invocation so we can verify the sticky error surfaced
// with Severity.Error without depending on the exact localized text.
val errorMessages = mutableListOf<Pair<String, Severity>>()
every { serviceRepository.setErrorMessage(any(), any()) } calls
{ call ->
errorMessages += call.arg<String>(0) to call.arg<Severity>(1)
}
val restartTransportCalls = restartTransportCallCounter()
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #1 (priorFailures=0): no delay → restart. Replay Connected.
advanceTimeBy(13_000L)
advanceUntilIdle()
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #2 (priorFailures=1): 2 s backoff → restart. Replay Connected.
advanceTimeBy(13_000L)
advanceUntilIdle()
advanceTimeBy(2_000L)
advanceUntilIdle()
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #3 (priorFailures=2): 4 s backoff → restart. Replay Connected.
advanceTimeBy(13_000L)
advanceUntilIdle()
advanceTimeBy(4_000L)
advanceUntilIdle()
assertEquals(3, restartTransportCalls())
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #4 (priorFailures=3 = MAX_CONSECUTIVE_RECOVERY_FAILURES): sticky error path.
// The sibling must NOT call restartTransport; instead it resets the counter, clears the
// progress text, transitions to Disconnected, and surfaces the sticky error.
advanceTimeBy(13_000L)
advanceUntilIdle()
assertEquals(3, restartTransportCalls())
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Recovery exhaustion must leave the manager in Disconnected",
)
// The sticky error is surfaced via setConnectionProgress("") (cleared because the user
// must manually retry) plus setErrorMessage(..., Severity.Error). We assert at least one
// setErrorMessage call landed with Severity.Error — this is the user-visible signal that
// recovery gave up. (Exact text comes from Res.string.error_recovery_exhausted and is
// resolved by the production code via getStringSuspend.)
assertTrue(
errorMessages.any { (_, severity) -> severity == Severity.Error },
"Recovery exhaustion must surface a sticky ERROR-severity message; got: $errorMessages",
)
// Counter is reset to 0 by the sticky-error branch so a manual user retry starts fresh.
// We can't read the private atomic directly, but the reset is exercised by the next-stall
// behavior; its correctness is locked in by `onHandshakeComplete resets counter` and by
// the symmetric reset-on-exhaust code path in runSiblingHandshakeRecovery().
}
@Test
fun `Successful handshake resets the consecutive recovery failure counter`() = runTest(testDispatcher) {
every { radioInterfaceService.getDeviceAddress() } returns "t192.168.1.42"
val restartTransportCalls = restartTransportCallCounter()
manager = createManager(backgroundScope)
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #1 (priorFailures=0): no delay → restart.
advanceTimeBy(13_000L)
advanceUntilIdle()
assertEquals(1, restartTransportCalls())
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// Stall #2 (priorFailures=1): 2 s backoff → restart. Counter now at 2.
advanceTimeBy(13_000L)
advanceUntilIdle()
advanceTimeBy(2_000L)
advanceUntilIdle()
assertEquals(2, restartTransportCalls())
radioConnectionState.value = ConnectionState.DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceTimeBy(200)
advanceUntilIdle()
// The next stall WOULD see priorFailures=2 (4 s backoff) if the counter were not reset.
// Call onHandshakeComplete() — this is the production signal that the handshake succeeded
// (e.g. NODE_INFO_NONCE arrived). It cancels the armed stall guard and resets the counter.
manager.onHandshakeComplete()
advanceUntilIdle()
// Re-arm a fresh Stage 1 stall guard directly. We use startConfigOnly() (the same entry
// point handleConnected calls) instead of replaying a Connected emission because the
// current state is still Connecting — a redundant Connected emission would be rejected by
// the redundant-Connected-while-Connecting guard in onConnectionChanged, so handleConnected
// would never re-run. The counter is now 0 thanks to the reset above.
manager.startConfigOnly()
advanceUntilIdle()
// Stall #3 fires. Because the counter was reset, priorFailures=0 → NO backoff. The restart
// must land immediately, without the 2 s or 4 s delay that an un-reset counter would impose.
advanceTimeBy(13_000L)
advanceUntilIdle()
assertEquals(3, restartTransportCalls())
// If the counter had NOT been reset, priorFailures would be 2 here and the sibling would be
// suspended in delay(4.seconds); restartTransport would still be exactly(2). Reaching
// exactly(3) immediately after the stall proves the counter was reset to 0.
}
}

View File

@@ -32,6 +32,38 @@ interface MeshConnectionManager {
/** Called when the node database is ready and fully populated. */
suspend fun onNodeDbReady()
/**
* Synchronously cancels the transport-aware handshake watchdog the moment the firmware signals Stage 2 completion
* (NODE_INFO_NONCE received).
*
* This MUST be invoked before the asynchronous NodeDB install work begins so that a slow DB commit on a large mesh
* cannot trip the 12 s fast-recovery timeout after the firmware handshake has already succeeded. The watchdog
* cancellation it performs is a strict subset of [onNodeDbReady]; the remaining post-NodeDB side effects
* (analytics, MQTT start, history replay, telemetry requests) stay gated on [onNodeDbReady] at the end of the DB
* install block.
*/
fun onHandshakeComplete()
/**
* Recovers from a failure after firmware handshake completion but before NodeDB install has completed.
*
* At this point the handshake watchdog has already been cancelled by [onHandshakeComplete], so recovery must
* explicitly move the app-level state out of Connecting and restart the raw transport in the same ordering used by
* handshake stall recovery.
*/
fun recoverPostHandshakeFailure()
/**
* Called when meaningful handshake progress is observed on the wire (e.g. an inbound packet related to the
* in-flight config or node-info exchange).
*
* On fast transports (TCP, USB serial) this re-arms the transport-aware handshake watchdog so a steady trickle of
* progress does not trip the aggressive fast-recovery timeout while a true stall still fires on schedule. On BLE
* this is a no-op: BLE keeps the original long-and-retry stall-guard budget because GATT latency is high and
* variable.
*/
fun onHandshakeProgress()
/** Updates the telemetry information for the local node. */
fun updateTelemetry(t: Telemetry)

View File

@@ -101,6 +101,35 @@ interface RadioInterfaceService : RadioTransportCallback {
*/
suspend fun disconnect()
/**
* Silent in-place transport restart for handshake stalls: tears down the active transport and re-establishes it in
* place, without touching the connection-request gate or the selected device address.
*
* Both transport families use this recovery path but with different trigger timings: TCP/USB reach it through the
* fast-path watchdog (~12s after the last meaningful handshake packet), while BLE reaches it after the
* retry-exhausted path (~30s/60s watchdog plus a ~15s retry window). In both cases the symptom is identical: the
* transport itself may still be physically [ConnectionState.Connected] (e.g. a TCP socket whose radio firmware has
* stopped responding to `want_config_id`, or a BLE link whose GATT peer has stalled), so flipping app-level state
* to [ConnectionState.Disconnected] alone leaves a split-brain: transport Connected + `connectionRequested=true` +
* a live `RadioTransport` handle, which then blocks same-node reconnect via [setDeviceAddress]'s fast-path. This
* method breaks that deadlock by cycling the transport in place.
*
* Contract:
* - Preserves the selected device address (does not modify [currentDeviceAddressFlow]).
* - Preserves the `connectionRequested` gate; **MUST NOT** clear it. Safe to call concurrently with an explicit
* [disconnect] — the internal gate check makes it a no-op in that case.
* - Safe to call when no transport is running — implementations must no-op.
* - Does **NOT** bypass selected-device validation; the replacement transport is built from the same bonded address
* via the normal start path.
* - Emits ordinary transport-level transitions through the existing [RadioTransportCallback] surface, so observers
* see the transient [ConnectionState.DeviceSleep] state followed by [ConnectionState.Connected] when the
* replacement transport connects. (No [ConnectionState.Connecting] is emitted at the transport layer — that is an
* app-level state set by [MeshConnectionManager], not a transport callback.)
*
* Suspends until the teardown/restart cycle completes.
*/
suspend fun restartTransport()
/** Returns the current device address. */
fun getDeviceAddress(): String?

View File

@@ -163,4 +163,17 @@ interface ServiceRepository :
/** Clears the current neighbor info response. */
override fun clearNeighborInfoResponse()
companion object {
/**
* The cross-module contract for the WiFi/TCP handshake-watchdog recovery progress signal.
*
* [MeshConnectionManager] writes this exact literal to [connectionProgress] immediately before its recovery
* sibling transitions the transport to [ConnectionState.Disconnected], and `ConnectionsViewModel` (and its
* tests) compare incoming progress against it to surface `RECONNECTING` UI state instead of a final-feeling
* `NOT_CONNECTED`. The literal text and the U+2026 HORIZONTAL ELLIPSIS character MUST match exactly across both
* writers and readers — centralizing it here removes the prior cross-module string-contract hazard.
*/
const val RECONNECTING_PROGRESS_TEXT = "Reconnecting\u2026"
}
}

View File

@@ -537,6 +537,7 @@
<string name="environment_metrics_use_fahrenheit">Environment metrics use Fahrenheit</string>
<string name="error">Error</string>
<string name="error_duty_cycle">Duty Cycle limit reached. Cannot send messages right now, please try again later.</string>
<string name="error_recovery_exhausted">Could not establish a stable connection after repeated attempts. Please re-select the node to retry.</string>
<string name="establish_session">Connect &amp; administer</string>
<string name="establishing_session">Establishing remote session…</string>
<string name="ethernet_config">Ethernet Options</string>
@@ -1172,6 +1173,7 @@
<string name="rebroadcast_mode_local_only_desc">Ignores observed messages from foreign meshes that are open or those which it cannot decrypt. Only rebroadcasts message on the nodes local primary / secondary channels.</string>
<string name="rebroadcast_mode_none_desc">Only permitted for SENSOR, TRACKER and TAK_TRACKER roles, this will inhibit all rebroadcasts, not unlike CLIENT_MUTE role.</string>
<string name="recent_network_devices">Recent Network Devices</string>
<string name="reconnecting">Reconnecting…</string>
<string name="red">Red</string>
<string name="refresh">Refresh</string>
<string name="refresh_metadata">Refresh metadata</string>
@@ -1456,6 +1458,23 @@
<string name="traceroute_showing_nodes">Showing %1$d/%2$d nodes</string>
<string name="track_and_share_locations">Track and Share Locations</string>
<string name="track_point">track point</string>
<!-- TRAFFIC -->
<string name="traffic_management">Traffic Management</string>
<string name="traffic_management_config">Traffic Management Configuration</string>
<string name="traffic_management_drop_unknown_enabled">Drop Unknown Packets</string>
<string name="traffic_management_enabled">Module Enabled</string>
<string name="traffic_management_exhaust_hop_position">Local-only Position (Relays)</string>
<string name="traffic_management_exhaust_hop_telemetry">Local-only Telemetry (Relays)</string>
<string name="traffic_management_nodeinfo_direct_response">NodeInfo Direct Response</string>
<string name="traffic_management_nodeinfo_direct_response_max_hops">Max Hops for Direct Response</string>
<string name="traffic_management_position_dedup">Position Deduplication</string>
<string name="traffic_management_position_min_interval">Min Position Interval (secs)</string>
<string name="traffic_management_position_precision">Position Precision (bits)</string>
<string name="traffic_management_rate_limit_enabled">Rate Limiting</string>
<string name="traffic_management_rate_limit_max_packets">Max Packets in Window</string>
<string name="traffic_management_rate_limit_window">Rate Limit Window (secs)</string>
<string name="traffic_management_router_preserve_hops">Preserve Router Hops</string>
<string name="traffic_management_unknown_packet_threshold">Unknown Packet Threshold</string>
<string name="transmit_over_lora">Transmit over LoRa</string>
<string name="transport_ble">BLE</string>
<string name="transport_tcp">TCP</string>

View File

@@ -270,6 +270,80 @@ class SharedRadioInterfaceService(
}
}
override suspend fun restartTransport() {
// CAS BEFORE the mutex, mirroring checkLiveness()'s coordination structure: both
// restart paths CAS synchronously, one wins, one loses immediately. Performing the
// CAS inside transportMutex.withLock races checkLiveness's outer
// `finally { isRestarting = false }` (which runs AFTER mutex release): a queued
// restartTransport that resumes from mutex.wait can observe isRestarting == false,
// win the CAS, and produce an extra transport cycle (3 instead of 2) under the JVM's
// real dispatcher. The loser here observes isRestarting == true and defers to the
// in-flight cycle. startTransportLocked() is idempotent w.r.t. an existing transport,
// but the CAS also prevents a double stop/stop race on the teardown side.
if (!isRestarting.compareAndSet(expect = false, update = true)) {
Logger.d { "restartTransport: skipped, concurrent restart in progress" }
return
}
try {
transportMutex.withLock {
// Silent recovery for app-level handshake stalls. The transport may still be physically
// up (TCP socket alive, firmware unresponsive to want_config_id), so cycling it in place
// WITHOUT clearing connectionRequested avoids the split-brain where setDeviceAddress's
// fast-path would otherwise block same-node reconnect. The caller (MeshConnectionManager)
// is responsible for the app-level Disconnected flip; this method only cycles the
// transport and emits the transport-level DeviceSleep -> Connected transitions via
// callbacks (no Connecting — that is an app-level state, not a transport emission).
if (!connectionRequested) {
Logger.d { "restartTransport: skipped-not-requested" }
return@withLock
}
if (getBondedDeviceAddress() == null) {
Logger.d { "restartTransport: skipped-no-address" }
return@withLock
}
// Honor the documented "safe no-op when no transport running" contract: environmental
// stops (network unavailable, BLE disabled) intentionally preserve
// connectionRequested=true so the recovery listeners above can re-bring-up the
// transport later. A stale restart job running after such a stop must NOT bypass that
// recovery path by creating a transport directly via startTransportLocked().
if (radioTransport == null) {
Logger.d { "restartTransport: skipped-no-transport" }
return@withLock
}
Logger.w { "restartTransport: restarting transport for ${getDeviceAddress()?.anonymize}" }
// Mirror checkLiveness()'s coordination contract: emit a transport-level
// Connected -> DeviceSleep transition before the stop/start cycle so
// transport-level observers see a full DeviceSleep -> Connected cycle when
// the fresh transport's onConnect() fires and can re-trigger their onConnected
// logic. The caller (runSiblingHandshakeRecovery) flips the app-level state to
// Disconnected before invoking restartTransport(), so this emission exists for
// transport-level coordination, not for app-level StateFlow dedupe. Silent:
// transient DeviceSleep, not a permanent Disconnected.
onDisconnect(isPermanent = false)
// notifyPermanent=false below (no user-facing Disconnected modal — the app-level
// state machine drives that separately) and sendPoliteDisconnect=false (firmware
// is unresponsive, writing a goodbye frame into a dead link only delays teardown).
ignoreExceptionSuspend { stopTransportLocked(notifyPermanent = false, sendPoliteDisconnect = false) }
// Defense-in-depth mirroring the liveness recovery gate; today all
// connectionRequested mutators (connect/disconnect/setDeviceAddress) hold
// transportMutex so this re-check is unreachable, but it guards against future
// refactors that mutate the gate without serialization.
if (!connectionRequested) {
Logger.d { "restartTransport: aborted, disconnect requested during stop" }
return@withLock
}
// startTransportLocked() re-validates the selected address (no-op if null) and emits
// Connected through the transport callbacks (via the new transport's onConnect) once
// the fresh transport comes up — there is no Connecting emission at the transport
// layer (that is an app-level state owned by MeshConnectionManager).
startTransportLocked()
Logger.i { "restartTransport: completed" }
}
} finally {
isRestarting.value = false
}
}
override fun isMockTransport(): Boolean = transportFactory.isMockTransport()
override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String =

View File

@@ -785,4 +785,443 @@ class SharedRadioInterfaceServiceLivenessTest {
advanceTimeBy(1_000L)
}
}
// ─── restartTransport gate contract ─────────────────────────────────────────────────────
//
// [SharedRadioInterfaceService.restartTransport] is the app-level handshake-stall recovery
// path. It mirrors BLE liveness silent recovery: stopTransportLocked(notifyPermanent=false,
// sendPoliteDisconnect=false) then startTransportLocked(). The isRestarting CAS runs
// synchronously BEFORE transportMutex (mirroring checkLiveness), then the remaining gates run
// inside the mutex. The gate contract has four early-return gates that MUST all hold for the
// cycle to fire, evaluated in this exact order:
// 1. isRestarting.compareAndSet(false, true) succeeds (reuses the liveness CAS; synchronous,
// BEFORE acquiring transportMutex — both restart paths serialize on this CAS first)
// 2. connectionRequested == true (cleared by disconnect() / setDeviceAddress(null)/("n");
// checked inside transportMutex)
// 3. getBondedDeviceAddress() != null (re-validates the selected address; inside transportMutex)
// 4. radioTransport != null (defends against a stale restart after an environmental stop;
// inside transportMutex)
// The cycle must also be address-preserving and silent (no permanent Disconnected, no
// user-facing error). The tests below lock each leg of that contract.
/**
* Happy path: [SharedRadioInterfaceService.restartTransport] after a normal [connect] with a selected address stops
* the running transport and creates a fresh one. Mirrors the BLE liveness "timeout closes old transport and creates
* fresh one" assertion shape.
*/
@Test
fun `restartTransport after connect closes old transport and creates fresh one`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
val initialTransport = createdTransports.first()
// Lock the sendPoliteDisconnect=false contract: clear any bytes recorded during the
// initial connect so sentData reflects only writes performed during restartTransport.
initialTransport.sentData.clear()
service.restartTransport()
// sendPoliteDisconnect = false → no 500ms drain inside the cycle. Under
// UnconfinedTestDispatcher the whole stop/start runs inline; runCurrent is
// belt-and-suspenders. The trailing disconnect() covers its own polite delay.
testDispatcher.scheduler.runCurrent()
assertEquals(2, createdTransports.size, "restartTransport should create exactly one fresh transport")
assertTrue(initialTransport.closeCalled, "Old transport must be closed by restartTransport")
assertEquals(1, initialTransport.closeCount, "Old transport closed exactly once (no double-close)")
assertTrue(
initialTransport.sentData.isEmpty(),
"restartTransport must NOT write any bytes to the old transport (sendPoliteDisconnect=false)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] MUST be a no-op after an explicit
* [SharedRadioInterfaceService.disconnect]. disconnect() clears `connectionRequested` BEFORE stopTransportLocked(),
* and restartTransport() consults that gate as its first check. Without the gate, a racing handshake-induced
* restart would silently resurrect a transport the user tore down.
*/
@Test
fun `restartTransport is a no-op after explicit disconnect`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
service.disconnect()
advanceTimeBy(1_000L)
val transportCountAfterDisconnect = createdTransports.size
service.restartTransport()
testDispatcher.scheduler.runCurrent()
assertEquals(
transportCountAfterDisconnect,
createdTransports.size,
"restartTransport must not create a transport after disconnect (connectionRequested gate)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] MUST be a no-op after
* [SharedRadioInterfaceService.setDeviceAddress] with `null`. The deselect clears `connectionRequested` AND leaves
* getBondedDeviceAddress() == null (invalid address); either gate alone is sufficient to skip the restart, but both
* must hold to defend against a stale listener emission re-arming the transport.
*/
@Test
fun `restartTransport is a no-op after setDeviceAddress null`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
service.setDeviceAddress(null)
advanceTimeBy(1_000L)
val transportCountAfterDeselect = createdTransports.size
service.restartTransport()
testDispatcher.scheduler.runCurrent()
assertEquals(
transportCountAfterDeselect,
createdTransports.size,
"restartTransport must not create a transport after setDeviceAddress(null)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Counterpart to the null-deselect test: `"n"` is the UI sentinel for "no device" and is sanitized to `null` inside
* [SharedRadioInterfaceService.setDeviceAddress]. The same gate must skip restartTransport for both forms.
*/
@Test
fun `restartTransport is a no-op after setDeviceAddress n`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
service.setDeviceAddress("n")
advanceTimeBy(1_000L)
val transportCountAfterDeselect = createdTransports.size
service.restartTransport()
testDispatcher.scheduler.runCurrent()
assertEquals(
transportCountAfterDeselect,
createdTransports.size,
"restartTransport must not create a transport after setDeviceAddress(\"n\")",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] cycles the transport in place WITHOUT writing a new
* devAddr or clearing currentDeviceAddressFlow. The caller (MeshConnectionManager) is the sole owner of address
* changes; restartTransport must never silently re-bind to a different device or evict the selection.
*/
@Test
fun `restartTransport preserves selected device address`() = runTest(testDispatcher) {
clock = 0L
val address = "xAA:BB:CC:DD:EE:FF"
val service = createConnectedService(address)
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
val addrBefore = service.getDeviceAddress()
val prefsAddrBefore = radioPrefs.devAddr.value
service.restartTransport()
testDispatcher.scheduler.runCurrent()
assertEquals(
addrBefore,
service.getDeviceAddress(),
"getDeviceAddress must not change across restartTransport",
)
assertEquals(address, service.getDeviceAddress(), "Selected device address preserved")
assertEquals(
prefsAddrBefore,
radioPrefs.devAddr.value,
"radioPrefs.devAddr must not be rewritten by restartTransport",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] mirrors BLE liveness silent recovery. It MUST emit a
* transient DeviceSleep (via onDisconnect(isPermanent = false)) BEFORE the stop/start cycle so the subsequent
* onConnect() produces a real Connected transition (StateFlow is idempotent on same-value — without the DeviceSleep
* flip, Connected -> Connected is a no-op and MeshConnectionManager stays stuck on its app-level Disconnected
* state). It MUST NOT emit a permanent user-facing Disconnected state — the caller drives app-level state
* transitions separately, and surfacing a permanent disconnect for a self-healing cycle would pop a confusing modal
* for a transient condition.
*/
@Test
fun `restartTransport does not emit permanent Disconnected`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
val stateEmissions = mutableListOf<ConnectionState>()
val collectJob = backgroundScope.launch { service.connectionState.collect { stateEmissions.add(it) } }
service.restartTransport()
testDispatcher.scheduler.runCurrent()
collectJob.cancel()
// The DeviceSleep emission is the intended transport-level transition that lets
// MeshConnectionManager observe a real Connected transition on the fresh transport.
assertTrue(
ConnectionState.DeviceSleep in stateEmissions,
"restartTransport must emit transient DeviceSleep so the post-restart onConnect() re-triggers handleConnected() (emitted: $stateEmissions)",
)
assertFalse(
ConnectionState.Disconnected in stateEmissions,
"restartTransport must not emit permanent Disconnected state (emitted: $stateEmissions)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] must not emit a user-facing connection error.
* _connectionError is a no-replay SharedFlow, so the collector must subscribe BEFORE the restart; under
* UnconfinedTestDispatcher the launch runs eagerly up to its first suspension (awaiting SharedFlow emission). A
* silent-recovery cycle surfacing an error modal is the same UX bug as a liveness recovery surfacing one.
*/
@Test
fun `restartTransport does not emit user-facing connection error`() = runTest(testDispatcher) {
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF")
try {
val errors = mutableListOf<String>()
val collectJob = backgroundScope.launch { service.connectionError.collect { errors.add(it) } }
service.restartTransport()
testDispatcher.scheduler.runCurrent()
collectJob.cancel()
assertTrue(
errors.isEmpty(),
"restartTransport must not emit user-facing connection error (got: $errors)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression: [SharedRadioInterfaceService.restartTransport] reuses the BLE liveness `isRestarting` CAS so a
* concurrent liveness silent-recovery cycle and a handshake-induced restart cannot stack. The loser of the CAS
* observes `isRestarting == true` and defers to the in-flight cycle, preventing a double stop/start race on the
* transport.
*
* restartTransport performs the isRestarting CAS synchronously BEFORE acquiring transportMutex (mirroring
* checkLiveness). Under this pattern, a concurrent liveness restart and a handshake-induced restart serialize on
* the CAS first — the loser returns immediately without touching the mutex. This is deterministic on all
* dispatchers, not just UnconfinedTestDispatcher.
*
* Deterministic in-flight overlap: a [GatedFakeRadioTransport] suspends the liveness restart genuinely inside
* stopTransportLocked → close() (awaiting closeGate). While the liveness coroutine holds `isRestarting == true`
* inside its restart cycle, a concurrent `restartTransport()` call CAS-fails on `isRestarting` and returns
* immediately without ever acquiring `transportMutex`. A stacking bug (CAS ignored, or CAS performed inside the
* mutex) would produce 3 transports.
*/
@Test
fun `restartTransport coordinates with in-flight liveness restart via isRestarting`() = runTest(testDispatcher) {
val gatedTransports = mutableListOf<GatedFakeRadioTransport>()
val closeGate = CompletableDeferred<Unit>()
// Publish the gate to activeCloseGate so tearDown can release it even if an assertion
// throws before we reach the inner try/finally — otherwise disconnect() below would
// hang forever on the gated close().
activeCloseGate = closeGate
val transportProvider: () -> RadioTransport = {
GatedFakeRadioTransport(closeGate).also { gatedTransports.add(it) }
}
clock = 0L
val service = createConnectedService("xAA:BB:CC:DD:EE:FF", transportProvider)
try {
assertEquals(1, gatedTransports.size, "Initial connect should create one transport")
val initialTransport = gatedTransports.first()
// Past the 60s threshold → first checkLiveness CAS-sets isRestarting=true and
// launches a restart coroutine whose close() suspends on closeGate. Under
// UnconfinedTestDispatcher the launched coroutine runs eagerly up to the
// suspension point: by the time checkLiveness() returns, mutex is held and
// isRestarting == true.
clock = 65_000L
service.checkLiveness()
// Issue restartTransport() while the liveness restart is in-flight. Under the refactored
// CAS-before-mutex pattern, restartTransport's CAS fails immediately (isRestarting is already
// true from checkLiveness's synchronous CAS). It returns without acquiring the mutex.
val restartJob = backgroundScope.launch { service.restartTransport() }
testDispatcher.scheduler.runCurrent()
try {
// Pre-release: liveness still in-flight (close() suspended on closeGate),
// but restartTransport already returned via CAS-fail on isRestarting (it never
// touched transportMutex). No fresh transport from restartTransport, no stacking.
assertEquals(
1,
gatedTransports.size,
"No fresh transport created while liveness restart is in-flight",
)
assertTrue(initialTransport.closeCalled, "Liveness restart must have entered close()")
assertEquals(
0,
initialTransport.closeCompletedCount,
"Liveness restart close() must still be suspended (gate not yet released)",
)
assertTrue(
restartJob.isCompleted,
"restartTransport should CAS-fail immediately when liveness already holds isRestarting, " +
"not queue on transportMutex",
)
} finally {
// Release the gate unconditionally so the suspended liveness restart can
// complete. tearDown also releases activeCloseGate, but completing here is
// required for the post-finally assertions to observe the resumed cycle.
closeGate.complete(Unit)
}
// Release the gate: liveness restart completes (close + startTransport).
// restartTransport already returned via CAS-fail, so no second cycle. Exactly 2 transports.
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
restartJob.join()
// Exactly 2 transports: 1 initial + 1 from liveness restart. restartTransport
// was a no-op via the isRestarting CAS. A stacking bug would produce 3.
assertEquals(
2,
gatedTransports.size,
"restartTransport must not stack another cycle on an in-flight liveness restart",
)
assertEquals(1, initialTransport.closeCount, "Initial transport closed exactly once")
assertEquals(
1,
initialTransport.closeCompletedCount,
"Initial transport close completed exactly once after gate release",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
/**
* Regression for the `radioTransport == null` gate added to [SharedRadioInterfaceService.restartTransport] (the
* mutex-protected early-return, checked after the `isRestarting` CAS but before the `onDisconnect(isPermanent =
* false)` call).
*
* Scenario: a TCP transport has been torn down by an environmental stop (`networkAvailable = false`) but
* `connectionRequested` is intentionally preserved so the network recovery listener can re-bring-up the transport
* when connectivity returns. A stale handshake-stall restart fired in that window MUST be a no-op:
* - It MUST NOT create a fresh transport via `startTransportLocked()` (that would bypass the recovery listener,
* which owns the re-bring-up).
* - It MUST NOT emit `DeviceSleep` via `onDisconnect(isPermanent = false)` (the gate returns before that call), so
* the recovery path's later transitions stay meaningful.
*
* Counter-assertion: the recovery listener itself MUST still function after the gate fires — toggling
* `networkAvailable` back to `true` MUST create the second transport, proving the gate did not break the documented
* re-bring-up path.
*/
@Test
fun `restartTransport is a no-op when transport is environmentally stopped but connection remains requested`() =
runTest(testDispatcher) {
clock = 0L
val networkAvailability = MutableStateFlow<Boolean>(true)
val service = createConnectedService("t192.168.1.100", networkAvailability = networkAvailability)
try {
assertEquals(1, createdTransports.size, "Initial connect should create one transport")
val initialTransport = createdTransports.first()
// Subscribe BEFORE the environmental stop so we capture every state transition through
// the stop, the gated restartTransport, and the recovery. _connectionState is a StateFlow
// (replays current value to late collectors), so the launch's first emission is Connected.
val stateEmissions = mutableListOf<ConnectionState>()
val collectJob = backgroundScope.launch { service.connectionState.collect { stateEmissions.add(it) } }
testDispatcher.scheduler.runCurrent()
// Environmental stop: network drops while a TCP transport is running. The network
// listener (see initStateListeners) calls stopTransportLocked() with defaults
// (notifyPermanent=true → onDisconnect(isPermanent=true) → Disconnected). The transport
// is closed and radioTransport becomes null, but connectionRequested STAYS true so the
// recovery listener can re-bring-up later.
networkAvailability.value = false
testDispatcher.scheduler.runCurrent()
// Drain the polite-disconnect frame (POLITE_DISCONNECT_DRAIN_MS = 500ms).
advanceTimeBy(1_000L)
assertTrue(initialTransport.closeCalled, "Environmental stop must close the running TCP transport")
assertEquals(
1,
createdTransports.size,
"Environmental stop must not create a new transport (only teardown)",
)
// Stale handshake-stall restart fired in the window where radioTransport == null but
// connectionRequested is still true. The new gate must short-circuit AFTER the
// isRestarting CAS (inside transportMutex) but BEFORE the onDisconnect(isPermanent=false) DeviceSleep
// emission.
service.restartTransport()
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
assertEquals(
1,
createdTransports.size,
"restartTransport must be a no-op when radioTransport == null (gate fired)",
)
assertFalse(
ConnectionState.DeviceSleep in stateEmissions,
"Gated restartTransport must NOT emit DeviceSleep " +
"(returned before onDisconnect(isPermanent=false)); emitted: $stateEmissions",
)
// Environmental recovery: networkAvailable flips back to true. connectionRequested is
// still true (neither the environmental stop nor the gated restart cleared it), so the
// recovery listener MUST call startTransportLocked() and create the second transport.
// This proves the new gate did not break the documented re-bring-up path.
networkAvailability.value = true
testDispatcher.scheduler.runCurrent()
advanceTimeBy(1_000L)
collectJob.cancel()
assertEquals(
2,
createdTransports.size,
"Network recovery must re-bring-up the transport after the gated no-op restart (gate still set)",
)
} finally {
service.disconnect()
advanceTimeBy(1_000L)
}
}
}

View File

@@ -64,6 +64,8 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main
val sentToRadio = mutableListOf<ByteArray>()
var connectCalled = false
var restartTransportCalled: Boolean = false
private set
override fun isMockTransport(): Boolean = true
@@ -79,6 +81,10 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main
connectCalled = false
}
override suspend fun restartTransport() {
restartTransportCalled = true
}
override fun getDeviceAddress(): String? = _currentDeviceAddressFlow.value
override fun setDeviceAddress(deviceAddr: String?): Boolean {

View File

@@ -27,16 +27,17 @@ import org.koin.core.annotation.KoinViewModel
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.MyNodeInfo
import org.meshtastic.core.model.Node
import org.meshtastic.core.repository.ConnectionStateProvider
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.proto.Config
import org.meshtastic.proto.LocalConfig
/**
* Derived, UI-friendly summary of the device connection state. Combines [ConnectionStateProvider.connectionState] with
* "region unset" to surface the MUST_SET_REGION case that otherwise needs a separate boolean flag in the UI layer.
* Derived, UI-friendly summary of the device connection state. Combines [ServiceRepository.connectionState] with
* "region unset" and the [ServiceRepository.RECONNECTING_PROGRESS_TEXT] handshake-recovery signal to surface cases
* (MUST_SET_REGION, RECONNECTING) that otherwise need separate boolean flags in the UI layer.
*/
enum class ConnectionStatus {
/** No device has been selected or we are otherwise disconnected. */
@@ -45,6 +46,12 @@ enum class ConnectionStatus {
/** A device has been selected and we are working through bonding/handshake. */
CONNECTING,
/**
* Transport is recovering from a WiFi/TCP handshake stall (the watchdog tore the link down and is bringing it back
* up). Distinct from [NOT_CONNECTED] so the UI can show an in-progress recovery instead of a final failure.
*/
RECONNECTING,
/** Connected with node info available. */
CONNECTED,
@@ -58,7 +65,7 @@ enum class ConnectionStatus {
@KoinViewModel
class ConnectionsViewModel(
radioConfigRepository: RadioConfigRepository,
connectionStateProvider: ConnectionStateProvider,
serviceRepository: ServiceRepository,
nodeRepository: NodeRepository,
private val uiPrefs: UiPrefs,
) : ViewModel() {
@@ -66,7 +73,7 @@ class ConnectionsViewModel(
val localConfig: StateFlow<LocalConfig> =
radioConfigRepository.localConfigFlow.stateInWhileSubscribed(initialValue = LocalConfig())
val connectionState = connectionStateProvider.connectionState
val connectionState = serviceRepository.connectionState
val myNodeInfo: StateFlow<MyNodeInfo?> = nodeRepository.myNodeInfo
@@ -95,18 +102,29 @@ class ConnectionsViewModel(
.stateInWhileSubscribed(initialValue = false)
/**
* Single source of truth for the UI's "connection status" pill/banner. Derived from [connectionState] and
* [regionUnset]; kept here rather than in the composable so the mapping is observable and testable.
* Single source of truth for the UI's "connection status" pill/banner. Derived from [connectionState],
* [ServiceRepository.connectionProgress], and [regionUnset]; kept here rather than in the composable so the mapping
* is observable and testable.
*
* The [ConnectionStatus.RECONNECTING] case is signalled by the WiFi/TCP handshake watchdog writing
* [ServiceRepository.RECONNECTING_PROGRESS_TEXT] to [ServiceRepository.connectionProgress] immediately before its
* recovery sibling transitions to [ConnectionState.Disconnected]. See
* [ServiceRepository.RECONNECTING_PROGRESS_TEXT] for the cross-track contract.
*/
val connectionStatus: StateFlow<ConnectionStatus> =
combine(connectionState, regionUnset) { state, unset ->
combine(connectionState, regionUnset, serviceRepository.connectionProgress) { state, unset, progress ->
when (state) {
is ConnectionState.Connected ->
if (unset) ConnectionStatus.MUST_SET_REGION else ConnectionStatus.CONNECTED
ConnectionState.Connecting -> ConnectionStatus.CONNECTING
ConnectionState.Disconnected -> ConnectionStatus.NOT_CONNECTED
ConnectionState.Disconnected ->
if (progress == ServiceRepository.RECONNECTING_PROGRESS_TEXT) {
ConnectionStatus.RECONNECTING
} else {
ConnectionStatus.NOT_CONNECTED
}
ConnectionState.DeviceSleep -> ConnectionStatus.CONNECTED_SLEEPING
}

View File

@@ -16,6 +16,7 @@
*/
package org.meshtastic.core.ui.viewmodel
import app.cash.turbine.test
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
@@ -27,8 +28,11 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.core.testing.FakeServiceRepository
@@ -59,7 +63,7 @@ class ConnectionsViewModelTest {
viewModel =
ConnectionsViewModel(
radioConfigRepository = radioConfigRepository,
connectionStateProvider = serviceRepository,
serviceRepository = serviceRepository,
nodeRepository = nodeRepository,
uiPrefs = uiPrefs,
)
@@ -84,4 +88,67 @@ class ConnectionsViewModelTest {
assertEquals(true, viewModel.hasShownNotPairedWarning.value)
verify { uiPrefs.setHasShownNotPairedWarning(true) }
}
@Test
fun `Disconnected with Reconnecting progress maps to RECONNECTING`() = runTest {
viewModel.connectionStatus.test {
// Initial value from stateInWhileSubscribed.
assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem())
// Track A contract: progress is set BEFORE the Disconnected transition.
serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT)
assertEquals(ConnectionStatus.RECONNECTING, awaitItem())
// State catch-up: the Disconnected transition is a no-op because FakeServiceRepository
// starts Disconnected, and distinctUntilChanged suppresses the re-emission.
serviceRepository.setConnectionState(ConnectionState.Disconnected)
expectNoEvents()
cancelAndIgnoreRemainingEvents()
}
}
@Test
fun `Connecting state maps to CONNECTING regardless of progress text`() = runTest {
viewModel.connectionStatus.test {
assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem())
// Progress set first would transiently surface RECONNECTING while state is still Disconnected.
serviceRepository.setConnectionProgress(ServiceRepository.RECONNECTING_PROGRESS_TEXT)
assertEquals(ConnectionStatus.RECONNECTING, awaitItem())
// The Connecting transition overrides progress: Connecting always maps to CONNECTING.
serviceRepository.setConnectionState(ConnectionState.Connecting)
assertEquals(ConnectionStatus.CONNECTING, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
@Test
fun `Disconnected without Reconnecting progress stays NOT_CONNECTED`() = runTest {
viewModel.connectionStatus.test {
assertEquals(ConnectionStatus.NOT_CONNECTED, awaitItem())
serviceRepository.setConnectionProgress("Downloading Node DB...")
serviceRepository.setConnectionState(ConnectionState.Disconnected)
// No emission: state and progress resolve to NOT_CONNECTED, equal to the current value.
expectNoEvents()
cancelAndIgnoreRemainingEvents()
}
}
/**
* Cross-track contract pin: Track A (MeshConnectionManagerImpl.runSiblingHandshakeRecovery) writes the literal
* "Reconnecting…" (with U+2026) to ServiceRepository.connectionProgress. This constant is what Track C compares
* against. If either side changes (e.g., localization, ASCII normalization), the UI would silently fall back to
* NOT_CONNECTED instead of RECONNECTING. This test pins the canonical constant in [ServiceRepository]; combined
* with the existing ordering test in MeshConnectionManagerImplTest that asserts the same literal is set, this
* transitively enforces the contract via the shared constant.
*/
@Test
fun `RECONNECTING_PROGRESS_TEXT pins the cross-track literal value`() {
assertEquals("Reconnecting\u2026", ServiceRepository.RECONNECTING_PROGRESS_TEXT)
}
}

View File

@@ -90,6 +90,10 @@ class NoopRadioInterfaceService : RadioInterfaceService {
logWarn("NoopRadioInterfaceService.disconnect()")
}
override suspend fun restartTransport() {
logWarn("NoopRadioInterfaceService.restartTransport()")
}
override fun getDeviceAddress(): String? = null
override fun setDeviceAddress(deviceAddr: String?): Boolean = false

View File

@@ -39,6 +39,7 @@ import org.meshtastic.core.resources.connected_sleeping
import org.meshtastic.core.resources.connecting
import org.meshtastic.core.resources.must_set_region
import org.meshtastic.core.resources.not_connected
import org.meshtastic.core.resources.reconnecting
import org.meshtastic.core.ui.viewmodel.ConnectionStatus
/**
@@ -59,6 +60,7 @@ fun ConnectingDeviceInfo(
ConnectionStatus.CONNECTED -> stringResource(Res.string.connected)
ConnectionStatus.MUST_SET_REGION -> stringResource(Res.string.must_set_region)
ConnectionStatus.CONNECTING -> connectionProgress ?: stringResource(Res.string.connecting)
ConnectionStatus.RECONNECTING -> stringResource(Res.string.reconnecting)
ConnectionStatus.CONNECTED_SLEEPING -> stringResource(Res.string.connected_sleeping)
ConnectionStatus.NOT_CONNECTED -> stringResource(Res.string.not_connected)
}