mirror of
https://github.com/meshtastic/Meshtastic-Android.git
synced 2026-07-02 09:26:01 -04:00
fix(data): stale firmware/hardware caches — stop cancelling slow API refreshes, prune pulled releases, seed from newer bundles (#6060)
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -101,7 +101,7 @@ class NetworkModule {
|
||||
install(plugin = ContentNegotiation) { json(json) }
|
||||
install(DefaultRequest) { url(HttpClientDefaults.API_BASE_URL) }
|
||||
install(plugin = HttpTimeout) {
|
||||
requestTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
requestTimeoutMillis = HttpClientDefaults.REQUEST_TIMEOUT_MS
|
||||
connectTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
socketTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
}
|
||||
|
||||
@@ -34,17 +34,17 @@ class FirmwareReleaseLocalDataSource(
|
||||
private val firmwareReleaseDao
|
||||
get() = dbManager.currentDb.value.firmwareReleaseDao()
|
||||
|
||||
suspend fun insertFirmwareReleases(
|
||||
firmwareReleases: List<NetworkFirmwareRelease>,
|
||||
releaseType: FirmwareReleaseType,
|
||||
) = withContext(dispatchers.io) {
|
||||
firmwareReleases.forEach { firmwareRelease ->
|
||||
firmwareReleaseDao.insert(firmwareRelease.asEntity(releaseType))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun deleteAllFirmwareReleases() = withContext(dispatchers.io) { firmwareReleaseDao.deleteAll() }
|
||||
|
||||
/** Transactionally replaces all rows of each given type with its API list; other types are untouched. */
|
||||
suspend fun replaceFirmwareReleases(releasesByType: Map<FirmwareReleaseType, List<NetworkFirmwareRelease>>) =
|
||||
withContext(dispatchers.io) {
|
||||
firmwareReleaseDao.replaceByTypes(
|
||||
types = releasesByType.keys.toList(),
|
||||
releases = releasesByType.flatMap { (type, releases) -> releases.map { it.asEntity(type) } },
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun getLatestRelease(releaseType: FirmwareReleaseType): FirmwareReleaseEntity? =
|
||||
withContext(dispatchers.io) {
|
||||
val releases = firmwareReleaseDao.getReleasesByType(releaseType)
|
||||
@@ -55,6 +55,4 @@ class FirmwareReleaseLocalDataSource(
|
||||
return@withContext latestRelease
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun hasAnyEntries(): Boolean = withContext(dispatchers.io) { firmwareReleaseDao.count() > 0 }
|
||||
}
|
||||
|
||||
@@ -17,7 +17,10 @@
|
||||
package org.meshtastic.core.data.repository
|
||||
|
||||
import co.touchlab.kermit.Logger
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
@@ -53,8 +56,15 @@ class DeviceHardwareRepositoryImpl(
|
||||
private val dispatchers: CoroutineDispatchers,
|
||||
) : DeviceHardwareRepository {
|
||||
|
||||
/** Single-flight guard so concurrent collectors don't duplicate the full-table refresh. */
|
||||
private val refreshMutex = Mutex()
|
||||
/** Guards [inFlightRefresh] so concurrent callers share one full-table refresh. */
|
||||
private val refreshGuard = Mutex()
|
||||
|
||||
private var inFlightRefresh: Deferred<Unit>? = null
|
||||
|
||||
// This @Single lives for the entire app lifetime, so the SupervisorJob is never cancelled. Refreshes run here
|
||||
// so a caller that stops waiting (the node-details path bounds its wait) can't abort the shared fetch — slow
|
||||
// api.meshtastic.org responses (measured 20-60s) still land in the DB for the next lookup.
|
||||
private val refreshScope = CoroutineScope(dispatchers.io + SupervisorJob())
|
||||
|
||||
/**
|
||||
* Retrieves device hardware information by its model ID and optional target string.
|
||||
@@ -83,7 +93,7 @@ class DeviceHardwareRepositoryImpl(
|
||||
|
||||
var entities = lookupEntities(hwModel, target)
|
||||
if (forceRefresh || entities.isEmpty() || entities.any { it.isStale() }) {
|
||||
singleFlightRefresh()
|
||||
singleFlightRefresh(maxWaitMs = NETWORK_REFRESH_TIMEOUT_MS)
|
||||
entities = lookupEntities(hwModel, target)
|
||||
}
|
||||
|
||||
@@ -126,32 +136,35 @@ class DeviceHardwareRepositoryImpl(
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a single-flight network refresh: concurrent callers share one in-flight request rather than each
|
||||
* triggering a full API fetch.
|
||||
* Starts (or joins) a single shared refresh running in [refreshScope]. When [maxWaitMs] is set, the caller waits at
|
||||
* most that long before falling back to cached data; the refresh itself always runs to completion, bounded only by
|
||||
* the HttpClient's own timeout/retry policy.
|
||||
*/
|
||||
private suspend fun singleFlightRefresh() {
|
||||
refreshMutex.withLock {
|
||||
safeCatching {
|
||||
val remoteHardware =
|
||||
withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) {
|
||||
Logger.d { "DeviceHardwareRepository: fetching from remote API" }
|
||||
remoteDataSource.getAllDeviceHardware()
|
||||
}
|
||||
if (remoteHardware == null) {
|
||||
Logger.w {
|
||||
"DeviceHardwareRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms"
|
||||
}
|
||||
} else {
|
||||
Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" }
|
||||
withContext(NonCancellable + dispatchers.io) {
|
||||
localDataSource.insertAllDeviceHardware(remoteHardware)
|
||||
}
|
||||
// Refresh msh.to device links from the API after a hardware refresh. Runs outside the hardware
|
||||
// network timeout so that deadline can't cancel it mid-write.
|
||||
deviceLinkRepository.reconcile()
|
||||
}
|
||||
private suspend fun singleFlightRefresh(maxWaitMs: Long? = null) {
|
||||
val refresh =
|
||||
refreshGuard.withLock {
|
||||
inFlightRefresh?.takeIf { it.isActive }
|
||||
?: refreshScope
|
||||
.async {
|
||||
safeCatching {
|
||||
Logger.d { "DeviceHardwareRepository: fetching from remote API" }
|
||||
val remoteHardware = remoteDataSource.getAllDeviceHardware()
|
||||
Logger.d {
|
||||
"DeviceHardwareRepository: remote returned ${remoteHardware.size} entries"
|
||||
}
|
||||
localDataSource.insertAllDeviceHardware(remoteHardware)
|
||||
// Refresh msh.to device links from the API after a hardware refresh.
|
||||
deviceLinkRepository.reconcile()
|
||||
}
|
||||
.onFailure { e -> Logger.w(e) { "DeviceHardwareRepository: network refresh failed" } }
|
||||
Unit
|
||||
}
|
||||
.also { inFlightRefresh = it }
|
||||
}
|
||||
.onFailure { e -> Logger.w(e) { "DeviceHardwareRepository: network refresh failed" } }
|
||||
if (maxWaitMs == null) {
|
||||
refresh.join()
|
||||
} else if (withTimeoutOrNull(maxWaitMs) { refresh.join() } == null) {
|
||||
Logger.w { "DeviceHardwareRepository: refresh still in flight after ${maxWaitMs}ms; using cached data" }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,7 +222,7 @@ class DeviceHardwareRepositoryImpl(
|
||||
companion object {
|
||||
private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_DAY.inWholeMilliseconds
|
||||
|
||||
/** Maximum time to wait for the remote API before falling back to cached/bundled data. */
|
||||
/** Maximum time a blocking lookup waits for an in-flight refresh before returning cached/bundled data. */
|
||||
private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,14 +18,15 @@ package org.meshtastic.core.data.repository
|
||||
|
||||
import co.touchlab.kermit.Logger
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.koin.core.annotation.Single
|
||||
import org.meshtastic.core.common.util.nowMillis
|
||||
@@ -62,6 +63,9 @@ class DeviceLinkRepositoryImpl(
|
||||
/** Serializes seeding and network refreshes so concurrent collectors don't duplicate writes. */
|
||||
private val writeMutex = Mutex()
|
||||
|
||||
/** Single-flights stale-triggered refreshes so concurrent collectors don't start duplicate fetches. */
|
||||
private val refreshMutex = Mutex()
|
||||
|
||||
@Volatile private var lastRefreshMillis = 0L
|
||||
|
||||
override suspend fun ensureImported() {
|
||||
@@ -69,25 +73,23 @@ class DeviceLinkRepositoryImpl(
|
||||
}
|
||||
|
||||
override suspend fun reconcile() {
|
||||
writeMutex.withLock {
|
||||
safeCatching {
|
||||
// Bound only the network call by the timeout; the DB write runs after so a deadline can't cancel it
|
||||
// mid-write and leave the cache half-pruned.
|
||||
val remoteLinks =
|
||||
withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) { remoteDataSource.getDeviceLinks() }
|
||||
if (remoteLinks == null) {
|
||||
Logger.w {
|
||||
"DeviceLinkRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms"
|
||||
}
|
||||
} else {
|
||||
withContext(NonCancellable + dispatchers.io) {
|
||||
store(remoteLinks)
|
||||
safeCatching {
|
||||
// The network call is bounded by the HttpClient's own timeout/retry policy — api.meshtastic.org has
|
||||
// been measured taking 20-60s, so a short local deadline would cancel every refresh. It runs outside
|
||||
// writeMutex so seeding (and therefore first emission) is never blocked behind a slow fetch. The DB
|
||||
// write is NonCancellable so a cancelled caller can't leave the cache half-pruned.
|
||||
val remoteLinks = remoteDataSource.getDeviceLinks()
|
||||
writeMutex.withLock {
|
||||
withContext(NonCancellable + dispatchers.io) {
|
||||
// Only an applied payload counts as fresh — an ignored empty response must not suppress
|
||||
// retries for a whole expiration window while the cache stays stale.
|
||||
if (store(remoteLinks)) {
|
||||
lastRefreshMillis = nowMillis
|
||||
}
|
||||
}
|
||||
}
|
||||
.onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } }
|
||||
}
|
||||
.onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } }
|
||||
}
|
||||
|
||||
override suspend fun getLinksForTarget(platformioTarget: String, regionCode: String): List<DeviceLink> =
|
||||
@@ -109,8 +111,12 @@ class DeviceLinkRepositoryImpl(
|
||||
|
||||
override fun observeAllLinks(): Flow<List<DeviceLink>> = flow {
|
||||
ensureSeeded()
|
||||
refreshIfStale()
|
||||
emitAll(localDataSource.observeAll().map { entities -> entities.map { it.asExternalModel() } })
|
||||
coroutineScope {
|
||||
// Refresh concurrently so a slow API response can't delay the first emission; Room re-emits when the
|
||||
// refreshed rows land.
|
||||
launch { refreshIfStale() }
|
||||
emitAll(localDataSource.observeAll().map { entities -> entities.map { it.asExternalModel() } })
|
||||
}
|
||||
}
|
||||
|
||||
/** Seeds the table from the bundled snapshot if empty (fresh install, data clear, radio switch). */
|
||||
@@ -128,31 +134,30 @@ class DeviceLinkRepositoryImpl(
|
||||
}
|
||||
}
|
||||
|
||||
/** Best-effort network refresh, gated by [CACHE_EXPIRATION_TIME_MS]. */
|
||||
/** Best-effort network refresh, gated by [CACHE_EXPIRATION_TIME_MS] and single-flighted via [refreshMutex]. */
|
||||
private suspend fun refreshIfStale() {
|
||||
if (nowMillis - lastRefreshMillis > CACHE_EXPIRATION_TIME_MS) reconcile()
|
||||
if (nowMillis - lastRefreshMillis <= CACHE_EXPIRATION_TIME_MS) return
|
||||
refreshMutex.withLock { if (nowMillis - lastRefreshMillis > CACHE_EXPIRATION_TIME_MS) reconcile() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps resolved API links to the cached domain model, upserts them, and prunes short codes that no longer exist.
|
||||
* Internal links (GitHub, YouTube, …) are dropped — they never belong to a device's purchase section. An empty list
|
||||
* is ignored rather than wiping the cache on a bad response.
|
||||
* is ignored rather than wiping the cache on a bad response. Returns whether anything was stored.
|
||||
*/
|
||||
private suspend fun store(networkLinks: List<NetworkDeviceLink>) {
|
||||
private suspend fun store(networkLinks: List<NetworkDeviceLink>): Boolean {
|
||||
val links = networkLinks.filter { it.type != NetworkDeviceLink.TYPE_INTERNAL }.map { it.toDeviceLink() }
|
||||
if (links.isEmpty()) {
|
||||
Logger.w { "DeviceLinkRepository: no device links to store; leaving cache untouched" }
|
||||
return
|
||||
return false
|
||||
}
|
||||
localDataSource.upsertAll(links.map { it.asEntity() })
|
||||
localDataSource.deleteNotIn(links.map { it.shortCode })
|
||||
Logger.i { "DeviceLinkRepository: stored ${links.size} device links" }
|
||||
return true
|
||||
}
|
||||
|
||||
private companion object {
|
||||
private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_DAY.inWholeMilliseconds
|
||||
|
||||
/** Maximum time to wait for the remote API before falling back to cached/bundled data. */
|
||||
private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,12 +31,16 @@ import org.meshtastic.core.data.util.staleWhileRevalidateFlow
|
||||
import org.meshtastic.core.database.entity.FirmwareRelease
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseEntity
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseType
|
||||
import org.meshtastic.core.database.entity.asDeviceVersion
|
||||
import org.meshtastic.core.database.entity.asEntity
|
||||
import org.meshtastic.core.database.entity.asExternalModel
|
||||
import org.meshtastic.core.di.CoroutineDispatchers
|
||||
import org.meshtastic.core.model.NetworkFirmwareRelease
|
||||
import org.meshtastic.core.model.NetworkFirmwareReleases
|
||||
import org.meshtastic.core.model.util.TimeConstants
|
||||
import org.meshtastic.core.network.FirmwareReleaseRemoteDataSource
|
||||
import org.meshtastic.core.repository.FirmwareReleaseRepository
|
||||
import kotlin.concurrent.Volatile
|
||||
|
||||
@Single
|
||||
open class FirmwareReleaseRepositoryImpl(
|
||||
@@ -50,6 +54,11 @@ open class FirmwareReleaseRepositoryImpl(
|
||||
/** Single-flight guard so concurrent collectors share one network refresh. */
|
||||
private val refreshMutex = Mutex()
|
||||
|
||||
/** Guards [seedChecked] so concurrent collectors decode the bundled snapshot at most once per process. */
|
||||
private val seedMutex = Mutex()
|
||||
|
||||
@Volatile private var seedChecked = false
|
||||
|
||||
override val stableRelease: Flow<FirmwareRelease?> = getLatestFirmware(FirmwareReleaseType.STABLE)
|
||||
|
||||
override val alphaRelease: Flow<FirmwareRelease?> = getLatestFirmware(FirmwareReleaseType.ALPHA)
|
||||
@@ -64,7 +73,10 @@ open class FirmwareReleaseRepositoryImpl(
|
||||
},
|
||||
fetch = { singleFlightRefresh() },
|
||||
context = dispatchers.default,
|
||||
networkTimeoutMs = NETWORK_REFRESH_TIMEOUT_MS,
|
||||
// No collector blocks on the fetch (cache is emitted first), so let the HttpClient's own
|
||||
// timeout/retry policy bound it — api.meshtastic.org routinely takes 20-60s to serve this list,
|
||||
// and a short deadline here cancels every refresh, pinning users to the bundled seed data.
|
||||
networkTimeoutMs = null,
|
||||
tag = "FirmwareReleaseRepository",
|
||||
)
|
||||
|
||||
@@ -72,26 +84,59 @@ open class FirmwareReleaseRepositoryImpl(
|
||||
localDataSource.deleteAllFirmwareReleases()
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the bundled snapshot per release type whenever it is newer than what is cached for that type — not just
|
||||
* when the cache is empty. The bundle is refreshed weekly in CI, so an app update carries fresh data even for users
|
||||
* whose network path to api.meshtastic.org chronically fails. Checked once per process; a cache that is already
|
||||
* newer (from a successful network refresh) is never regressed, and a type the bundle doesn't ship is left
|
||||
* untouched.
|
||||
*/
|
||||
private suspend fun ensureSeeded() {
|
||||
if (!localDataSource.hasAnyEntries()) {
|
||||
if (seedChecked) return
|
||||
seedMutex.withLock {
|
||||
if (seedChecked) return
|
||||
safeCatching {
|
||||
Logger.d { "FirmwareReleaseRepository: seeding cache from bundled JSON" }
|
||||
val jsonReleases =
|
||||
assetReader.decode<NetworkFirmwareReleases>("firmware_releases.json", json)
|
||||
?: NetworkFirmwareReleases()
|
||||
localDataSource.insertFirmwareReleases(jsonReleases.releases.stable, FirmwareReleaseType.STABLE)
|
||||
localDataSource.insertFirmwareReleases(jsonReleases.releases.alpha, FirmwareReleaseType.ALPHA)
|
||||
val bundled = assetReader.decode<NetworkFirmwareReleases>("firmware_releases.json", json)?.releases
|
||||
if (bundled == null) {
|
||||
Logger.w { "FirmwareReleaseRepository: no bundled releases available to seed from" }
|
||||
} else {
|
||||
val toApply =
|
||||
listOf(
|
||||
FirmwareReleaseType.STABLE to bundled.stable,
|
||||
FirmwareReleaseType.ALPHA to bundled.alpha,
|
||||
)
|
||||
.filter { (type, releases) -> isBundleNewerFor(type, releases) }
|
||||
.toMap()
|
||||
if (toApply.isNotEmpty()) {
|
||||
Logger.i { "FirmwareReleaseRepository: applying bundled snapshot for ${toApply.keys}" }
|
||||
localDataSource.replaceFirmwareReleases(toApply)
|
||||
}
|
||||
}
|
||||
}
|
||||
.onFailure { e -> Logger.w(e) { "FirmwareReleaseRepository: failed to seed cache from bundled JSON" } }
|
||||
seedChecked = true
|
||||
}
|
||||
}
|
||||
|
||||
/** True when [bundled] contains a release newer than anything cached for [type]. */
|
||||
private suspend fun isBundleNewerFor(type: FirmwareReleaseType, bundled: List<NetworkFirmwareRelease>): Boolean {
|
||||
val bundledNewest = bundled.maxOfOrNull { it.asEntity(type).asDeviceVersion() } ?: return false
|
||||
val cachedNewest = localDataSource.getLatestRelease(type)?.asDeviceVersion()
|
||||
return cachedNewest == null || bundledNewest > cachedNewest
|
||||
}
|
||||
|
||||
private suspend fun singleFlightRefresh() {
|
||||
refreshMutex.withLock {
|
||||
Logger.d { "FirmwareReleaseRepository: fetching from remote API" }
|
||||
val networkReleases = remoteDataSource.getFirmwareReleases()
|
||||
localDataSource.insertFirmwareReleases(networkReleases.releases.stable, FirmwareReleaseType.STABLE)
|
||||
localDataSource.insertFirmwareReleases(networkReleases.releases.alpha, FirmwareReleaseType.ALPHA)
|
||||
val releases = remoteDataSource.getFirmwareReleases().releases
|
||||
if (releases.stable.isEmpty() && releases.alpha.isEmpty()) {
|
||||
Logger.w { "FirmwareReleaseRepository: remote returned no releases; leaving cache untouched" }
|
||||
} else {
|
||||
// Replace rather than upsert so releases pulled or reclassified upstream don't linger as "latest".
|
||||
localDataSource.replaceFirmwareReleases(
|
||||
mapOf(FirmwareReleaseType.STABLE to releases.stable, FirmwareReleaseType.ALPHA to releases.alpha),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,8 +144,5 @@ open class FirmwareReleaseRepositoryImpl(
|
||||
|
||||
companion object {
|
||||
private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_HOUR.inWholeMilliseconds
|
||||
|
||||
/** Maximum time to wait for the remote API before falling back to cached/bundled data. */
|
||||
private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
/*
|
||||
* Copyright (c) 2026 Meshtastic LLC
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.meshtastic.core.data.repository
|
||||
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.serialization.json.Json
|
||||
import okio.Buffer
|
||||
import okio.Source
|
||||
import org.meshtastic.core.data.datasource.BundledAssetReader
|
||||
import org.meshtastic.core.data.datasource.FirmwareReleaseLocalDataSource
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseEntity
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseType
|
||||
import org.meshtastic.core.di.CoroutineDispatchers
|
||||
import org.meshtastic.core.model.NetworkDeviceHardware
|
||||
import org.meshtastic.core.model.NetworkDeviceLinksResponse
|
||||
import org.meshtastic.core.model.NetworkFirmwareRelease
|
||||
import org.meshtastic.core.model.NetworkFirmwareReleases
|
||||
import org.meshtastic.core.model.Releases
|
||||
import org.meshtastic.core.network.FirmwareReleaseRemoteDataSource
|
||||
import org.meshtastic.core.network.service.ApiService
|
||||
import org.meshtastic.core.testing.FakeDatabaseProvider
|
||||
import kotlin.test.AfterTest
|
||||
import kotlin.test.BeforeTest
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FirmwareReleaseRepositoryImplTest {
|
||||
|
||||
/** Only [getFirmwareReleases] is exercised; the other endpoints are never called by this repository. */
|
||||
private class FakeApiService(var response: NetworkFirmwareReleases) : ApiService {
|
||||
override suspend fun getDeviceHardware(): List<NetworkDeviceHardware> = error("unused")
|
||||
|
||||
override suspend fun getDeviceLinks(): NetworkDeviceLinksResponse = error("unused")
|
||||
|
||||
override suspend fun getFirmwareReleases(): NetworkFirmwareReleases = response
|
||||
}
|
||||
|
||||
/** Serves `firmware_releases.json` from [bundled] via the real decode path, or nothing when null. */
|
||||
private class FakeBundledAssetReader(var bundled: Releases? = null) : BundledAssetReader {
|
||||
override fun open(name: String): Source? {
|
||||
val releases = bundled ?: return null
|
||||
if (name != "firmware_releases.json") return null
|
||||
val bytes = Json.encodeToString(NetworkFirmwareReleases(releases = releases)).encodeToByteArray()
|
||||
return Buffer().write(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
// Real dispatchers + runBlocking, NOT runTest — see DeviceLinkRepositoryImplTest for why (Room + virtual time).
|
||||
private val unconfined = Dispatchers.Unconfined
|
||||
private val dispatchers = CoroutineDispatchers(main = unconfined, io = unconfined, default = unconfined)
|
||||
|
||||
private lateinit var dbProvider: FakeDatabaseProvider
|
||||
private lateinit var api: FakeApiService
|
||||
private lateinit var seed: FakeBundledAssetReader
|
||||
private lateinit var repository: FirmwareReleaseRepositoryImpl
|
||||
|
||||
private val dao
|
||||
get() = dbProvider.currentDb.value.firmwareReleaseDao()
|
||||
|
||||
private fun release(id: String) = NetworkFirmwareRelease(id = id, title = id, zipUrl = "https://example.com/$id")
|
||||
|
||||
/** A cached row old enough that the repository always refreshes. */
|
||||
private fun staleRow(id: String, type: FirmwareReleaseType) =
|
||||
FirmwareReleaseEntity(id = id, title = id, releaseType = type, lastUpdated = 0)
|
||||
|
||||
@BeforeTest
|
||||
fun setup() {
|
||||
dbProvider = FakeDatabaseProvider()
|
||||
api = FakeApiService(NetworkFirmwareReleases())
|
||||
seed = FakeBundledAssetReader()
|
||||
repository =
|
||||
FirmwareReleaseRepositoryImpl(
|
||||
remoteDataSource = FirmwareReleaseRemoteDataSource(api, dispatchers),
|
||||
localDataSource = FirmwareReleaseLocalDataSource(dbProvider, dispatchers),
|
||||
assetReader = seed,
|
||||
json = Json { ignoreUnknownKeys = true },
|
||||
dispatchers = dispatchers,
|
||||
)
|
||||
}
|
||||
|
||||
@AfterTest fun tearDown() = dbProvider.close()
|
||||
|
||||
@Test
|
||||
fun refreshPrunesRemovedAndReclassifiedReleases() = runBlocking {
|
||||
// Old snapshot: 2.7.26 was published as alpha, 9.9.9 was pulled entirely upstream.
|
||||
dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE))
|
||||
dao.insert(staleRow("v2.7.26.54e0d8d", FirmwareReleaseType.ALPHA))
|
||||
dao.insert(staleRow("v9.9.9.deadbee", FirmwareReleaseType.ALPHA))
|
||||
// Upstream promoted 2.7.26 to stable and dropped 9.9.9.
|
||||
api.response =
|
||||
NetworkFirmwareReleases(
|
||||
releases =
|
||||
Releases(stable = listOf(release("v2.7.26.54e0d8d")), alpha = listOf(release("v2.7.25.104df5f"))),
|
||||
)
|
||||
|
||||
val stableEmissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.7.15.567b8ea", stableEmissions.first()?.id, "cached value is emitted before the refresh")
|
||||
assertEquals("v2.7.26.54e0d8d", stableEmissions.last()?.id, "refreshed value is emitted after the fetch")
|
||||
assertEquals(
|
||||
listOf("v2.7.25.104df5f"),
|
||||
dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id },
|
||||
"pulled and reclassified releases are pruned from the alpha rows",
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun refreshLeavesLocalRowsUntouched() = runBlocking {
|
||||
dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE))
|
||||
dao.insert(staleRow("local-import", FirmwareReleaseType.LOCAL))
|
||||
api.response = NetworkFirmwareReleases(releases = Releases(stable = listOf(release("v2.7.26.54e0d8d"))))
|
||||
|
||||
repository.stableRelease.toList()
|
||||
|
||||
assertEquals(listOf("local-import"), dao.getReleasesByType(FirmwareReleaseType.LOCAL).map { it.id })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun emptyResponseLeavesCacheUntouched() = runBlocking {
|
||||
dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE))
|
||||
api.response = NetworkFirmwareReleases()
|
||||
|
||||
val emissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.7.15.567b8ea", emissions.last()?.id)
|
||||
assertTrue(dao.getReleasesByType(FirmwareReleaseType.STABLE).isNotEmpty())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun emptyCacheSeedsFromBundledSnapshot() = runBlocking {
|
||||
seed.bundled = Releases(stable = listOf(release("v2.7.15.567b8ea")), alpha = listOf(release("v2.7.25.104df5f")))
|
||||
api.response = NetworkFirmwareReleases()
|
||||
|
||||
val emissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.7.15.567b8ea", emissions.first()?.id)
|
||||
assertEquals(listOf("v2.7.25.104df5f"), dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun newerBundledSnapshotReplacesOlderCache() = runBlocking {
|
||||
// App update ships a fresher bundle than what a network-starved user has cached.
|
||||
dao.insert(FirmwareReleaseEntity(id = "v2.7.15.567b8ea", releaseType = FirmwareReleaseType.STABLE))
|
||||
seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d")))
|
||||
|
||||
val emissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.7.26.54e0d8d", emissions.first()?.id, "bundle applies before the first emission")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun partialBundleLeavesTypesItDoesNotShipUntouched() = runBlocking {
|
||||
// Cache has data for both types; the bundle ships only a newer stable list.
|
||||
dao.insert(FirmwareReleaseEntity(id = "v2.7.15.567b8ea", releaseType = FirmwareReleaseType.STABLE))
|
||||
dao.insert(FirmwareReleaseEntity(id = "v2.7.25.104df5f", releaseType = FirmwareReleaseType.ALPHA))
|
||||
seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d")))
|
||||
|
||||
val emissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.7.26.54e0d8d", emissions.first()?.id, "newer bundled stable applies")
|
||||
assertEquals(
|
||||
listOf("v2.7.25.104df5f"),
|
||||
dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id },
|
||||
"alpha cache survives a stable-only bundle",
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun olderBundledSnapshotNeverRegressesCache() = runBlocking {
|
||||
// A successful network refresh left the cache newer than the (weekly) bundle.
|
||||
dao.insert(FirmwareReleaseEntity(id = "v2.8.0.abc1234", releaseType = FirmwareReleaseType.STABLE))
|
||||
seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d")))
|
||||
|
||||
val emissions = repository.stableRelease.toList()
|
||||
|
||||
assertEquals("v2.8.0.abc1234", emissions.first()?.id)
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ package org.meshtastic.core.database.dao
|
||||
|
||||
import androidx.room3.Dao
|
||||
import androidx.room3.Query
|
||||
import androidx.room3.Transaction
|
||||
import androidx.room3.Upsert
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseEntity
|
||||
import org.meshtastic.core.database.entity.FirmwareReleaseType
|
||||
@@ -29,6 +30,19 @@ interface FirmwareReleaseDao {
|
||||
@Query("DELETE FROM firmware_release")
|
||||
suspend fun deleteAll()
|
||||
|
||||
@Query("DELETE FROM firmware_release WHERE release_type = :releaseType")
|
||||
suspend fun deleteByType(releaseType: FirmwareReleaseType)
|
||||
|
||||
/**
|
||||
* Replaces all rows of the given [types] with [releases] in one transaction, so releases removed or reclassified
|
||||
* upstream are pruned and a crash mid-refresh can't leave the table half-written. Other types are untouched.
|
||||
*/
|
||||
@Transaction
|
||||
suspend fun replaceByTypes(types: List<FirmwareReleaseType>, releases: List<FirmwareReleaseEntity>) {
|
||||
types.forEach { deleteByType(it) }
|
||||
releases.forEach { insert(it) }
|
||||
}
|
||||
|
||||
@Query("SELECT * FROM firmware_release")
|
||||
suspend fun getAllReleases(): List<FirmwareReleaseEntity>
|
||||
|
||||
|
||||
@@ -25,9 +25,16 @@ import io.ktor.client.plugins.HttpRequestRetryConfig
|
||||
* Desktop) when installing [io.ktor.client.plugins.HttpTimeout] and [io.ktor.client.plugins.HttpRequestRetry].
|
||||
*/
|
||||
object HttpClientDefaults {
|
||||
/** Timeout in milliseconds for connect, request, and socket operations. */
|
||||
/** Timeout in milliseconds for connect and socket operations. */
|
||||
const val TIMEOUT_MS = 30_000L
|
||||
|
||||
/**
|
||||
* Timeout in milliseconds for a whole request. Deliberately generous: api.meshtastic.org has been measured taking
|
||||
* 20-60s to serve `github/firmware/list` and `resource/deviceHardware`, and callers use stale-while-revalidate
|
||||
* caching so nothing user-facing waits on this deadline.
|
||||
*/
|
||||
const val REQUEST_TIMEOUT_MS = 90_000L
|
||||
|
||||
/** Maximum number of automatic retries on server errors (5xx) and transient connection/IO failures. */
|
||||
const val MAX_RETRIES = 3
|
||||
|
||||
|
||||
@@ -235,7 +235,7 @@ private fun desktopPlatformStubsModule() = module {
|
||||
install(ContentNegotiation) { json(get<Json>()) }
|
||||
install(DefaultRequest) { url(HttpClientDefaults.API_BASE_URL) }
|
||||
install(HttpTimeout) {
|
||||
requestTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
requestTimeoutMillis = HttpClientDefaults.REQUEST_TIMEOUT_MS
|
||||
connectTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
socketTimeoutMillis = HttpClientDefaults.TIMEOUT_MS
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user