diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt index bb02284b9..cfb239a87 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt @@ -17,6 +17,7 @@ package org.meshtastic.core.data.repository import co.touchlab.kermit.Logger +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -98,7 +99,7 @@ class DeviceHardwareRepositoryImpl( shouldFetch = { cached -> cached == null || lookupEntities(hwModel, target).any { it.isStale() } }, fetch = { singleFlightRefresh() }, context = dispatchers.io, - networkTimeoutMs = NETWORK_REFRESH_TIMEOUT_MS, + networkTimeoutMs = null, tag = "DeviceHardwareRepository", ) @@ -127,18 +128,20 @@ class DeviceHardwareRepositoryImpl( private suspend fun singleFlightRefresh() { refreshMutex.withLock { safeCatching { - val completed = + val remoteHardware = withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) { Logger.d { "DeviceHardwareRepository: fetching from remote API" } - val remoteHardware = remoteDataSource.getAllDeviceHardware() - Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" } - localDataSource.insertAllDeviceHardware(remoteHardware) + remoteDataSource.getAllDeviceHardware() } - if (completed == null) { + if (remoteHardware == null) { Logger.w { "DeviceHardwareRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms" } } else { + Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" } + withContext(NonCancellable + dispatchers.io) { + localDataSource.insertAllDeviceHardware(remoteHardware) + } // Refresh msh.to device links from the API after a hardware refresh. Runs outside the hardware // network timeout so that deadline can't cancel it mid-write. deviceLinkRepository.reconcile() diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt index daa197377..67841218d 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt @@ -17,6 +17,7 @@ package org.meshtastic.core.data.repository import co.touchlab.kermit.Logger +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow @@ -75,8 +76,10 @@ class DeviceLinkRepositoryImpl( "DeviceLinkRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms" } } else { - store(remoteLinks) - lastRefreshMillis = nowMillis + withContext(NonCancellable + dispatchers.io) { + store(remoteLinks) + lastRefreshMillis = nowMillis + } } } .onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/util/StaleWhileRevalidateFlow.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/util/StaleWhileRevalidateFlow.kt index 46bc97142..d66aaabb7 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/util/StaleWhileRevalidateFlow.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/util/StaleWhileRevalidateFlow.kt @@ -29,7 +29,7 @@ private const val DEFAULT_NETWORK_TIMEOUT_MS = 5_000L /** * Creates a cold Flow that implements the stale-while-revalidate caching pattern: * 1. Load and emit cached data immediately (UI never waits for network). - * 2. If [shouldFetch] returns true, attempt a network refresh bounded by [networkTimeoutMs]. + * 2. If [shouldFetch] returns true, attempt a refresh optionally bounded by [networkTimeoutMs]. * 3. Reload from cache and emit again if the data changed. * * The [fetch] lambda is expected to write results to the local cache as a side-effect; [loadFromCache] is called again @@ -42,7 +42,8 @@ private const val DEFAULT_NETWORK_TIMEOUT_MS = 5_000L * @param shouldFetch Decides whether a network refresh is needed based on the cached value (suspendable). * @param fetch Performs the network call and persists results locally (side-effect only). * @param context The coroutine context for cache/network operations. - * @param networkTimeoutMs Maximum time to wait for [fetch] before falling back to cached data. + * @param networkTimeoutMs Maximum time to wait for [fetch] before falling back to cached data. Pass null when [fetch] + * bounds its own network work and also performs local cache writes that must not be cancelled by this helper. * @param tag Logging tag for diagnostics. */ internal fun staleWhileRevalidateFlow( @@ -50,7 +51,7 @@ internal fun staleWhileRevalidateFlow( shouldFetch: suspend (T?) -> Boolean, fetch: suspend () -> Unit, context: CoroutineContext, - networkTimeoutMs: Long = DEFAULT_NETWORK_TIMEOUT_MS, + networkTimeoutMs: Long? = DEFAULT_NETWORK_TIMEOUT_MS, tag: String = "StaleWhileRevalidate", ): Flow = flow { val cached = loadFromCache() @@ -58,11 +59,22 @@ internal fun staleWhileRevalidateFlow( if (!shouldFetch(cached)) return@flow - val completed = - withTimeoutOrNull(networkTimeoutMs) { - safeCatching { fetch() }.onFailure { e -> Logger.w(e) { "$tag: network fetch failed" } } + suspend fun runFetch() { + safeCatching { fetch() }.onFailure { e -> Logger.w(e) { "$tag: network fetch failed" } } + } + + val timedOut = + if (networkTimeoutMs == null) { + runFetch() + false + } else { + withTimeoutOrNull(networkTimeoutMs) { + runFetch() + true + } != true } - if (completed == null) { + + if (timedOut) { Logger.w { "$tag: network fetch timed out after ${networkTimeoutMs}ms" } }