diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index b593c75a1..2a87e1a18 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -246,8 +246,7 @@ impl SlidingSyncBuilder { } // Reload existing state from the cache. - let restored_fields = - restore_sliding_sync_state(&client, &self.storage_key, &lists).await?; + let restored_fields = restore_sliding_sync_state(&client, &self.storage_key).await?; let pos = if let Some(fields) = restored_fields { #[cfg(feature = "e2e-encryption")] diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 02b331ba4..a78442f26 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -4,17 +4,12 @@ //! same cache. It helps to define what it sometimes called a “cold start”, or a //! “fast start”. -use std::collections::BTreeMap; - use matrix_sdk_base::{StateStore, StoreError}; use matrix_sdk_common::timer; use ruma::UserId; use tracing::{trace, warn}; -use super::{ - FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, - SlidingSyncPositionMarkers, -}; +use super::{FrozenSlidingSyncList, SlidingSync, SlidingSyncPositionMarkers}; #[cfg(feature = "e2e-encryption")] use crate::sliding_sync::FrozenSlidingSyncPos; use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result}; @@ -48,30 +43,6 @@ async fn invalidate_cached_list( let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await; } -/// Clean the storage for everything related to `SlidingSync` and all known -/// lists. -async fn clean_storage( - client: &Client, - storage_key: &str, - lists: &BTreeMap, -) { - let storage = client.state_store(); - for list_name in lists.keys() { - invalidate_cached_list(storage, storage_key, list_name).await; - } - let instance_storage_key = format_storage_key_for_sliding_sync(storage_key); - let _ = storage.remove_custom_value(instance_storage_key.as_bytes()).await; - - #[cfg(feature = "e2e-encryption")] - if let Some(olm_machine) = &*client.olm_machine().await { - // Invalidate the value stored for the TERRIBLE HACK. - let _ = olm_machine - .store() - .set_custom_value(&instance_storage_key, "".as_bytes().to_vec()) - .await; - } -} - /// Store the `SlidingSync`'s state in the storage. pub(super) async fn store_sliding_sync_state( sliding_sync: &SlidingSync, @@ -83,15 +54,6 @@ pub(super) async fn store_sliding_sync_state( trace!(storage_key, "Saving a `SlidingSync` to the state store"); let storage = sliding_sync.inner.client.state_store(); - // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside - // the store. - storage - .set_custom_value( - instance_storage_key.as_bytes(), - serde_json::to_vec(&FrozenSlidingSync::new())?, - ) - .await?; - #[cfg(feature = "e2e-encryption")] { let position = _position; @@ -194,7 +156,6 @@ pub(super) struct RestoredFields { pub(super) async fn restore_sliding_sync_state( client: &Client, storage_key: &str, - lists: &BTreeMap, ) -> Result> { let _timer = timer!(format!("loading sliding sync {storage_key} state from DB")); @@ -210,60 +171,16 @@ pub(super) async fn restore_sliding_sync_state( } } - let storage = client.state_store(); let instance_storage_key = format_storage_key_for_sliding_sync(storage_key); // Preload the `SlidingSync` object from the cache. - match storage - .get_custom_value(instance_storage_key.as_bytes()) - .await? - .map(|custom_value| serde_json::from_slice::(&custom_value)) - { - // `SlidingSync` has been found and successfully deserialized. - Some(Ok(FrozenSlidingSync { to_device_since })) => { - trace!("Successfully read the `SlidingSync` from the cache"); - // Only update the to-device token if we failed to read it from the crypto store - // above. - if restored_fields.to_device_token.is_none() { - restored_fields.to_device_token = to_device_since; + #[cfg(feature = "e2e-encryption")] + if let Some(olm_machine) = &*client.olm_machine().await { + if let Ok(Some(blob)) = olm_machine.store().get_custom_value(&instance_storage_key).await { + if let Ok(frozen_pos) = serde_json::from_slice::(&blob) { + trace!("Successfully read the `Sliding Sync` pos from the crypto store cache"); + restored_fields.pos = frozen_pos.pos; } - - #[cfg(feature = "e2e-encryption")] - { - if let Some(olm_machine) = &*client.olm_machine().await { - if let Ok(Some(blob)) = - olm_machine.store().get_custom_value(&instance_storage_key).await - { - if let Ok(frozen_pos) = - serde_json::from_slice::(&blob) - { - trace!("Successfully read the `Sliding Sync` pos from the crypto store cache"); - restored_fields.pos = frozen_pos.pos; - } - } - } - } - } - - // `SlidingSync` has been found, but it wasn't possible to deserialize it. It's - // declared as obsolete. The main reason might be that the internal - // representation of a `SlidingSync` might have changed. - // Instead of considering this as a strong error, we remove - // the entry from the cache and keep `SlidingSync` in its initial - // state. - Some(Err(_)) => { - warn!( - "failed to deserialize `SlidingSync` from the cache, it is obsolete; removing the cache entry!" - ); - - // Let's clear everything and stop here. - clean_storage(client, storage_key, lists).await; - - return Ok(None); - } - - None => { - trace!("No Sliding Sync object in the cache"); } } @@ -274,13 +191,12 @@ pub(super) async fn restore_sliding_sync_state( mod tests { use std::sync::{Arc, RwLock}; - use assert_matches::assert_matches; use matrix_sdk_test::async_test; use super::{ - clean_storage, format_storage_key_for_sliding_sync, + super::SlidingSyncList, format_storage_key_for_sliding_sync, format_storage_key_for_sliding_sync_list, format_storage_key_prefix, - restore_sliding_sync_state, store_sliding_sync_state, SlidingSyncList, + restore_sliding_sync_state, store_sliding_sync_state, }; use crate::{test_utils::logged_in_client, Result}; @@ -291,30 +207,26 @@ mod tests { let store = client.state_store(); + let sync_id = "test-sync-id"; + let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); + // Store entries don't exist. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes()) - .await? - .is_none()); - assert!(store .get_custom_value( - format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes() + format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() ) .await? .is_none()); assert!(store .get_custom_value( - format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() + format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() ) .await? .is_none()); // Create a new `SlidingSync` instance, and store it. let storage_key = { - let sync_id = "test-sync-id"; - let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); let sliding_sync = client .sliding_sync(sync_id)? .add_cached_list(SlidingSyncList::builder("list_foo")) @@ -340,12 +252,7 @@ mod tests { storage_key }; - // Store entries now exist for the sliding sync object and list_foo. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) - .await? - .is_some()); - + // Store entries now exist for `list_foo`. assert!(store .get_custom_value( format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() @@ -353,7 +260,7 @@ mod tests { .await? .is_some()); - // But not for list_bar. + // But not for `list_bar`. assert!(store .get_custom_value( format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() @@ -362,74 +269,45 @@ mod tests { .is_none()); // Create a new `SlidingSync`, and it should be read from the cache. - let storage_key = { - let sync_id = "test-sync-id"; - let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); - let max_number_of_room_stream = Arc::new(RwLock::new(None)); - let cloned_stream = max_number_of_room_stream.clone(); - let sliding_sync = client - .sliding_sync(sync_id)? - .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| { - // In the `once_built()` handler, nothing has been read from the cache yet. - assert_eq!(list.maximum_number_of_rooms(), None); + let max_number_of_room_stream = Arc::new(RwLock::new(None)); + let cloned_stream = max_number_of_room_stream.clone(); + let sliding_sync = client + .sliding_sync(sync_id)? + .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| { + // In the `once_built()` handler, nothing has been read from the cache yet. + assert_eq!(list.maximum_number_of_rooms(), None); - let mut stream = cloned_stream.write().unwrap(); - *stream = Some(list.maximum_number_of_rooms_stream()); - list - })) - .await? - .add_list(SlidingSyncList::builder("list_bar")) - .build() - .await?; + let mut stream = cloned_stream.write().unwrap(); + *stream = Some(list.maximum_number_of_rooms_stream()); + list + })) + .await? + .add_list(SlidingSyncList::builder("list_bar")) + .build() + .await?; - // Check the list' state. - { - let lists = sliding_sync.inner.lists.read().await; - - // This one was cached. - let list_foo = lists.get("list_foo").unwrap(); - assert_eq!(list_foo.maximum_number_of_rooms(), Some(42)); - - // This one wasn't. - let list_bar = lists.get("list_bar").unwrap(); - assert_eq!(list_bar.maximum_number_of_rooms(), None); - } - - // The maximum number of rooms reloaded from the cache should have been - // published. - { - let mut stream = - max_number_of_room_stream.write().unwrap().take().expect("stream must be set"); - let initial_max_number_of_rooms = - stream.next().await.expect("stream must have emitted something"); - assert_eq!(initial_max_number_of_rooms, Some(42)); - } - - // Clean the cache. + // Check the list' state. + { let lists = sliding_sync.inner.lists.read().await; - clean_storage(&client, &storage_key, &lists).await; - storage_key - }; - // Store entries don't exist. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) - .await? - .is_none()); + // This one was cached. + let list_foo = lists.get("list_foo").unwrap(); + assert_eq!(list_foo.maximum_number_of_rooms(), Some(42)); - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() - ) - .await? - .is_none()); + // This one wasn't. + let list_bar = lists.get("list_bar").unwrap(); + assert_eq!(list_bar.maximum_number_of_rooms(), None); + } - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() - ) - .await? - .is_none()); + // The maximum number of rooms reloaded from the cache should have been + // published. + { + let mut stream = + max_number_of_room_stream.write().unwrap().take().expect("stream must be set"); + let initial_max_number_of_rooms = + stream.next().await.expect("stream must have emitted something"); + assert_eq!(initial_max_number_of_rooms, Some(42)); + } Ok(()) } @@ -437,8 +315,6 @@ mod tests { #[cfg(feature = "e2e-encryption")] #[async_test] async fn test_sliding_sync_high_level_cache_and_restore() -> Result<()> { - use crate::sliding_sync::FrozenSlidingSync; - let client = logged_in_client(Some("https://foo.bar".to_owned())).await; let sync_id = "test-sync-id"; @@ -465,21 +341,10 @@ mod tests { store_sliding_sync_state(&sliding_sync, &position_guard).await?; } - // The delta token has been correctly written to the state store (but not the - // to_device_since, since it's in the other store). - let state_store = client.state_store(); - assert_matches!( - state_store.get_custom_value(full_storage_key.as_bytes()).await?, - Some(bytes) => { - let deserialized: FrozenSlidingSync = serde_json::from_slice(&bytes)?; - assert!(deserialized.to_device_since.is_none()); - } - ); - // Ok, forget about the sliding sync, let's recreate one from scratch. drop(sliding_sync); - let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into()) + let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix) .await? .expect("must have restored sliding sync fields"); @@ -496,28 +361,6 @@ mod tests { assert!(olm_machine.store().next_batch_token().await?.is_none()); } - let to_device_token = "to_device_token".to_owned(); - - // Put that delta-token in the state store. - let state_store = client.state_store(); - state_store - .set_custom_value( - full_storage_key.as_bytes(), - serde_json::to_vec(&FrozenSlidingSync { - to_device_since: Some(to_device_token.clone()), - })?, - ) - .await?; - - let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into()) - .await? - .expect("must have restored fields"); - - // After restoring, the to-device since token, and the stream position could be - // read from the state store. - assert_eq!(restored_fields.to_device_token.unwrap(), to_device_token); - assert_eq!(restored_fields.pos.unwrap(), pos); - Ok(()) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index a66b20e58..659da0b0a 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -387,8 +387,7 @@ impl SlidingSync { self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true); let restored_fields = if self.inner.share_pos || to_device_enabled { - let lists = self.inner.lists.read().await; - restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await? + restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await? } else { None }; @@ -792,21 +791,6 @@ pub(super) struct SlidingSyncPositionMarkers { pos: Option, } -/// Frozen bits of a Sliding Sync that are stored in the *state* store. -#[derive(Debug, Serialize, Deserialize)] -struct FrozenSlidingSync { - /// Deprecated: prefer storing in the crypto store. - #[serde(skip_serializing_if = "Option::is_none")] - to_device_since: Option, -} - -impl FrozenSlidingSync { - fn new() -> Self { - // The to-device token must be saved in the `FrozenCryptoSlidingSync` now. - Self { to_device_since: None } - } -} - #[derive(Serialize, Deserialize)] struct FrozenSlidingSyncPos { #[serde(skip_serializing_if = "Option::is_none")] @@ -921,7 +905,7 @@ mod tests { use super::{ http, sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager}, - FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode, + SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode, SlidingSyncStickyParameters, }; use crate::{ @@ -1134,16 +1118,6 @@ mod tests { Ok(()) } - #[async_test] - async fn test_to_device_token_properly_are_not_cached() -> Result<()> { - // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved - // in the crypto store since PR #2323. - let frozen = FrozenSlidingSync::new(); - assert!(frozen.to_device_since.is_none()); - - Ok(()) - } - #[async_test] async fn test_add_list() -> Result<()> { let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") @@ -1821,13 +1795,9 @@ mod tests { assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0")); - let restored_fields = restore_sliding_sync_state( - &client, - &sliding_sync.inner.storage_key, - &*sliding_sync.inner.lists.read().await, - ) - .await? - .expect("must have restored fields"); + let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key) + .await? + .expect("must have restored fields"); // While it has been saved into the database, it's not necessarily going to be // used later! @@ -1916,13 +1886,9 @@ mod tests { assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned())); - let restored_fields = restore_sliding_sync_state( - &client, - &sliding_sync.inner.storage_key, - &*sliding_sync.inner.lists.read().await, - ) - .await? - .expect("must have restored fields"); + let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key) + .await? + .expect("must have restored fields"); // While it has been saved into the database, it's not necessarily going to be // used later!