mirror of
https://github.com/bitfireAT/davx5-ose.git
synced 2025-12-23 23:17:50 -05:00
Make SyncManager suspending (first part) (#1451)
* Introduce coroutine dispatcher for sync operations * Make `logSyncTimeBlocking` and `insertOrReplace` suspend functions * Suspend first bunch of SyncManager methods; move sync dispatcher usage to SyncManager * [WIP] Fix tests TODO: extract test framework changes to separate PR * Remove mainDispatcher from SyncManagerTest * Remove explicit coroutineScope naming
This commit is contained in:
@@ -25,9 +25,10 @@ import dagger.hilt.android.testing.BindValue
|
||||
import dagger.hilt.android.testing.HiltAndroidRule
|
||||
import dagger.hilt.android.testing.HiltAndroidTest
|
||||
import io.mockk.every
|
||||
import io.mockk.impl.annotations.MockK
|
||||
import io.mockk.impl.annotations.RelaxedMockK
|
||||
import io.mockk.junit4.MockKRule
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import okhttp3.Protocol
|
||||
import okhttp3.internal.http.StatusLine
|
||||
import okhttp3.mockwebserver.MockResponse
|
||||
@@ -45,6 +46,12 @@ import javax.inject.Inject
|
||||
@HiltAndroidTest
|
||||
class SyncManagerTest {
|
||||
|
||||
@get:Rule
|
||||
val hiltRule = HiltAndroidRule(this)
|
||||
|
||||
@get:Rule
|
||||
val mockKRule = MockKRule(this)
|
||||
|
||||
@Inject
|
||||
lateinit var accountSettingsFactory: AccountSettings.Factory
|
||||
|
||||
@@ -58,24 +65,19 @@ class SyncManagerTest {
|
||||
lateinit var syncManagerFactory: TestSyncManager.Factory
|
||||
|
||||
@BindValue
|
||||
@MockK(relaxed = true)
|
||||
@RelaxedMockK
|
||||
lateinit var syncStatsRepository: DavSyncStatsRepository
|
||||
|
||||
@Inject
|
||||
lateinit var workerFactory: HiltWorkerFactory
|
||||
|
||||
@get:Rule
|
||||
val hiltRule = HiltAndroidRule(this)
|
||||
|
||||
@get:Rule
|
||||
val mockKRule = MockKRule(this)
|
||||
|
||||
private lateinit var account: Account
|
||||
private lateinit var server: MockWebServer
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
hiltRule.inject()
|
||||
|
||||
TestUtils.setUpWorkManager(context, workerFactory)
|
||||
|
||||
account = TestAccount.create()
|
||||
@@ -122,7 +124,7 @@ class SyncManagerTest {
|
||||
|
||||
|
||||
@Test
|
||||
fun testPerformSync_503RetryAfter_DelaySeconds() {
|
||||
fun testPerformSync_503RetryAfter_DelaySeconds() = runTest {
|
||||
server.enqueue(MockResponse()
|
||||
.setResponseCode(503)
|
||||
.setHeader("Retry-After", "60")) // 60 seconds
|
||||
@@ -139,7 +141,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_FirstSync_Empty() {
|
||||
fun testPerformSync_FirstSync_Empty() = runTest {
|
||||
val collection = LocalTestCollection() /* no last known ctag */
|
||||
server.enqueue(queryCapabilitiesResponse())
|
||||
|
||||
@@ -154,7 +156,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_UploadNewMember_ETagOnPut() {
|
||||
fun testPerformSync_UploadNewMember_ETagOnPut() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -197,7 +199,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_UploadModifiedMember_ETagOnPut() {
|
||||
fun testPerformSync_UploadModifiedMember_ETagOnPut() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -244,7 +246,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_UploadModifiedMember_NoETagOnPut() {
|
||||
fun testPerformSync_UploadModifiedMember_NoETagOnPut() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -289,7 +291,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_UploadModifiedMember_412PreconditionFailed() {
|
||||
fun testPerformSync_UploadModifiedMember_412PreconditionFailed() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -335,7 +337,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_NoopOnMemberWithSameETag() {
|
||||
fun testPerformSync_NoopOnMemberWithSameETag() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "ctag1")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -372,7 +374,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_DownloadNewMember() {
|
||||
fun testPerformSync_DownloadNewMember() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
}
|
||||
@@ -406,7 +408,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_DownloadUpdatedMember() {
|
||||
fun testPerformSync_DownloadUpdatedMember() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -444,7 +446,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_RemoveVanishedMember() {
|
||||
fun testPerformSync_RemoveVanishedMember() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "old-ctag")
|
||||
entries += LocalTestResource().apply {
|
||||
@@ -464,7 +466,7 @@ class SyncManagerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPerformSync_CTagDidntChange() {
|
||||
fun testPerformSync_CTagDidntChange() = runTest {
|
||||
val collection = LocalTestCollection().apply {
|
||||
lastSyncState = SyncState(SyncState.Type.CTAG, "ctag1")
|
||||
}
|
||||
|
||||
@@ -11,13 +11,14 @@ import at.bitfire.dav4jvm.Response
|
||||
import at.bitfire.dav4jvm.property.caldav.GetCTag
|
||||
import at.bitfire.davdroid.db.Collection
|
||||
import at.bitfire.davdroid.db.SyncState
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.network.HttpClient
|
||||
import at.bitfire.davdroid.resource.LocalResource
|
||||
import at.bitfire.davdroid.settings.AccountSettings
|
||||
import at.bitfire.davdroid.util.DavUtils.lastSegment
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
@@ -30,7 +31,8 @@ class TestSyncManager @AssistedInject constructor(
|
||||
@Assisted httpClient: HttpClient,
|
||||
@Assisted syncResult: SyncResult,
|
||||
@Assisted localCollection: LocalTestCollection,
|
||||
@Assisted collection: Collection
|
||||
@Assisted collection: Collection,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
): SyncManager<LocalTestResource, LocalTestCollection, DavCollection>(
|
||||
account,
|
||||
httpClient,
|
||||
@@ -38,7 +40,8 @@ class TestSyncManager @AssistedInject constructor(
|
||||
authority,
|
||||
syncResult,
|
||||
localCollection,
|
||||
collection
|
||||
collection,
|
||||
syncDispatcher
|
||||
) {
|
||||
|
||||
@AssistedFactory
|
||||
|
||||
@@ -14,7 +14,7 @@ import kotlinx.coroutines.flow.Flow
|
||||
interface SyncStatsDao {
|
||||
|
||||
@Insert(onConflict = OnConflictStrategy.REPLACE)
|
||||
fun insertOrReplace(syncStats: SyncStats)
|
||||
suspend fun insertOrReplace(syncStats: SyncStats)
|
||||
|
||||
@Query("SELECT * FROM syncstats WHERE collectionId=:id")
|
||||
fun getByCollectionIdFlow(id: Long): Flow<List<SyncStats>>
|
||||
|
||||
@@ -40,7 +40,7 @@ class DavSyncStatsRepository @Inject constructor(
|
||||
}
|
||||
}
|
||||
|
||||
fun logSyncTimeBlocking(collectionId: Long, authority: String, lastSync: Long = System.currentTimeMillis()) {
|
||||
suspend fun logSyncTime(collectionId: Long, authority: String, lastSync: Long = System.currentTimeMillis()) {
|
||||
dao.insertOrReplace(SyncStats(
|
||||
id = 0,
|
||||
collectionId = collectionId,
|
||||
|
||||
@@ -18,6 +18,7 @@ import at.bitfire.davdroid.sync.account.setAndVerifyUserData
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.util.logging.Level
|
||||
|
||||
/**
|
||||
@@ -108,7 +109,9 @@ class AddressBookSyncer @AssistedInject constructor(
|
||||
addressBook,
|
||||
collection
|
||||
)
|
||||
syncManager.performSync()
|
||||
runBlocking {
|
||||
syncManager.performSync()
|
||||
}
|
||||
|
||||
} catch(e: Exception) {
|
||||
logger.log(Level.SEVERE, "Couldn't sync contacts", e)
|
||||
|
||||
@@ -20,6 +20,7 @@ import at.bitfire.dav4jvm.property.webdav.SyncToken
|
||||
import at.bitfire.davdroid.R
|
||||
import at.bitfire.davdroid.db.Collection
|
||||
import at.bitfire.davdroid.db.SyncState
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.network.HttpClient
|
||||
import at.bitfire.davdroid.resource.LocalCalendar
|
||||
import at.bitfire.davdroid.resource.LocalEvent
|
||||
@@ -32,6 +33,7 @@ import at.bitfire.ical4android.util.DateUtils
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import net.fortuna.ical4j.model.Component
|
||||
import net.fortuna.ical4j.model.component.VAlarm
|
||||
import net.fortuna.ical4j.model.property.Action
|
||||
@@ -56,7 +58,8 @@ class CalendarSyncManager @AssistedInject constructor(
|
||||
@Assisted syncResult: SyncResult,
|
||||
@Assisted localCalendar: LocalCalendar,
|
||||
@Assisted collection: Collection,
|
||||
private val accountSettingsFactory: AccountSettings.Factory
|
||||
accountSettingsFactory: AccountSettings.Factory,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
): SyncManager<LocalEvent, LocalCalendar, DavCalendar>(
|
||||
account,
|
||||
httpClient,
|
||||
@@ -64,7 +67,8 @@ class CalendarSyncManager @AssistedInject constructor(
|
||||
authority,
|
||||
syncResult,
|
||||
localCalendar,
|
||||
collection
|
||||
collection,
|
||||
syncDispatcher
|
||||
) {
|
||||
|
||||
@AssistedFactory
|
||||
@@ -144,7 +148,7 @@ class CalendarSyncManager @AssistedInject constructor(
|
||||
return super.processLocallyDeleted()
|
||||
}
|
||||
|
||||
override fun uploadDirty(): Boolean {
|
||||
override suspend fun uploadDirty(): Boolean {
|
||||
var modified = false
|
||||
if (localCollection.readOnly) {
|
||||
for (event in localCollection.findDirty()) {
|
||||
|
||||
@@ -15,6 +15,7 @@ import at.bitfire.ical4android.AndroidCalendar
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
/**
|
||||
* Sync logic for calendars
|
||||
@@ -64,7 +65,9 @@ class CalendarSyncer @AssistedInject constructor(
|
||||
localCollection,
|
||||
remoteCollection
|
||||
)
|
||||
syncManager.performSync()
|
||||
runBlocking {
|
||||
syncManager.performSync()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,6 +24,7 @@ import at.bitfire.dav4jvm.property.webdav.SyncToken
|
||||
import at.bitfire.davdroid.R
|
||||
import at.bitfire.davdroid.db.Collection
|
||||
import at.bitfire.davdroid.db.SyncState
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.network.HttpClient
|
||||
import at.bitfire.davdroid.resource.LocalAddress
|
||||
import at.bitfire.davdroid.resource.LocalAddressBook
|
||||
@@ -44,6 +45,7 @@ import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import ezvcard.VCardVersion
|
||||
import ezvcard.io.CannotParseException
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
|
||||
import okhttp3.MediaType
|
||||
@@ -104,7 +106,8 @@ class ContactsSyncManager @AssistedInject constructor(
|
||||
@Assisted collection: Collection,
|
||||
val dirtyVerifier: Optional<ContactDirtyVerifier>,
|
||||
accountSettingsFactory: AccountSettings.Factory,
|
||||
private val httpClientBuilder: HttpClient.Builder
|
||||
private val httpClientBuilder: HttpClient.Builder,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
): SyncManager<LocalAddress, LocalAddressBook, DavAddressBook>(
|
||||
account,
|
||||
httpClient,
|
||||
@@ -112,7 +115,8 @@ class ContactsSyncManager @AssistedInject constructor(
|
||||
authority,
|
||||
syncResult,
|
||||
localAddressBook,
|
||||
collection
|
||||
collection,
|
||||
syncDispatcher
|
||||
) {
|
||||
|
||||
@AssistedFactory
|
||||
@@ -228,7 +232,7 @@ class ContactsSyncManager @AssistedInject constructor(
|
||||
// mirror deletions to remote collection (DELETE)
|
||||
super.processLocallyDeleted()
|
||||
|
||||
override fun uploadDirty(): Boolean {
|
||||
override suspend fun uploadDirty(): Boolean {
|
||||
var modified = false
|
||||
|
||||
if (localCollection.readOnly) {
|
||||
|
||||
@@ -19,6 +19,7 @@ import at.bitfire.dav4jvm.property.webdav.SyncToken
|
||||
import at.bitfire.davdroid.R
|
||||
import at.bitfire.davdroid.db.Collection
|
||||
import at.bitfire.davdroid.db.SyncState
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.network.HttpClient
|
||||
import at.bitfire.davdroid.resource.LocalJtxCollection
|
||||
import at.bitfire.davdroid.resource.LocalJtxICalObject
|
||||
@@ -29,6 +30,7 @@ import at.bitfire.ical4android.JtxICalObject
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
@@ -44,7 +46,8 @@ class JtxSyncManager @AssistedInject constructor(
|
||||
@Assisted authority: String,
|
||||
@Assisted syncResult: SyncResult,
|
||||
@Assisted localCollection: LocalJtxCollection,
|
||||
@Assisted collection: Collection
|
||||
@Assisted collection: Collection,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
): SyncManager<LocalJtxICalObject, LocalJtxCollection, DavCalendar>(
|
||||
account,
|
||||
httpClient,
|
||||
@@ -52,7 +55,8 @@ class JtxSyncManager @AssistedInject constructor(
|
||||
authority,
|
||||
syncResult,
|
||||
localCollection,
|
||||
collection
|
||||
collection,
|
||||
syncDispatcher
|
||||
) {
|
||||
|
||||
@AssistedFactory
|
||||
|
||||
@@ -16,6 +16,7 @@ import at.bitfire.ical4android.TaskProvider
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
/**
|
||||
* Sync logic for jtx board
|
||||
@@ -77,7 +78,9 @@ class JtxSyncer @AssistedInject constructor(
|
||||
localCollection,
|
||||
remoteCollection
|
||||
)
|
||||
syncManager.performSync()
|
||||
runBlocking {
|
||||
syncManager.performSync()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -41,9 +41,10 @@ import at.bitfire.davdroid.sync.account.InvalidAccountException
|
||||
import at.bitfire.ical4android.CalendarStorageException
|
||||
import at.bitfire.vcard4android.ContactsStorageException
|
||||
import dagger.hilt.android.qualifiers.ApplicationContext
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.RequestBody
|
||||
import java.io.IOException
|
||||
@@ -79,7 +80,8 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
val authority: String,
|
||||
val syncResult: SyncResult,
|
||||
val localCollection: CollectionType,
|
||||
val collection: Collection
|
||||
val collection: Collection,
|
||||
val syncDispatcher: CoroutineDispatcher
|
||||
) {
|
||||
|
||||
enum class SyncAlgorithm {
|
||||
@@ -87,13 +89,6 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
COLLECTION_SYNC
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
/** Maximum number of resources that are requested with one multiget request. */
|
||||
const val MAX_MULTIGET_RESOURCES = 10
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Inject
|
||||
lateinit var accountRepository: AccountRepository
|
||||
@@ -135,7 +130,7 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
} ?: emptyMap()
|
||||
}
|
||||
|
||||
fun performSync() {
|
||||
suspend fun performSync() = withContext(syncDispatcher) {
|
||||
// dismiss previous error notifications
|
||||
syncNotificationManager.dismissInvalidResource(localCollectionTag = localCollection.tag)
|
||||
|
||||
@@ -143,9 +138,9 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
logger.info("Preparing synchronization")
|
||||
if (!prepare()) {
|
||||
logger.info("No reason to synchronize, aborting")
|
||||
return
|
||||
return@withContext
|
||||
}
|
||||
syncStatsRepository.logSyncTimeBlocking(collection.id, authority)
|
||||
syncStatsRepository.logSyncTime(collection.id, authority)
|
||||
|
||||
logger.info("Querying server capabilities")
|
||||
var remoteSyncState = queryCapabilities()
|
||||
@@ -366,11 +361,10 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
*
|
||||
* @return whether local resources have been processed so that a synchronization is always necessary
|
||||
*/
|
||||
protected open fun uploadDirty(): Boolean {
|
||||
protected open suspend fun uploadDirty(): Boolean {
|
||||
var numUploaded = 0
|
||||
|
||||
// upload dirty resources (parallelized)
|
||||
runBlocking {
|
||||
coroutineScope { // structured concurrency
|
||||
for (local in localCollection.findDirty())
|
||||
launch {
|
||||
SyncException.wrapWithLocalResource(local) {
|
||||
@@ -546,80 +540,78 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
*
|
||||
* @param listRemote function to list remote resources (for instance, all since a certain sync-token)
|
||||
*/
|
||||
protected open fun syncRemote(listRemote: (MultiResponseCallback) -> Unit) {
|
||||
runBlocking {
|
||||
// download queue
|
||||
val toDownload = LinkedBlockingQueue<HttpUrl>()
|
||||
fun download(url: HttpUrl?) {
|
||||
if (url != null)
|
||||
toDownload += url
|
||||
protected open suspend fun syncRemote(listRemote: (MultiResponseCallback) -> Unit) = coroutineScope { // structured concurrency
|
||||
// download queue
|
||||
val toDownload = LinkedBlockingQueue<HttpUrl>()
|
||||
fun download(url: HttpUrl?) {
|
||||
if (url != null)
|
||||
toDownload += url
|
||||
|
||||
if (toDownload.size >= MAX_MULTIGET_RESOURCES || url == null) {
|
||||
while (toDownload.isNotEmpty()) {
|
||||
val bunch = LinkedList<HttpUrl>()
|
||||
toDownload.drainTo(bunch, MAX_MULTIGET_RESOURCES)
|
||||
launch {
|
||||
downloadRemote(bunch)
|
||||
}
|
||||
if (toDownload.size >= MAX_MULTIGET_RESOURCES || url == null) {
|
||||
while (toDownload.isNotEmpty()) {
|
||||
val bunch = LinkedList<HttpUrl>()
|
||||
toDownload.drainTo(bunch, MAX_MULTIGET_RESOURCES)
|
||||
launch {
|
||||
downloadRemote(bunch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
coroutineScope { // structured concurrency: blocks until all inner coroutines are finished
|
||||
listRemote { response, relation ->
|
||||
// ignore non-members
|
||||
if (relation != Response.HrefRelation.MEMBER)
|
||||
return@listRemote
|
||||
|
||||
// ignore collections
|
||||
if (response[at.bitfire.dav4jvm.property.webdav.ResourceType::class.java]?.types?.contains(at.bitfire.dav4jvm.property.webdav.ResourceType.COLLECTION) == true)
|
||||
return@listRemote
|
||||
|
||||
val name = response.hrefName()
|
||||
|
||||
if (response.isSuccess()) {
|
||||
logger.fine("Found remote resource: $name")
|
||||
|
||||
launch {
|
||||
val local = localCollection.findByName(name)
|
||||
SyncException.wrapWithLocalResource(local) {
|
||||
if (local == null) {
|
||||
logger.info("$name has been added remotely, queueing download")
|
||||
download(response.href)
|
||||
} else {
|
||||
val localETag = local.eTag
|
||||
val remoteETag = response[GetETag::class.java]?.eTag
|
||||
?: throw DavException("Server didn't provide ETag")
|
||||
if (localETag == remoteETag) {
|
||||
logger.info("$name has not been changed on server (ETag still $remoteETag)")
|
||||
} else {
|
||||
logger.info("$name has been changed on server (current ETag=$remoteETag, last known ETag=$localETag)")
|
||||
download(response.href)
|
||||
}
|
||||
|
||||
// mark as remotely present, so that this resource won't be deleted at the end
|
||||
local.updateFlags(LocalResource.FLAG_REMOTELY_PRESENT)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if (response.status?.code == HttpURLConnection.HTTP_NOT_FOUND) {
|
||||
// collection sync: resource has been deleted on remote server
|
||||
launch {
|
||||
localCollection.findByName(name)?.let { local ->
|
||||
SyncException.wrapWithLocalResource(local) {
|
||||
logger.info("$name has been deleted on server, deleting locally")
|
||||
local.delete()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download remaining resources
|
||||
download(null)
|
||||
}
|
||||
|
||||
coroutineScope { // structured concurrency
|
||||
listRemote { response, relation ->
|
||||
// ignore non-members
|
||||
if (relation != Response.HrefRelation.MEMBER)
|
||||
return@listRemote
|
||||
|
||||
// ignore collections
|
||||
if (response[at.bitfire.dav4jvm.property.webdav.ResourceType::class.java]?.types?.contains(at.bitfire.dav4jvm.property.webdav.ResourceType.COLLECTION) == true)
|
||||
return@listRemote
|
||||
|
||||
val name = response.hrefName()
|
||||
|
||||
if (response.isSuccess()) {
|
||||
logger.fine("Found remote resource: $name")
|
||||
|
||||
launch {
|
||||
val local = localCollection.findByName(name)
|
||||
SyncException.wrapWithLocalResource(local) {
|
||||
if (local == null) {
|
||||
logger.info("$name has been added remotely, queueing download")
|
||||
download(response.href)
|
||||
} else {
|
||||
val localETag = local.eTag
|
||||
val remoteETag = response[GetETag::class.java]?.eTag
|
||||
?: throw DavException("Server didn't provide ETag")
|
||||
if (localETag == remoteETag) {
|
||||
logger.info("$name has not been changed on server (ETag still $remoteETag)")
|
||||
} else {
|
||||
logger.info("$name has been changed on server (current ETag=$remoteETag, last known ETag=$localETag)")
|
||||
download(response.href)
|
||||
}
|
||||
|
||||
// mark as remotely present, so that this resource won't be deleted at the end
|
||||
local.updateFlags(LocalResource.FLAG_REMOTELY_PRESENT)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if (response.status?.code == HttpURLConnection.HTTP_NOT_FOUND) {
|
||||
// collection sync: resource has been deleted on remote server
|
||||
launch {
|
||||
localCollection.findByName(name)?.let { local ->
|
||||
SyncException.wrapWithLocalResource(local) {
|
||||
logger.info("$name has been deleted on server, deleting locally")
|
||||
local.delete()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download remaining resources
|
||||
download(null)
|
||||
}
|
||||
|
||||
protected abstract fun listAllRemote(callback: MultiResponseCallback)
|
||||
@@ -771,4 +763,12 @@ abstract class SyncManager<ResourceType: LocalResource<*>, out CollectionType: L
|
||||
|
||||
protected abstract fun notifyInvalidResourceTitle(): String
|
||||
|
||||
|
||||
companion object {
|
||||
|
||||
/** Maximum number of resources that are requested with one multiget request. */
|
||||
const val MAX_MULTIGET_RESOURCES = 10
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import at.bitfire.ical4android.TaskProvider
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
/**
|
||||
* Sync logic for tasks in CalDAV collections ({@code VTODO}).
|
||||
@@ -78,7 +79,9 @@ class TaskSyncer @AssistedInject constructor(
|
||||
localCollection,
|
||||
remoteCollection
|
||||
)
|
||||
syncManager.performSync()
|
||||
runBlocking {
|
||||
syncManager.performSync()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import at.bitfire.dav4jvm.property.webdav.SyncToken
|
||||
import at.bitfire.davdroid.R
|
||||
import at.bitfire.davdroid.db.Collection
|
||||
import at.bitfire.davdroid.db.SyncState
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.network.HttpClient
|
||||
import at.bitfire.davdroid.resource.LocalResource
|
||||
import at.bitfire.davdroid.resource.LocalTask
|
||||
@@ -28,6 +29,7 @@ import at.bitfire.ical4android.Task
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
@@ -46,7 +48,8 @@ class TasksSyncManager @AssistedInject constructor(
|
||||
@Assisted authority: String,
|
||||
@Assisted syncResult: SyncResult,
|
||||
@Assisted localCollection: LocalTaskList,
|
||||
@Assisted collection: Collection
|
||||
@Assisted collection: Collection,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
): SyncManager<LocalTask, LocalTaskList, DavCalendar>(
|
||||
account,
|
||||
httpClient,
|
||||
@@ -54,7 +57,8 @@ class TasksSyncManager @AssistedInject constructor(
|
||||
authority,
|
||||
syncResult,
|
||||
localCollection,
|
||||
collection
|
||||
collection,
|
||||
syncDispatcher
|
||||
) {
|
||||
|
||||
@AssistedFactory
|
||||
|
||||
@@ -36,10 +36,8 @@ import at.bitfire.davdroid.sync.worker.BaseSyncWorker.Companion.commonTag
|
||||
import at.bitfire.davdroid.ui.NotificationRegistry
|
||||
import at.bitfire.ical4android.TaskProvider
|
||||
import dagger.Lazy
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.runInterruptible
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.util.Collections
|
||||
import java.util.logging.Level
|
||||
import java.util.logging.Logger
|
||||
@@ -47,8 +45,7 @@ import javax.inject.Inject
|
||||
|
||||
abstract class BaseSyncWorker(
|
||||
context: Context,
|
||||
private val workerParams: WorkerParameters,
|
||||
private val syncDispatcher: CoroutineDispatcher
|
||||
private val workerParams: WorkerParameters
|
||||
) : CoroutineWorker(context, workerParams) {
|
||||
|
||||
@Inject
|
||||
@@ -142,7 +139,7 @@ abstract class BaseSyncWorker(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun doSyncWork(account: Account, dataType: SyncDataType): Result = withContext(syncDispatcher) {
|
||||
suspend fun doSyncWork(account: Account, dataType: SyncDataType): Result {
|
||||
logger.info("Running ${javaClass.name}: account=$account, dataType=$dataType")
|
||||
|
||||
// pass possibly supplied flags to the selected syncer
|
||||
@@ -176,7 +173,7 @@ abstract class BaseSyncWorker(
|
||||
taskSyncer.create(account, currentProvider, extras, syncResult)
|
||||
else -> {
|
||||
logger.warning("No valid tasks provider found, aborting sync")
|
||||
return@withContext Result.failure()
|
||||
return Result.failure()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,7 +205,7 @@ abstract class BaseSyncWorker(
|
||||
delay(blockDuration * 1000)
|
||||
|
||||
logger.warning("Retrying on soft error (attempt $runAttemptCount of $MAX_RUN_ATTEMPTS)")
|
||||
return@withContext Result.retry()
|
||||
return Result.retry()
|
||||
}
|
||||
|
||||
logger.warning("Max retries on soft errors reached ($runAttemptCount of $MAX_RUN_ATTEMPTS). Treating as failed")
|
||||
@@ -225,7 +222,7 @@ abstract class BaseSyncWorker(
|
||||
}
|
||||
|
||||
output.putBoolean(OUTPUT_TOO_MANY_RETRIES, true)
|
||||
return@withContext Result.failure(output.build())
|
||||
return Result.failure(output.build())
|
||||
}
|
||||
|
||||
// If no soft error found, dismiss sync error notification
|
||||
@@ -239,12 +236,12 @@ abstract class BaseSyncWorker(
|
||||
// Note: SyncManager should have notified the user
|
||||
if (syncResult.hasHardError()) {
|
||||
logger.log(Level.WARNING, "Hard error while syncing", syncResult)
|
||||
return@withContext Result.failure(output.build())
|
||||
return Result.failure(output.build())
|
||||
}
|
||||
}
|
||||
|
||||
logger.log(Level.INFO, "Sync worker succeeded", syncResult)
|
||||
return@withContext Result.success(output.build())
|
||||
return Result.success(output.build())
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -12,12 +12,10 @@ import androidx.work.ForegroundInfo
|
||||
import androidx.work.WorkManager
|
||||
import androidx.work.WorkerParameters
|
||||
import at.bitfire.davdroid.R
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.sync.SyncDataType
|
||||
import at.bitfire.davdroid.ui.NotificationRegistry
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
|
||||
/**
|
||||
* One-time sync worker.
|
||||
@@ -29,9 +27,8 @@ import kotlinx.coroutines.CoroutineDispatcher
|
||||
@HiltWorker
|
||||
class OneTimeSyncWorker @AssistedInject constructor(
|
||||
@Assisted appContext: Context,
|
||||
@Assisted workerParams: WorkerParameters,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
) : BaseSyncWorker(appContext, workerParams, syncDispatcher) {
|
||||
@Assisted workerParams: WorkerParameters
|
||||
) : BaseSyncWorker(appContext, workerParams) {
|
||||
|
||||
/**
|
||||
* Used by WorkManager to show a foreground service notification for expedited jobs on Android <12.
|
||||
|
||||
@@ -10,12 +10,10 @@ import androidx.annotation.VisibleForTesting
|
||||
import androidx.hilt.work.HiltWorker
|
||||
import androidx.work.WorkManager
|
||||
import androidx.work.WorkerParameters
|
||||
import at.bitfire.davdroid.di.SyncDispatcher
|
||||
import at.bitfire.davdroid.sync.SyncDataType
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
|
||||
/**
|
||||
* Handles scheduled sync requests.
|
||||
@@ -36,9 +34,8 @@ import kotlinx.coroutines.CoroutineDispatcher
|
||||
@HiltWorker
|
||||
class PeriodicSyncWorker @AssistedInject constructor(
|
||||
@Assisted appContext: Context,
|
||||
@Assisted workerParams: WorkerParameters,
|
||||
@SyncDispatcher syncDispatcher: CoroutineDispatcher
|
||||
) : BaseSyncWorker(appContext, workerParams, syncDispatcher) {
|
||||
@Assisted workerParams: WorkerParameters
|
||||
) : BaseSyncWorker(appContext, workerParams) {
|
||||
|
||||
@AssistedFactory
|
||||
@VisibleForTesting
|
||||
|
||||
Reference in New Issue
Block a user