From cfaf9a5ad218f5af83f45bb2aae74e7556489456 Mon Sep 17 00:00:00 2001 From: Torsten Grote Date: Thu, 26 Sep 2024 17:36:22 -0300 Subject: [PATCH] [db] Fix concurrent index updates If we are used to update the same repo at almost the same time, a race-condition can happen that tries to apply a diff to an already updated DB. We don't download anything while holding a DB transaction, so the download and check for the repo timestamp happens before we enter the transaction. However, we forgot to re-check the timestamp again within the transaction to be sure the DB state is still as expected. --- .../org/fdroid/index/v2/IndexV2UpdaterTest.kt | 65 ++++++++++++++++++- .../org/fdroid/index/v2/IndexV2Updater.kt | 6 ++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/libs/database/src/dbTest/java/org/fdroid/index/v2/IndexV2UpdaterTest.kt b/libs/database/src/dbTest/java/org/fdroid/index/v2/IndexV2UpdaterTest.kt index 66fc0bcc5..38db9bd6b 100644 --- a/libs/database/src/dbTest/java/org/fdroid/index/v2/IndexV2UpdaterTest.kt +++ b/libs/database/src/dbTest/java/org/fdroid/index/v2/IndexV2UpdaterTest.kt @@ -6,6 +6,12 @@ import io.mockk.Runs import io.mockk.every import io.mockk.just import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async +import kotlinx.coroutines.runBlocking import org.fdroid.CompatibilityChecker import org.fdroid.database.DbTest import org.fdroid.database.Repository @@ -26,6 +32,7 @@ import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith +import java.util.concurrent.CountDownLatch import kotlin.test.assertEquals import kotlin.test.assertIs import kotlin.test.assertNull @@ -231,6 +238,60 @@ internal class IndexV2UpdaterTest : DbTest() { assertEquals(TWO, updatedRepo.formatVersion) } + @Test + @OptIn(DelicateCoroutinesApi::class) + fun concurrentUpdateTest() { + val db = spyk(db) // spy on the DB, so we can mock a call to aid in concurrency + indexUpdater = IndexV2Updater( + database = db, + tempFileProvider = tempFileProvider, + downloaderFactory = downloaderFactory, + compatibilityChecker = compatibilityChecker, + ) + val repoId = streamIndexV2IntoDb("index-empty-v2.json") + val repo1 = prepareUpdate( + repoId = repoId, + entryPath = "diff-empty-min/$SIGNED_FILE_NAME", + jsonPath = "diff-empty-min/23.json", + indexFileV2 = TestDataEntry.emptyToMin.diffs["23"] ?: fail() + ) + val repo2 = prepareUpdate( + repoId = repoId, + entryPath = "diff-empty-min/$SIGNED_FILE_NAME", + jsonPath = "diff-empty-min/23.json", + indexFileV2 = TestDataEntry.emptyToMin.diffs["23"] ?: fail() + ) + val latch = CountDownLatch(1) + val runSlot = slot() + every { db.runInTransaction(capture(runSlot)) } answers { + runSlot.captured.run() + latch.countDown() + } andThenAnswer { + latch.await() + runSlot.captured.run() + } + runBlocking { + GlobalScope.async { + val result1 = indexUpdater.update(repo1).noError() + assertEquals(IndexUpdateResult.Processed, result1) + + val entryFile = tmpFolder.newFile() + val indexFile = tmpFolder.newFile() + assets.open("diff-empty-min/$SIGNED_FILE_NAME").use { inputStream -> + entryFile.outputStream().use { inputStream.copyTo(it) } + } + assets.open("diff-empty-min/23.json").use { inputStream -> + indexFile.outputStream().use { inputStream.copyTo(it) } + } + every { tempFileProvider.createTempFile() } returnsMany listOf(entryFile, indexFile) + + val result2 = indexUpdater.update(repo2) + assertIs(result2) + assertIs(result2.e) + }.await() + } + } + private fun prepareUpdate( repoId: Long, entryPath: String, @@ -253,11 +314,11 @@ internal class IndexV2UpdaterTest : DbTest() { every { tempFileProvider.createTempFile() } returnsMany listOf(entryFile, indexFile) every { - downloaderFactory.createWithTryFirstMirror(repo, entryUri, entryFileV2, entryFile) + downloaderFactory.createWithTryFirstMirror(repo, entryUri, entryFileV2, any()) } returns downloader every { downloader.download() } just Runs every { - downloaderFactory.createWithTryFirstMirror(repo, indexUri, indexFileV2, indexFile) + downloaderFactory.createWithTryFirstMirror(repo, indexUri, indexFileV2, any()) } returns downloader every { downloader.download() } just Runs diff --git a/libs/database/src/main/java/org/fdroid/index/v2/IndexV2Updater.kt b/libs/database/src/main/java/org/fdroid/index/v2/IndexV2Updater.kt index 792027b86..c17154084 100644 --- a/libs/database/src/main/java/org/fdroid/index/v2/IndexV2Updater.kt +++ b/libs/database/src/main/java/org/fdroid/index/v2/IndexV2Updater.kt @@ -94,6 +94,12 @@ public class IndexV2Updater( file.inputStream().use { inputStream -> val repoDao = db.getRepositoryDao() db.runInTransaction { + // ensure somebody else hasn't updated the repo in the meantime + val currentTimestamp = repoDao.getRepository(repo.repoId)?.timestamp + if (currentTimestamp != repo.timestamp) throw ConcurrentModificationException( + "Repo timestamp expected ${repo.timestamp}, but was $currentTimestamp" + ) + // still the expected timestamp, so go on processing... streamProcessor.process(repoVersion, inputStream) { i -> listener?.onUpdateProgress(repo, i, entryFile.numPackages) }