diff --git a/androidApp/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt b/androidApp/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt index bd88dafb8..81db475a0 100644 --- a/androidApp/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt +++ b/androidApp/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt @@ -101,7 +101,7 @@ class NetworkModule { install(plugin = ContentNegotiation) { json(json) } install(DefaultRequest) { url(HttpClientDefaults.API_BASE_URL) } install(plugin = HttpTimeout) { - requestTimeoutMillis = HttpClientDefaults.TIMEOUT_MS + requestTimeoutMillis = HttpClientDefaults.REQUEST_TIMEOUT_MS connectTimeoutMillis = HttpClientDefaults.TIMEOUT_MS socketTimeoutMillis = HttpClientDefaults.TIMEOUT_MS } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/datasource/FirmwareReleaseLocalDataSource.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/datasource/FirmwareReleaseLocalDataSource.kt index 31f1845b7..2fb0feccb 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/datasource/FirmwareReleaseLocalDataSource.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/datasource/FirmwareReleaseLocalDataSource.kt @@ -34,17 +34,17 @@ class FirmwareReleaseLocalDataSource( private val firmwareReleaseDao get() = dbManager.currentDb.value.firmwareReleaseDao() - suspend fun insertFirmwareReleases( - firmwareReleases: List, - releaseType: FirmwareReleaseType, - ) = withContext(dispatchers.io) { - firmwareReleases.forEach { firmwareRelease -> - firmwareReleaseDao.insert(firmwareRelease.asEntity(releaseType)) - } - } - suspend fun deleteAllFirmwareReleases() = withContext(dispatchers.io) { firmwareReleaseDao.deleteAll() } + /** Transactionally replaces all rows of each given type with its API list; other types are untouched. */ + suspend fun replaceFirmwareReleases(releasesByType: Map>) = + withContext(dispatchers.io) { + firmwareReleaseDao.replaceByTypes( + types = releasesByType.keys.toList(), + releases = releasesByType.flatMap { (type, releases) -> releases.map { it.asEntity(type) } }, + ) + } + suspend fun getLatestRelease(releaseType: FirmwareReleaseType): FirmwareReleaseEntity? = withContext(dispatchers.io) { val releases = firmwareReleaseDao.getReleasesByType(releaseType) @@ -55,6 +55,4 @@ class FirmwareReleaseLocalDataSource( return@withContext latestRelease } } - - suspend fun hasAnyEntries(): Boolean = withContext(dispatchers.io) { firmwareReleaseDao.count() > 0 } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt index f2e49bc44..34eee82c2 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceHardwareRepositoryImpl.kt @@ -17,7 +17,10 @@ package org.meshtastic.core.data.repository import co.touchlab.kermit.Logger -import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -53,8 +56,15 @@ class DeviceHardwareRepositoryImpl( private val dispatchers: CoroutineDispatchers, ) : DeviceHardwareRepository { - /** Single-flight guard so concurrent collectors don't duplicate the full-table refresh. */ - private val refreshMutex = Mutex() + /** Guards [inFlightRefresh] so concurrent callers share one full-table refresh. */ + private val refreshGuard = Mutex() + + private var inFlightRefresh: Deferred? = null + + // This @Single lives for the entire app lifetime, so the SupervisorJob is never cancelled. Refreshes run here + // so a caller that stops waiting (the node-details path bounds its wait) can't abort the shared fetch — slow + // api.meshtastic.org responses (measured 20-60s) still land in the DB for the next lookup. + private val refreshScope = CoroutineScope(dispatchers.io + SupervisorJob()) /** * Retrieves device hardware information by its model ID and optional target string. @@ -83,7 +93,7 @@ class DeviceHardwareRepositoryImpl( var entities = lookupEntities(hwModel, target) if (forceRefresh || entities.isEmpty() || entities.any { it.isStale() }) { - singleFlightRefresh() + singleFlightRefresh(maxWaitMs = NETWORK_REFRESH_TIMEOUT_MS) entities = lookupEntities(hwModel, target) } @@ -126,32 +136,35 @@ class DeviceHardwareRepositoryImpl( } /** - * Performs a single-flight network refresh: concurrent callers share one in-flight request rather than each - * triggering a full API fetch. + * Starts (or joins) a single shared refresh running in [refreshScope]. When [maxWaitMs] is set, the caller waits at + * most that long before falling back to cached data; the refresh itself always runs to completion, bounded only by + * the HttpClient's own timeout/retry policy. */ - private suspend fun singleFlightRefresh() { - refreshMutex.withLock { - safeCatching { - val remoteHardware = - withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) { - Logger.d { "DeviceHardwareRepository: fetching from remote API" } - remoteDataSource.getAllDeviceHardware() - } - if (remoteHardware == null) { - Logger.w { - "DeviceHardwareRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms" - } - } else { - Logger.d { "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" } - withContext(NonCancellable + dispatchers.io) { - localDataSource.insertAllDeviceHardware(remoteHardware) - } - // Refresh msh.to device links from the API after a hardware refresh. Runs outside the hardware - // network timeout so that deadline can't cancel it mid-write. - deviceLinkRepository.reconcile() - } + private suspend fun singleFlightRefresh(maxWaitMs: Long? = null) { + val refresh = + refreshGuard.withLock { + inFlightRefresh?.takeIf { it.isActive } + ?: refreshScope + .async { + safeCatching { + Logger.d { "DeviceHardwareRepository: fetching from remote API" } + val remoteHardware = remoteDataSource.getAllDeviceHardware() + Logger.d { + "DeviceHardwareRepository: remote returned ${remoteHardware.size} entries" + } + localDataSource.insertAllDeviceHardware(remoteHardware) + // Refresh msh.to device links from the API after a hardware refresh. + deviceLinkRepository.reconcile() + } + .onFailure { e -> Logger.w(e) { "DeviceHardwareRepository: network refresh failed" } } + Unit + } + .also { inFlightRefresh = it } } - .onFailure { e -> Logger.w(e) { "DeviceHardwareRepository: network refresh failed" } } + if (maxWaitMs == null) { + refresh.join() + } else if (withTimeoutOrNull(maxWaitMs) { refresh.join() } == null) { + Logger.w { "DeviceHardwareRepository: refresh still in flight after ${maxWaitMs}ms; using cached data" } } } @@ -209,7 +222,7 @@ class DeviceHardwareRepositoryImpl( companion object { private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_DAY.inWholeMilliseconds - /** Maximum time to wait for the remote API before falling back to cached/bundled data. */ + /** Maximum time a blocking lookup waits for an in-flight refresh before returning cached/bundled data. */ private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt index e5af9fc88..cdb5d31e7 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/DeviceLinkRepositoryImpl.kt @@ -18,14 +18,15 @@ package org.meshtastic.core.data.repository import co.touchlab.kermit.Logger import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext -import kotlinx.coroutines.withTimeoutOrNull import kotlinx.serialization.json.Json import org.koin.core.annotation.Single import org.meshtastic.core.common.util.nowMillis @@ -62,6 +63,9 @@ class DeviceLinkRepositoryImpl( /** Serializes seeding and network refreshes so concurrent collectors don't duplicate writes. */ private val writeMutex = Mutex() + /** Single-flights stale-triggered refreshes so concurrent collectors don't start duplicate fetches. */ + private val refreshMutex = Mutex() + @Volatile private var lastRefreshMillis = 0L override suspend fun ensureImported() { @@ -69,25 +73,23 @@ class DeviceLinkRepositoryImpl( } override suspend fun reconcile() { - writeMutex.withLock { - safeCatching { - // Bound only the network call by the timeout; the DB write runs after so a deadline can't cancel it - // mid-write and leave the cache half-pruned. - val remoteLinks = - withTimeoutOrNull(NETWORK_REFRESH_TIMEOUT_MS) { remoteDataSource.getDeviceLinks() } - if (remoteLinks == null) { - Logger.w { - "DeviceLinkRepository: network refresh timed out after ${NETWORK_REFRESH_TIMEOUT_MS}ms" - } - } else { - withContext(NonCancellable + dispatchers.io) { - store(remoteLinks) + safeCatching { + // The network call is bounded by the HttpClient's own timeout/retry policy — api.meshtastic.org has + // been measured taking 20-60s, so a short local deadline would cancel every refresh. It runs outside + // writeMutex so seeding (and therefore first emission) is never blocked behind a slow fetch. The DB + // write is NonCancellable so a cancelled caller can't leave the cache half-pruned. + val remoteLinks = remoteDataSource.getDeviceLinks() + writeMutex.withLock { + withContext(NonCancellable + dispatchers.io) { + // Only an applied payload counts as fresh — an ignored empty response must not suppress + // retries for a whole expiration window while the cache stays stale. + if (store(remoteLinks)) { lastRefreshMillis = nowMillis } } } - .onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } } } + .onFailure { e -> Logger.w(e) { "DeviceLinkRepository: network refresh failed" } } } override suspend fun getLinksForTarget(platformioTarget: String, regionCode: String): List = @@ -109,8 +111,12 @@ class DeviceLinkRepositoryImpl( override fun observeAllLinks(): Flow> = flow { ensureSeeded() - refreshIfStale() - emitAll(localDataSource.observeAll().map { entities -> entities.map { it.asExternalModel() } }) + coroutineScope { + // Refresh concurrently so a slow API response can't delay the first emission; Room re-emits when the + // refreshed rows land. + launch { refreshIfStale() } + emitAll(localDataSource.observeAll().map { entities -> entities.map { it.asExternalModel() } }) + } } /** Seeds the table from the bundled snapshot if empty (fresh install, data clear, radio switch). */ @@ -128,31 +134,30 @@ class DeviceLinkRepositoryImpl( } } - /** Best-effort network refresh, gated by [CACHE_EXPIRATION_TIME_MS]. */ + /** Best-effort network refresh, gated by [CACHE_EXPIRATION_TIME_MS] and single-flighted via [refreshMutex]. */ private suspend fun refreshIfStale() { - if (nowMillis - lastRefreshMillis > CACHE_EXPIRATION_TIME_MS) reconcile() + if (nowMillis - lastRefreshMillis <= CACHE_EXPIRATION_TIME_MS) return + refreshMutex.withLock { if (nowMillis - lastRefreshMillis > CACHE_EXPIRATION_TIME_MS) reconcile() } } /** * Maps resolved API links to the cached domain model, upserts them, and prunes short codes that no longer exist. * Internal links (GitHub, YouTube, …) are dropped — they never belong to a device's purchase section. An empty list - * is ignored rather than wiping the cache on a bad response. + * is ignored rather than wiping the cache on a bad response. Returns whether anything was stored. */ - private suspend fun store(networkLinks: List) { + private suspend fun store(networkLinks: List): Boolean { val links = networkLinks.filter { it.type != NetworkDeviceLink.TYPE_INTERNAL }.map { it.toDeviceLink() } if (links.isEmpty()) { Logger.w { "DeviceLinkRepository: no device links to store; leaving cache untouched" } - return + return false } localDataSource.upsertAll(links.map { it.asEntity() }) localDataSource.deleteNotIn(links.map { it.shortCode }) Logger.i { "DeviceLinkRepository: stored ${links.size} device links" } + return true } private companion object { private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_DAY.inWholeMilliseconds - - /** Maximum time to wait for the remote API before falling back to cached/bundled data. */ - private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImpl.kt index d50c83ef4..68864fa22 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImpl.kt @@ -31,12 +31,16 @@ import org.meshtastic.core.data.util.staleWhileRevalidateFlow import org.meshtastic.core.database.entity.FirmwareRelease import org.meshtastic.core.database.entity.FirmwareReleaseEntity import org.meshtastic.core.database.entity.FirmwareReleaseType +import org.meshtastic.core.database.entity.asDeviceVersion +import org.meshtastic.core.database.entity.asEntity import org.meshtastic.core.database.entity.asExternalModel import org.meshtastic.core.di.CoroutineDispatchers +import org.meshtastic.core.model.NetworkFirmwareRelease import org.meshtastic.core.model.NetworkFirmwareReleases import org.meshtastic.core.model.util.TimeConstants import org.meshtastic.core.network.FirmwareReleaseRemoteDataSource import org.meshtastic.core.repository.FirmwareReleaseRepository +import kotlin.concurrent.Volatile @Single open class FirmwareReleaseRepositoryImpl( @@ -50,6 +54,11 @@ open class FirmwareReleaseRepositoryImpl( /** Single-flight guard so concurrent collectors share one network refresh. */ private val refreshMutex = Mutex() + /** Guards [seedChecked] so concurrent collectors decode the bundled snapshot at most once per process. */ + private val seedMutex = Mutex() + + @Volatile private var seedChecked = false + override val stableRelease: Flow = getLatestFirmware(FirmwareReleaseType.STABLE) override val alphaRelease: Flow = getLatestFirmware(FirmwareReleaseType.ALPHA) @@ -64,7 +73,10 @@ open class FirmwareReleaseRepositoryImpl( }, fetch = { singleFlightRefresh() }, context = dispatchers.default, - networkTimeoutMs = NETWORK_REFRESH_TIMEOUT_MS, + // No collector blocks on the fetch (cache is emitted first), so let the HttpClient's own + // timeout/retry policy bound it — api.meshtastic.org routinely takes 20-60s to serve this list, + // and a short deadline here cancels every refresh, pinning users to the bundled seed data. + networkTimeoutMs = null, tag = "FirmwareReleaseRepository", ) @@ -72,26 +84,59 @@ open class FirmwareReleaseRepositoryImpl( localDataSource.deleteAllFirmwareReleases() } + /** + * Applies the bundled snapshot per release type whenever it is newer than what is cached for that type — not just + * when the cache is empty. The bundle is refreshed weekly in CI, so an app update carries fresh data even for users + * whose network path to api.meshtastic.org chronically fails. Checked once per process; a cache that is already + * newer (from a successful network refresh) is never regressed, and a type the bundle doesn't ship is left + * untouched. + */ private suspend fun ensureSeeded() { - if (!localDataSource.hasAnyEntries()) { + if (seedChecked) return + seedMutex.withLock { + if (seedChecked) return safeCatching { - Logger.d { "FirmwareReleaseRepository: seeding cache from bundled JSON" } - val jsonReleases = - assetReader.decode("firmware_releases.json", json) - ?: NetworkFirmwareReleases() - localDataSource.insertFirmwareReleases(jsonReleases.releases.stable, FirmwareReleaseType.STABLE) - localDataSource.insertFirmwareReleases(jsonReleases.releases.alpha, FirmwareReleaseType.ALPHA) + val bundled = assetReader.decode("firmware_releases.json", json)?.releases + if (bundled == null) { + Logger.w { "FirmwareReleaseRepository: no bundled releases available to seed from" } + } else { + val toApply = + listOf( + FirmwareReleaseType.STABLE to bundled.stable, + FirmwareReleaseType.ALPHA to bundled.alpha, + ) + .filter { (type, releases) -> isBundleNewerFor(type, releases) } + .toMap() + if (toApply.isNotEmpty()) { + Logger.i { "FirmwareReleaseRepository: applying bundled snapshot for ${toApply.keys}" } + localDataSource.replaceFirmwareReleases(toApply) + } + } } .onFailure { e -> Logger.w(e) { "FirmwareReleaseRepository: failed to seed cache from bundled JSON" } } + seedChecked = true } } + /** True when [bundled] contains a release newer than anything cached for [type]. */ + private suspend fun isBundleNewerFor(type: FirmwareReleaseType, bundled: List): Boolean { + val bundledNewest = bundled.maxOfOrNull { it.asEntity(type).asDeviceVersion() } ?: return false + val cachedNewest = localDataSource.getLatestRelease(type)?.asDeviceVersion() + return cachedNewest == null || bundledNewest > cachedNewest + } + private suspend fun singleFlightRefresh() { refreshMutex.withLock { Logger.d { "FirmwareReleaseRepository: fetching from remote API" } - val networkReleases = remoteDataSource.getFirmwareReleases() - localDataSource.insertFirmwareReleases(networkReleases.releases.stable, FirmwareReleaseType.STABLE) - localDataSource.insertFirmwareReleases(networkReleases.releases.alpha, FirmwareReleaseType.ALPHA) + val releases = remoteDataSource.getFirmwareReleases().releases + if (releases.stable.isEmpty() && releases.alpha.isEmpty()) { + Logger.w { "FirmwareReleaseRepository: remote returned no releases; leaving cache untouched" } + } else { + // Replace rather than upsert so releases pulled or reclassified upstream don't linger as "latest". + localDataSource.replaceFirmwareReleases( + mapOf(FirmwareReleaseType.STABLE to releases.stable, FirmwareReleaseType.ALPHA to releases.alpha), + ) + } } } @@ -99,8 +144,5 @@ open class FirmwareReleaseRepositoryImpl( companion object { private val CACHE_EXPIRATION_TIME_MS = TimeConstants.ONE_HOUR.inWholeMilliseconds - - /** Maximum time to wait for the remote API before falling back to cached/bundled data. */ - private const val NETWORK_REFRESH_TIMEOUT_MS = 5_000L } } diff --git a/core/data/src/jvmTest/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImplTest.kt b/core/data/src/jvmTest/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImplTest.kt new file mode 100644 index 000000000..2123e20c6 --- /dev/null +++ b/core/data/src/jvmTest/kotlin/org/meshtastic/core/data/repository/FirmwareReleaseRepositoryImplTest.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.repository + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.Json +import okio.Buffer +import okio.Source +import org.meshtastic.core.data.datasource.BundledAssetReader +import org.meshtastic.core.data.datasource.FirmwareReleaseLocalDataSource +import org.meshtastic.core.database.entity.FirmwareReleaseEntity +import org.meshtastic.core.database.entity.FirmwareReleaseType +import org.meshtastic.core.di.CoroutineDispatchers +import org.meshtastic.core.model.NetworkDeviceHardware +import org.meshtastic.core.model.NetworkDeviceLinksResponse +import org.meshtastic.core.model.NetworkFirmwareRelease +import org.meshtastic.core.model.NetworkFirmwareReleases +import org.meshtastic.core.model.Releases +import org.meshtastic.core.network.FirmwareReleaseRemoteDataSource +import org.meshtastic.core.network.service.ApiService +import org.meshtastic.core.testing.FakeDatabaseProvider +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class FirmwareReleaseRepositoryImplTest { + + /** Only [getFirmwareReleases] is exercised; the other endpoints are never called by this repository. */ + private class FakeApiService(var response: NetworkFirmwareReleases) : ApiService { + override suspend fun getDeviceHardware(): List = error("unused") + + override suspend fun getDeviceLinks(): NetworkDeviceLinksResponse = error("unused") + + override suspend fun getFirmwareReleases(): NetworkFirmwareReleases = response + } + + /** Serves `firmware_releases.json` from [bundled] via the real decode path, or nothing when null. */ + private class FakeBundledAssetReader(var bundled: Releases? = null) : BundledAssetReader { + override fun open(name: String): Source? { + val releases = bundled ?: return null + if (name != "firmware_releases.json") return null + val bytes = Json.encodeToString(NetworkFirmwareReleases(releases = releases)).encodeToByteArray() + return Buffer().write(bytes) + } + } + + // Real dispatchers + runBlocking, NOT runTest — see DeviceLinkRepositoryImplTest for why (Room + virtual time). + private val unconfined = Dispatchers.Unconfined + private val dispatchers = CoroutineDispatchers(main = unconfined, io = unconfined, default = unconfined) + + private lateinit var dbProvider: FakeDatabaseProvider + private lateinit var api: FakeApiService + private lateinit var seed: FakeBundledAssetReader + private lateinit var repository: FirmwareReleaseRepositoryImpl + + private val dao + get() = dbProvider.currentDb.value.firmwareReleaseDao() + + private fun release(id: String) = NetworkFirmwareRelease(id = id, title = id, zipUrl = "https://example.com/$id") + + /** A cached row old enough that the repository always refreshes. */ + private fun staleRow(id: String, type: FirmwareReleaseType) = + FirmwareReleaseEntity(id = id, title = id, releaseType = type, lastUpdated = 0) + + @BeforeTest + fun setup() { + dbProvider = FakeDatabaseProvider() + api = FakeApiService(NetworkFirmwareReleases()) + seed = FakeBundledAssetReader() + repository = + FirmwareReleaseRepositoryImpl( + remoteDataSource = FirmwareReleaseRemoteDataSource(api, dispatchers), + localDataSource = FirmwareReleaseLocalDataSource(dbProvider, dispatchers), + assetReader = seed, + json = Json { ignoreUnknownKeys = true }, + dispatchers = dispatchers, + ) + } + + @AfterTest fun tearDown() = dbProvider.close() + + @Test + fun refreshPrunesRemovedAndReclassifiedReleases() = runBlocking { + // Old snapshot: 2.7.26 was published as alpha, 9.9.9 was pulled entirely upstream. + dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE)) + dao.insert(staleRow("v2.7.26.54e0d8d", FirmwareReleaseType.ALPHA)) + dao.insert(staleRow("v9.9.9.deadbee", FirmwareReleaseType.ALPHA)) + // Upstream promoted 2.7.26 to stable and dropped 9.9.9. + api.response = + NetworkFirmwareReleases( + releases = + Releases(stable = listOf(release("v2.7.26.54e0d8d")), alpha = listOf(release("v2.7.25.104df5f"))), + ) + + val stableEmissions = repository.stableRelease.toList() + + assertEquals("v2.7.15.567b8ea", stableEmissions.first()?.id, "cached value is emitted before the refresh") + assertEquals("v2.7.26.54e0d8d", stableEmissions.last()?.id, "refreshed value is emitted after the fetch") + assertEquals( + listOf("v2.7.25.104df5f"), + dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id }, + "pulled and reclassified releases are pruned from the alpha rows", + ) + } + + @Test + fun refreshLeavesLocalRowsUntouched() = runBlocking { + dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE)) + dao.insert(staleRow("local-import", FirmwareReleaseType.LOCAL)) + api.response = NetworkFirmwareReleases(releases = Releases(stable = listOf(release("v2.7.26.54e0d8d")))) + + repository.stableRelease.toList() + + assertEquals(listOf("local-import"), dao.getReleasesByType(FirmwareReleaseType.LOCAL).map { it.id }) + } + + @Test + fun emptyResponseLeavesCacheUntouched() = runBlocking { + dao.insert(staleRow("v2.7.15.567b8ea", FirmwareReleaseType.STABLE)) + api.response = NetworkFirmwareReleases() + + val emissions = repository.stableRelease.toList() + + assertEquals("v2.7.15.567b8ea", emissions.last()?.id) + assertTrue(dao.getReleasesByType(FirmwareReleaseType.STABLE).isNotEmpty()) + } + + @Test + fun emptyCacheSeedsFromBundledSnapshot() = runBlocking { + seed.bundled = Releases(stable = listOf(release("v2.7.15.567b8ea")), alpha = listOf(release("v2.7.25.104df5f"))) + api.response = NetworkFirmwareReleases() + + val emissions = repository.stableRelease.toList() + + assertEquals("v2.7.15.567b8ea", emissions.first()?.id) + assertEquals(listOf("v2.7.25.104df5f"), dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id }) + } + + @Test + fun newerBundledSnapshotReplacesOlderCache() = runBlocking { + // App update ships a fresher bundle than what a network-starved user has cached. + dao.insert(FirmwareReleaseEntity(id = "v2.7.15.567b8ea", releaseType = FirmwareReleaseType.STABLE)) + seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d"))) + + val emissions = repository.stableRelease.toList() + + assertEquals("v2.7.26.54e0d8d", emissions.first()?.id, "bundle applies before the first emission") + } + + @Test + fun partialBundleLeavesTypesItDoesNotShipUntouched() = runBlocking { + // Cache has data for both types; the bundle ships only a newer stable list. + dao.insert(FirmwareReleaseEntity(id = "v2.7.15.567b8ea", releaseType = FirmwareReleaseType.STABLE)) + dao.insert(FirmwareReleaseEntity(id = "v2.7.25.104df5f", releaseType = FirmwareReleaseType.ALPHA)) + seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d"))) + + val emissions = repository.stableRelease.toList() + + assertEquals("v2.7.26.54e0d8d", emissions.first()?.id, "newer bundled stable applies") + assertEquals( + listOf("v2.7.25.104df5f"), + dao.getReleasesByType(FirmwareReleaseType.ALPHA).map { it.id }, + "alpha cache survives a stable-only bundle", + ) + } + + @Test + fun olderBundledSnapshotNeverRegressesCache() = runBlocking { + // A successful network refresh left the cache newer than the (weekly) bundle. + dao.insert(FirmwareReleaseEntity(id = "v2.8.0.abc1234", releaseType = FirmwareReleaseType.STABLE)) + seed.bundled = Releases(stable = listOf(release("v2.7.26.54e0d8d"))) + + val emissions = repository.stableRelease.toList() + + assertEquals("v2.8.0.abc1234", emissions.first()?.id) + } +} diff --git a/core/database/src/commonMain/kotlin/org/meshtastic/core/database/dao/FirmwareReleaseDao.kt b/core/database/src/commonMain/kotlin/org/meshtastic/core/database/dao/FirmwareReleaseDao.kt index dfd5d3962..77e8f9aad 100644 --- a/core/database/src/commonMain/kotlin/org/meshtastic/core/database/dao/FirmwareReleaseDao.kt +++ b/core/database/src/commonMain/kotlin/org/meshtastic/core/database/dao/FirmwareReleaseDao.kt @@ -18,6 +18,7 @@ package org.meshtastic.core.database.dao import androidx.room3.Dao import androidx.room3.Query +import androidx.room3.Transaction import androidx.room3.Upsert import org.meshtastic.core.database.entity.FirmwareReleaseEntity import org.meshtastic.core.database.entity.FirmwareReleaseType @@ -29,6 +30,19 @@ interface FirmwareReleaseDao { @Query("DELETE FROM firmware_release") suspend fun deleteAll() + @Query("DELETE FROM firmware_release WHERE release_type = :releaseType") + suspend fun deleteByType(releaseType: FirmwareReleaseType) + + /** + * Replaces all rows of the given [types] with [releases] in one transaction, so releases removed or reclassified + * upstream are pruned and a crash mid-refresh can't leave the table half-written. Other types are untouched. + */ + @Transaction + suspend fun replaceByTypes(types: List, releases: List) { + types.forEach { deleteByType(it) } + releases.forEach { insert(it) } + } + @Query("SELECT * FROM firmware_release") suspend fun getAllReleases(): List diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/HttpClientDefaults.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/HttpClientDefaults.kt index 35694f87f..41a331bea 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/HttpClientDefaults.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/HttpClientDefaults.kt @@ -25,9 +25,16 @@ import io.ktor.client.plugins.HttpRequestRetryConfig * Desktop) when installing [io.ktor.client.plugins.HttpTimeout] and [io.ktor.client.plugins.HttpRequestRetry]. */ object HttpClientDefaults { - /** Timeout in milliseconds for connect, request, and socket operations. */ + /** Timeout in milliseconds for connect and socket operations. */ const val TIMEOUT_MS = 30_000L + /** + * Timeout in milliseconds for a whole request. Deliberately generous: api.meshtastic.org has been measured taking + * 20-60s to serve `github/firmware/list` and `resource/deviceHardware`, and callers use stale-while-revalidate + * caching so nothing user-facing waits on this deadline. + */ + const val REQUEST_TIMEOUT_MS = 90_000L + /** Maximum number of automatic retries on server errors (5xx) and transient connection/IO failures. */ const val MAX_RETRIES = 3 diff --git a/desktopApp/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt b/desktopApp/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt index bb4615c84..7ba51244c 100644 --- a/desktopApp/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt +++ b/desktopApp/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt @@ -235,7 +235,7 @@ private fun desktopPlatformStubsModule() = module { install(ContentNegotiation) { json(get()) } install(DefaultRequest) { url(HttpClientDefaults.API_BASE_URL) } install(HttpTimeout) { - requestTimeoutMillis = HttpClientDefaults.TIMEOUT_MS + requestTimeoutMillis = HttpClientDefaults.REQUEST_TIMEOUT_MS connectTimeoutMillis = HttpClientDefaults.TIMEOUT_MS socketTimeoutMillis = HttpClientDefaults.TIMEOUT_MS }