mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-10 08:53:00 -04:00
refactor(sdk): Remove FrozenSlidingSync.
This patch removes `FrozenSlidingSync`. Its unique field is supposed to be stored in the crypto store.
This commit is contained in:
@@ -246,8 +246,7 @@ impl SlidingSyncBuilder {
|
||||
}
|
||||
|
||||
// Reload existing state from the cache.
|
||||
let restored_fields =
|
||||
restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
|
||||
let restored_fields = restore_sliding_sync_state(&client, &self.storage_key).await?;
|
||||
|
||||
let pos = if let Some(fields) = restored_fields {
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
|
||||
@@ -4,17 +4,12 @@
|
||||
//! same cache. It helps to define what it sometimes called a “cold start”, or a
|
||||
//! “fast start”.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use matrix_sdk_base::{StateStore, StoreError};
|
||||
use matrix_sdk_common::timer;
|
||||
use ruma::UserId;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
use super::{
|
||||
FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
|
||||
SlidingSyncPositionMarkers,
|
||||
};
|
||||
use super::{FrozenSlidingSyncList, SlidingSync, SlidingSyncPositionMarkers};
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use crate::sliding_sync::FrozenSlidingSyncPos;
|
||||
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
|
||||
@@ -48,30 +43,6 @@ async fn invalidate_cached_list(
|
||||
let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
|
||||
}
|
||||
|
||||
/// Clean the storage for everything related to `SlidingSync` and all known
|
||||
/// lists.
|
||||
async fn clean_storage(
|
||||
client: &Client,
|
||||
storage_key: &str,
|
||||
lists: &BTreeMap<String, SlidingSyncList>,
|
||||
) {
|
||||
let storage = client.state_store();
|
||||
for list_name in lists.keys() {
|
||||
invalidate_cached_list(storage, storage_key, list_name).await;
|
||||
}
|
||||
let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
|
||||
let _ = storage.remove_custom_value(instance_storage_key.as_bytes()).await;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if let Some(olm_machine) = &*client.olm_machine().await {
|
||||
// Invalidate the value stored for the TERRIBLE HACK.
|
||||
let _ = olm_machine
|
||||
.store()
|
||||
.set_custom_value(&instance_storage_key, "".as_bytes().to_vec())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Store the `SlidingSync`'s state in the storage.
|
||||
pub(super) async fn store_sliding_sync_state(
|
||||
sliding_sync: &SlidingSync,
|
||||
@@ -83,15 +54,6 @@ pub(super) async fn store_sliding_sync_state(
|
||||
trace!(storage_key, "Saving a `SlidingSync` to the state store");
|
||||
let storage = sliding_sync.inner.client.state_store();
|
||||
|
||||
// Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
|
||||
// the store.
|
||||
storage
|
||||
.set_custom_value(
|
||||
instance_storage_key.as_bytes(),
|
||||
serde_json::to_vec(&FrozenSlidingSync::new())?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
{
|
||||
let position = _position;
|
||||
@@ -194,7 +156,6 @@ pub(super) struct RestoredFields {
|
||||
pub(super) async fn restore_sliding_sync_state(
|
||||
client: &Client,
|
||||
storage_key: &str,
|
||||
lists: &BTreeMap<String, SlidingSyncList>,
|
||||
) -> Result<Option<RestoredFields>> {
|
||||
let _timer = timer!(format!("loading sliding sync {storage_key} state from DB"));
|
||||
|
||||
@@ -210,60 +171,16 @@ pub(super) async fn restore_sliding_sync_state(
|
||||
}
|
||||
}
|
||||
|
||||
let storage = client.state_store();
|
||||
let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
|
||||
|
||||
// Preload the `SlidingSync` object from the cache.
|
||||
match storage
|
||||
.get_custom_value(instance_storage_key.as_bytes())
|
||||
.await?
|
||||
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
|
||||
{
|
||||
// `SlidingSync` has been found and successfully deserialized.
|
||||
Some(Ok(FrozenSlidingSync { to_device_since })) => {
|
||||
trace!("Successfully read the `SlidingSync` from the cache");
|
||||
// Only update the to-device token if we failed to read it from the crypto store
|
||||
// above.
|
||||
if restored_fields.to_device_token.is_none() {
|
||||
restored_fields.to_device_token = to_device_since;
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if let Some(olm_machine) = &*client.olm_machine().await {
|
||||
if let Ok(Some(blob)) = olm_machine.store().get_custom_value(&instance_storage_key).await {
|
||||
if let Ok(frozen_pos) = serde_json::from_slice::<FrozenSlidingSyncPos>(&blob) {
|
||||
trace!("Successfully read the `Sliding Sync` pos from the crypto store cache");
|
||||
restored_fields.pos = frozen_pos.pos;
|
||||
}
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
{
|
||||
if let Some(olm_machine) = &*client.olm_machine().await {
|
||||
if let Ok(Some(blob)) =
|
||||
olm_machine.store().get_custom_value(&instance_storage_key).await
|
||||
{
|
||||
if let Ok(frozen_pos) =
|
||||
serde_json::from_slice::<FrozenSlidingSyncPos>(&blob)
|
||||
{
|
||||
trace!("Successfully read the `Sliding Sync` pos from the crypto store cache");
|
||||
restored_fields.pos = frozen_pos.pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
|
||||
// declared as obsolete. The main reason might be that the internal
|
||||
// representation of a `SlidingSync` might have changed.
|
||||
// Instead of considering this as a strong error, we remove
|
||||
// the entry from the cache and keep `SlidingSync` in its initial
|
||||
// state.
|
||||
Some(Err(_)) => {
|
||||
warn!(
|
||||
"failed to deserialize `SlidingSync` from the cache, it is obsolete; removing the cache entry!"
|
||||
);
|
||||
|
||||
// Let's clear everything and stop here.
|
||||
clean_storage(client, storage_key, lists).await;
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
None => {
|
||||
trace!("No Sliding Sync object in the cache");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,13 +191,12 @@ pub(super) async fn restore_sliding_sync_state(
|
||||
mod tests {
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use matrix_sdk_test::async_test;
|
||||
|
||||
use super::{
|
||||
clean_storage, format_storage_key_for_sliding_sync,
|
||||
super::SlidingSyncList, format_storage_key_for_sliding_sync,
|
||||
format_storage_key_for_sliding_sync_list, format_storage_key_prefix,
|
||||
restore_sliding_sync_state, store_sliding_sync_state, SlidingSyncList,
|
||||
restore_sliding_sync_state, store_sliding_sync_state,
|
||||
};
|
||||
use crate::{test_utils::logged_in_client, Result};
|
||||
|
||||
@@ -291,30 +207,26 @@ mod tests {
|
||||
|
||||
let store = client.state_store();
|
||||
|
||||
let sync_id = "test-sync-id";
|
||||
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
|
||||
|
||||
// Store entries don't exist.
|
||||
assert!(store
|
||||
.get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
|
||||
.await?
|
||||
.is_none());
|
||||
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
|
||||
)
|
||||
.await?
|
||||
.is_none());
|
||||
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
|
||||
)
|
||||
.await?
|
||||
.is_none());
|
||||
|
||||
// Create a new `SlidingSync` instance, and store it.
|
||||
let storage_key = {
|
||||
let sync_id = "test-sync-id";
|
||||
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
|
||||
let sliding_sync = client
|
||||
.sliding_sync(sync_id)?
|
||||
.add_cached_list(SlidingSyncList::builder("list_foo"))
|
||||
@@ -340,12 +252,7 @@ mod tests {
|
||||
storage_key
|
||||
};
|
||||
|
||||
// Store entries now exist for the sliding sync object and list_foo.
|
||||
assert!(store
|
||||
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
|
||||
.await?
|
||||
.is_some());
|
||||
|
||||
// Store entries now exist for `list_foo`.
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
|
||||
@@ -353,7 +260,7 @@ mod tests {
|
||||
.await?
|
||||
.is_some());
|
||||
|
||||
// But not for list_bar.
|
||||
// But not for `list_bar`.
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
|
||||
@@ -362,74 +269,45 @@ mod tests {
|
||||
.is_none());
|
||||
|
||||
// Create a new `SlidingSync`, and it should be read from the cache.
|
||||
let storage_key = {
|
||||
let sync_id = "test-sync-id";
|
||||
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
|
||||
let max_number_of_room_stream = Arc::new(RwLock::new(None));
|
||||
let cloned_stream = max_number_of_room_stream.clone();
|
||||
let sliding_sync = client
|
||||
.sliding_sync(sync_id)?
|
||||
.add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
|
||||
// In the `once_built()` handler, nothing has been read from the cache yet.
|
||||
assert_eq!(list.maximum_number_of_rooms(), None);
|
||||
let max_number_of_room_stream = Arc::new(RwLock::new(None));
|
||||
let cloned_stream = max_number_of_room_stream.clone();
|
||||
let sliding_sync = client
|
||||
.sliding_sync(sync_id)?
|
||||
.add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
|
||||
// In the `once_built()` handler, nothing has been read from the cache yet.
|
||||
assert_eq!(list.maximum_number_of_rooms(), None);
|
||||
|
||||
let mut stream = cloned_stream.write().unwrap();
|
||||
*stream = Some(list.maximum_number_of_rooms_stream());
|
||||
list
|
||||
}))
|
||||
.await?
|
||||
.add_list(SlidingSyncList::builder("list_bar"))
|
||||
.build()
|
||||
.await?;
|
||||
let mut stream = cloned_stream.write().unwrap();
|
||||
*stream = Some(list.maximum_number_of_rooms_stream());
|
||||
list
|
||||
}))
|
||||
.await?
|
||||
.add_list(SlidingSyncList::builder("list_bar"))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
// Check the list' state.
|
||||
{
|
||||
let lists = sliding_sync.inner.lists.read().await;
|
||||
|
||||
// This one was cached.
|
||||
let list_foo = lists.get("list_foo").unwrap();
|
||||
assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
|
||||
|
||||
// This one wasn't.
|
||||
let list_bar = lists.get("list_bar").unwrap();
|
||||
assert_eq!(list_bar.maximum_number_of_rooms(), None);
|
||||
}
|
||||
|
||||
// The maximum number of rooms reloaded from the cache should have been
|
||||
// published.
|
||||
{
|
||||
let mut stream =
|
||||
max_number_of_room_stream.write().unwrap().take().expect("stream must be set");
|
||||
let initial_max_number_of_rooms =
|
||||
stream.next().await.expect("stream must have emitted something");
|
||||
assert_eq!(initial_max_number_of_rooms, Some(42));
|
||||
}
|
||||
|
||||
// Clean the cache.
|
||||
// Check the list' state.
|
||||
{
|
||||
let lists = sliding_sync.inner.lists.read().await;
|
||||
clean_storage(&client, &storage_key, &lists).await;
|
||||
storage_key
|
||||
};
|
||||
|
||||
// Store entries don't exist.
|
||||
assert!(store
|
||||
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
|
||||
.await?
|
||||
.is_none());
|
||||
// This one was cached.
|
||||
let list_foo = lists.get("list_foo").unwrap();
|
||||
assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
|
||||
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
|
||||
)
|
||||
.await?
|
||||
.is_none());
|
||||
// This one wasn't.
|
||||
let list_bar = lists.get("list_bar").unwrap();
|
||||
assert_eq!(list_bar.maximum_number_of_rooms(), None);
|
||||
}
|
||||
|
||||
assert!(store
|
||||
.get_custom_value(
|
||||
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
|
||||
)
|
||||
.await?
|
||||
.is_none());
|
||||
// The maximum number of rooms reloaded from the cache should have been
|
||||
// published.
|
||||
{
|
||||
let mut stream =
|
||||
max_number_of_room_stream.write().unwrap().take().expect("stream must be set");
|
||||
let initial_max_number_of_rooms =
|
||||
stream.next().await.expect("stream must have emitted something");
|
||||
assert_eq!(initial_max_number_of_rooms, Some(42));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -437,8 +315,6 @@ mod tests {
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
#[async_test]
|
||||
async fn test_sliding_sync_high_level_cache_and_restore() -> Result<()> {
|
||||
use crate::sliding_sync::FrozenSlidingSync;
|
||||
|
||||
let client = logged_in_client(Some("https://foo.bar".to_owned())).await;
|
||||
|
||||
let sync_id = "test-sync-id";
|
||||
@@ -465,21 +341,10 @@ mod tests {
|
||||
store_sliding_sync_state(&sliding_sync, &position_guard).await?;
|
||||
}
|
||||
|
||||
// The delta token has been correctly written to the state store (but not the
|
||||
// to_device_since, since it's in the other store).
|
||||
let state_store = client.state_store();
|
||||
assert_matches!(
|
||||
state_store.get_custom_value(full_storage_key.as_bytes()).await?,
|
||||
Some(bytes) => {
|
||||
let deserialized: FrozenSlidingSync = serde_json::from_slice(&bytes)?;
|
||||
assert!(deserialized.to_device_since.is_none());
|
||||
}
|
||||
);
|
||||
|
||||
// Ok, forget about the sliding sync, let's recreate one from scratch.
|
||||
drop(sliding_sync);
|
||||
|
||||
let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
|
||||
let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix)
|
||||
.await?
|
||||
.expect("must have restored sliding sync fields");
|
||||
|
||||
@@ -496,28 +361,6 @@ mod tests {
|
||||
assert!(olm_machine.store().next_batch_token().await?.is_none());
|
||||
}
|
||||
|
||||
let to_device_token = "to_device_token".to_owned();
|
||||
|
||||
// Put that delta-token in the state store.
|
||||
let state_store = client.state_store();
|
||||
state_store
|
||||
.set_custom_value(
|
||||
full_storage_key.as_bytes(),
|
||||
serde_json::to_vec(&FrozenSlidingSync {
|
||||
to_device_since: Some(to_device_token.clone()),
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
|
||||
.await?
|
||||
.expect("must have restored fields");
|
||||
|
||||
// After restoring, the to-device since token, and the stream position could be
|
||||
// read from the state store.
|
||||
assert_eq!(restored_fields.to_device_token.unwrap(), to_device_token);
|
||||
assert_eq!(restored_fields.pos.unwrap(), pos);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,8 +387,7 @@ impl SlidingSync {
|
||||
self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
|
||||
|
||||
let restored_fields = if self.inner.share_pos || to_device_enabled {
|
||||
let lists = self.inner.lists.read().await;
|
||||
restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
|
||||
restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -792,21 +791,6 @@ pub(super) struct SlidingSyncPositionMarkers {
|
||||
pos: Option<String>,
|
||||
}
|
||||
|
||||
/// Frozen bits of a Sliding Sync that are stored in the *state* store.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct FrozenSlidingSync {
|
||||
/// Deprecated: prefer storing in the crypto store.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
to_device_since: Option<String>,
|
||||
}
|
||||
|
||||
impl FrozenSlidingSync {
|
||||
fn new() -> Self {
|
||||
// The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
|
||||
Self { to_device_since: None }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct FrozenSlidingSyncPos {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
@@ -921,7 +905,7 @@ mod tests {
|
||||
use super::{
|
||||
http,
|
||||
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
|
||||
FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
|
||||
SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
|
||||
SlidingSyncStickyParameters,
|
||||
};
|
||||
use crate::{
|
||||
@@ -1134,16 +1118,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_to_device_token_properly_are_not_cached() -> Result<()> {
|
||||
// FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
|
||||
// in the crypto store since PR #2323.
|
||||
let frozen = FrozenSlidingSync::new();
|
||||
assert!(frozen.to_device_since.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_add_list() -> Result<()> {
|
||||
let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
|
||||
@@ -1821,13 +1795,9 @@ mod tests {
|
||||
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
|
||||
|
||||
let restored_fields = restore_sliding_sync_state(
|
||||
&client,
|
||||
&sliding_sync.inner.storage_key,
|
||||
&*sliding_sync.inner.lists.read().await,
|
||||
)
|
||||
.await?
|
||||
.expect("must have restored fields");
|
||||
let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
|
||||
.await?
|
||||
.expect("must have restored fields");
|
||||
|
||||
// While it has been saved into the database, it's not necessarily going to be
|
||||
// used later!
|
||||
@@ -1916,13 +1886,9 @@ mod tests {
|
||||
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
|
||||
|
||||
let restored_fields = restore_sliding_sync_state(
|
||||
&client,
|
||||
&sliding_sync.inner.storage_key,
|
||||
&*sliding_sync.inner.lists.read().await,
|
||||
)
|
||||
.await?
|
||||
.expect("must have restored fields");
|
||||
let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
|
||||
.await?
|
||||
.expect("must have restored fields");
|
||||
|
||||
// While it has been saved into the database, it's not necessarily going to be
|
||||
// used later!
|
||||
|
||||
Reference in New Issue
Block a user