diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 965a3672b..0fd2deeb5 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -281,7 +281,6 @@ impl SlidingSyncBuilder { room_unsubscriptions: Default::default(), internal_channel: internal_channel_sender, - response_handling_lock: Arc::new(AsyncMutex::new(())), poll_timeout: self.poll_timeout, network_timeout: self.network_timeout, diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 508869010..33dd46d2e 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -10,7 +10,10 @@ use matrix_sdk_base::{StateStore, StoreError}; use ruma::UserId; use tracing::{trace, warn}; -use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList}; +use super::{ + FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, + SlidingSyncPositionMarkers, +}; use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result}; /// Be careful: as this is used as a storage key; changing it requires migrating @@ -59,7 +62,10 @@ async fn clean_storage( } /// Store the `SlidingSync`'s state in the storage. -pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Result<()> { +pub(super) async fn store_sliding_sync_state( + sliding_sync: &SlidingSync, + position: &SlidingSyncPositionMarkers, +) -> Result<()> { let storage_key = &sliding_sync.inner.storage_key; let instance_storage_key = format_storage_key_for_sliding_sync(storage_key); @@ -71,7 +77,7 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu storage .set_custom_value( instance_storage_key.as_bytes(), - serde_json::to_vec(&FrozenSlidingSync::new(sliding_sync).await)?, + serde_json::to_vec(&FrozenSlidingSync::new(position).await)?, ) .await?; @@ -276,7 +282,9 @@ mod tests { list_bar.set_maximum_number_of_rooms(Some(1337)); } - assert!(sliding_sync.cache_to_storage().await.is_ok()); + let position_guard = sliding_sync.inner.position.lock().await; + assert!(sliding_sync.cache_to_storage(&*position_guard).await.is_ok()); + storage_key }; @@ -395,10 +403,13 @@ mod tests { // Emulate some data to be cached. let delta_token = "delta_token".to_owned(); - sliding_sync.inner.position.lock().await.delta_token = Some(delta_token.clone()); + { + let mut position_guard = sliding_sync.inner.position.lock().await; + position_guard.delta_token = Some(delta_token.clone()); - // Then, we can correctly cache the sliding sync instance. - store_sliding_sync_state(&sliding_sync).await?; + // Then, we can correctly cache the sliding sync instance. + store_sliding_sync_state(&sliding_sync, &*position_guard).await?; + } // The delta token has been correctly written to the state store (but not the // to_device_since, since it's in the other store). diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index fc4435d34..ad10cad28 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -127,9 +127,6 @@ pub(super) struct SlidingSyncInner { /// Internal channel used to pass messages between Sliding Sync and other /// types. internal_channel: Sender, - - /// A lock to ensure that responses are handled one at a time. - response_handling_lock: Arc>, } impl SlidingSync { @@ -137,8 +134,8 @@ impl SlidingSync { Self { inner: Arc::new(inner) } } - async fn cache_to_storage(&self) -> Result<()> { - cache::store_sliding_sync_state(self).await + async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> { + cache::store_sliding_sync_state(self, position).await } /// Create a new [`SlidingSyncBuilder`]. @@ -282,7 +279,7 @@ impl SlidingSync { async fn handle_response( &self, mut sliding_sync_response: v4::Response, - mut position_guard: OwnedMutexGuard, + position: &mut SlidingSyncPositionMarkers, ) -> Result { let pos = Some(sliding_sync_response.pos.clone()); @@ -427,16 +424,12 @@ impl SlidingSync { // Everything went well, we can update the position markers. // // Save the new position markers. - position_guard.pos = pos; - position_guard.delta_token = sliding_sync_response.delta_token.clone(); + position.pos = pos; + position.delta_token = sliding_sync_response.delta_token.clone(); // Keep this position markers in memory, in case it pops from the server. let mut past_positions = self.inner.past_positions.write().unwrap(); - past_positions.push(position_guard.clone()); - - // Release the position markers lock. - // It means that other requests can start to be sent. - drop(position_guard); + past_positions.push(position.clone()); Ok(update_summary) } @@ -548,7 +541,7 @@ impl SlidingSync { #[instrument(skip_all, fields(pos))] async fn sync_once(&self) -> Result { - let (request, request_config, requested_room_unsubscriptions, position_guard) = + let (request, request_config, requested_room_unsubscriptions, mut position_guard) = self.generate_sync_request(&mut LazyTransactionId::new()).await?; debug!("Sending request"); @@ -636,9 +629,8 @@ impl SlidingSync { debug!("Start handling response"); // In case the task running this future is detached, we must - // ensure responses are handled one at a time, hence we lock the - // `response_handling_lock`. - let response_handling_lock = this.inner.response_handling_lock.lock().await; + // ensure responses are handled one at a time. At this point we still own + // `position_guard`, so we're fine. // Room unsubscriptions have been received by the server. We can update the // unsubscriptions buffer. However, it would be an error to empty it entirely as @@ -652,13 +644,13 @@ impl SlidingSync { } // Handle the response. - let updates = this.handle_response(response, position_guard).await?; + let updates = this.handle_response(response, &mut *position_guard).await?; - this.cache_to_storage().await?; + this.cache_to_storage(&*position_guard).await?; - // Release the response handling lock. - // It means that other responses can be handled. - drop(response_handling_lock); + // Release the position guard lock. + // It means that other responses can be generated and then handled later. + drop(position_guard); debug!("Done handling response"); @@ -870,9 +862,7 @@ struct FrozenSlidingSync { } impl FrozenSlidingSync { - async fn new(sliding_sync: &SlidingSync) -> Self { - let position = sliding_sync.inner.position.lock().await; - + async fn new(position: &SlidingSyncPositionMarkers) -> Self { // The to-device token must be saved in the `FrozenCryptoSlidingSync` now. Self { delta_token: position.delta_token.clone(), to_device_since: None } } @@ -1166,7 +1156,8 @@ mod tests { // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved // in the crypto store since PR #2323. - let frozen = FrozenSlidingSync::new(&sliding_sync).await; + let position_guard = sliding_sync.inner.position.lock().await; + let frozen = FrozenSlidingSync::new(&*position_guard).await; assert!(frozen.to_device_since.is_none()); Ok(()) @@ -1849,12 +1840,9 @@ mod tests { .await?; { - let sliding_sync_position_guard = - sliding_sync.inner.position.clone().lock_owned().await; + let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await; - sliding_sync - .handle_response(server_response.clone(), sliding_sync_position_guard) - .await?; + sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?; } // E2EE has been properly handled. @@ -1883,12 +1871,9 @@ mod tests { .await?; { - let sliding_sync_position_guard = - sliding_sync.inner.position.clone().lock_owned().await; + let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await; - sliding_sync - .handle_response(server_response.clone(), sliding_sync_position_guard) - .await?; + sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?; } // E2EE response has been ignored. @@ -1920,12 +1905,9 @@ mod tests { .await?; { - let sliding_sync_position_guard = - sliding_sync.inner.position.clone().lock_owned().await; + let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await; - sliding_sync - .handle_response(server_response.clone(), sliding_sync_position_guard) - .await?; + sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?; } // E2EE has been properly handled.