fix(data): separate refresh timeouts from Room persistence (#5881)

Co-authored-by: James Rich <james.a.rich@gmail.com>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Jeremiah K
2026-06-20 12:31:41 -05:00
committed by GitHub
parent 75f229e8e7
commit 63dbbddb0f
3 changed files with 33 additions and 15 deletions

View File

@@ -17,6 +17,7 @@
package org.meshtastic.core.data.repository
import co.touchlab.kermit.Logger
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@@ -98,7 +99,7 @@ class DeviceHardwareRepositoryImpl(
shouldFetch = { cached -> cached == null || lookupEntities(hwModel, target).any { it.isStale() } },
fetch = { singleFlightRefresh() },
context = dispatchers.io,
networkTimeoutMs = NETWORK_REFRESH_TIMEOUT_MS,
networkTimeoutMs = null,
tag = "DeviceHardwareRepository",
)
@@ -127,18 +128,20 @@ class DeviceHardwareRepositoryImpl(
private suspend fun singleFlightRefresh() {
refreshMutex.withLock {
safeCatching {
val completed =
val remoteHardware =
withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) {
Logger.d { "DeviceHardwareRepository: fetching from remote API" }
val remoteHardware = remoteDataSource.getAllDeviceHardware()
Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" }
localDataSource.insertAllDeviceHardware(remoteHardware)
remoteDataSource.getAllDeviceHardware()
}
if (completed == null) {
if (remoteHardware == null) {
Logger.w {
"DeviceHardwareRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms"
}
} else {
Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" }
withContext(NonCancellable + dispatchers.io) {
localDataSource.insertAllDeviceHardware(remoteHardware)
}
// Refresh msh.to device links from the API after a hardware refresh. Runs outside the hardware
// network timeout so that deadline can't cancel it mid-write.
deviceLinkRepository.reconcile()

View File

@@ -17,6 +17,7 @@
package org.meshtastic.core.data.repository
import co.touchlab.kermit.Logger
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
@@ -75,8 +76,10 @@ class DeviceLinkRepositoryImpl(
"DeviceLinkRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms"
}
} else {
store(remoteLinks)
lastRefreshMillis = nowMillis
withContext(NonCancellable + dispatchers.io) {
store(remoteLinks)
lastRefreshMillis = nowMillis
}
}
}
.onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } }

View File

@@ -29,7 +29,7 @@ private const val DEFAULT_NETWORK_TIMEOUT_MS = 5_000L
/**
* Creates a cold Flow that implements the stale-while-revalidate caching pattern:
* 1. Load and emit cached data immediately (UI never waits for network).
* 2. If [shouldFetch] returns true, attempt a network refresh bounded by [networkTimeoutMs].
* 2. If [shouldFetch] returns true, attempt a refresh optionally bounded by [networkTimeoutMs].
* 3. Reload from cache and emit again if the data changed.
*
* The [fetch] lambda is expected to write results to the local cache as a side-effect; [loadFromCache] is called again
@@ -42,7 +42,8 @@ private const val DEFAULT_NETWORK_TIMEOUT_MS = 5_000L
* @param shouldFetch Decides whether a network refresh is needed based on the cached value (suspendable).
* @param fetch Performs the network call and persists results locally (side-effect only).
* @param context The coroutine context for cache/network operations.
* @param networkTimeoutMs Maximum time to wait for [fetch] before falling back to cached data.
* @param networkTimeoutMs Maximum time to wait for [fetch] before falling back to cached data. Pass null when [fetch]
* bounds its own network work and also performs local cache writes that must not be cancelled by this helper.
* @param tag Logging tag for diagnostics.
*/
internal fun <T : Any> staleWhileRevalidateFlow(
@@ -50,7 +51,7 @@ internal fun <T : Any> staleWhileRevalidateFlow(
shouldFetch: suspend (T?) -> Boolean,
fetch: suspend () -> Unit,
context: CoroutineContext,
networkTimeoutMs: Long = DEFAULT_NETWORK_TIMEOUT_MS,
networkTimeoutMs: Long? = DEFAULT_NETWORK_TIMEOUT_MS,
tag: String = "StaleWhileRevalidate",
): Flow<T?> = flow {
val cached = loadFromCache()
@@ -58,11 +59,22 @@ internal fun <T : Any> staleWhileRevalidateFlow(
if (!shouldFetch(cached)) return@flow
val completed =
withTimeoutOrNull(networkTimeoutMs) {
safeCatching { fetch() }.onFailure { e -> Logger.w(e) { "$tag: network fetch failed" } }
suspend fun runFetch() {
safeCatching { fetch() }.onFailure { e -> Logger.w(e) { "$tag: network fetch failed" } }
}
val timedOut =
if (networkTimeoutMs == null) {
runFetch()
false
} else {
withTimeoutOrNull(networkTimeoutMs) {
runFetch()
true
} != true
}
if (completed == null) {
if (timedOut) {
Logger.w { "$tag: network fetch timed out after ${networkTimeoutMs}ms" }
}