mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 07:27:45 -04:00
Merge pull request #1736 from Hywan/feat-sliding-sync-storage
This commit is contained in:
@@ -12,7 +12,7 @@ use ruma::{
|
||||
},
|
||||
assign, OwnedRoomId,
|
||||
};
|
||||
use tracing::trace;
|
||||
use tracing::{trace, warn};
|
||||
use url::Url;
|
||||
|
||||
use super::{
|
||||
@@ -235,58 +235,18 @@ impl SlidingSyncBuilder {
|
||||
let mut delta_token = None;
|
||||
let mut rooms_found: BTreeMap<OwnedRoomId, SlidingSyncRoom> = BTreeMap::new();
|
||||
|
||||
// Load an existing state from the cache.
|
||||
if let Some(storage_key) = &self.storage_key {
|
||||
trace!(storage_key, "trying to load from cold");
|
||||
|
||||
for (name, list) in &mut self.lists {
|
||||
if let Some(frozen_list) = client
|
||||
.store()
|
||||
.get_custom_value(format!("{storage_key}::{name}").as_bytes())
|
||||
.await?
|
||||
.map(|v| serde_json::from_slice::<FrozenSlidingSyncList>(&v))
|
||||
.transpose()?
|
||||
{
|
||||
trace!(name, "frozen for list found");
|
||||
|
||||
let FrozenSlidingSyncList { maximum_number_of_rooms, rooms_list, rooms } =
|
||||
frozen_list;
|
||||
list.set_from_cold(maximum_number_of_rooms, rooms_list);
|
||||
|
||||
for (key, frozen_room) in rooms.into_iter() {
|
||||
rooms_found.entry(key).or_insert_with(|| {
|
||||
SlidingSyncRoom::from_frozen(frozen_room, client.clone())
|
||||
});
|
||||
}
|
||||
} else {
|
||||
trace!(name, "no frozen state for list found");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token }) =
|
||||
client
|
||||
.store()
|
||||
.get_custom_value(storage_key.as_bytes())
|
||||
.await?
|
||||
.map(|v| serde_json::from_slice::<FrozenSlidingSync>(&v))
|
||||
.transpose()?
|
||||
{
|
||||
trace!("frozen for generic found");
|
||||
|
||||
if let Some(since) = to_device_since {
|
||||
if let Some(to_device_ext) =
|
||||
self.extensions.get_or_insert_with(Default::default).to_device.as_mut()
|
||||
{
|
||||
to_device_ext.since = Some(since);
|
||||
}
|
||||
}
|
||||
|
||||
delta_token = frozen_delta_token;
|
||||
}
|
||||
|
||||
trace!("sync unfrozen done");
|
||||
};
|
||||
|
||||
trace!(len = rooms_found.len(), "rooms unfrozen");
|
||||
build_from_storage(
|
||||
&client,
|
||||
storage_key,
|
||||
&mut self.lists,
|
||||
&mut delta_token,
|
||||
&mut rooms_found,
|
||||
&mut self.extensions,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let rooms = StdRwLock::new(rooms_found);
|
||||
let lists = StdRwLock::new(self.lists);
|
||||
@@ -312,3 +272,151 @@ impl SlidingSyncBuilder {
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
|
||||
format!("sliding_sync_store::{storage_key}")
|
||||
}
|
||||
|
||||
fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
|
||||
format!("sliding_sync_store::{storage_key}::{list_name}")
|
||||
}
|
||||
|
||||
/// Clean the storage for everything related to `SlidingSync`.
|
||||
async fn clean_storage(
|
||||
client: &Client,
|
||||
storage_key: &str,
|
||||
lists: &BTreeMap<String, SlidingSyncList>,
|
||||
) {
|
||||
let storage = client.store();
|
||||
|
||||
for list_name in lists.keys() {
|
||||
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
|
||||
|
||||
let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
|
||||
}
|
||||
|
||||
let _ = storage
|
||||
.remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Build the `SlidingSync` and siblings from the storage.
|
||||
async fn build_from_storage(
|
||||
client: &Client,
|
||||
storage_key: &str,
|
||||
lists: &mut BTreeMap<String, SlidingSyncList>,
|
||||
delta_token: &mut Option<String>,
|
||||
rooms_found: &mut BTreeMap<OwnedRoomId, SlidingSyncRoom>,
|
||||
extensions: &mut Option<ExtensionsConfig>,
|
||||
) -> Result<()> {
|
||||
let storage = client.store();
|
||||
|
||||
let mut collected_lists_and_frozen_lists = Vec::with_capacity(lists.len());
|
||||
|
||||
// Preload the `FrozenSlidingSyncList` objects from the cache.
|
||||
//
|
||||
// Even if a cache was detected as obsolete, we go over all of them, so that we
|
||||
// are sure all obsolete cache entries are removed.
|
||||
for (list_name, list) in lists.iter_mut() {
|
||||
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
|
||||
|
||||
match storage
|
||||
.get_custom_value(storage_key_for_list.as_bytes())
|
||||
.await?
|
||||
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
|
||||
{
|
||||
// List has been found and successfully deserialized.
|
||||
Some(Ok(frozen_list)) => {
|
||||
trace!(list_name, "successfully read the list from cache");
|
||||
|
||||
// Keep it for later.
|
||||
collected_lists_and_frozen_lists.push((list, frozen_list));
|
||||
}
|
||||
|
||||
// List has been found, but it wasn't possible to deserialize it. It's declared
|
||||
// as obsolete. The main reason might be that the internal representation of a
|
||||
// `SlidingSyncList` might have changed. Instead of considering this as a strong
|
||||
// error, we remove the entry from the cache and keep the list in its initial
|
||||
// state.
|
||||
Some(Err(_)) => {
|
||||
warn!(
|
||||
list_name,
|
||||
"failed to deserialize the list from the cache, it is obsolete; removing the cache entry!"
|
||||
);
|
||||
|
||||
// Let's clear everything and stop here.
|
||||
clean_storage(client, storage_key, lists).await;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
None => {
|
||||
trace!(list_name, "failed to find the list in the cache");
|
||||
|
||||
// A missing cache doesn't make anything obsolete.
|
||||
// We just do nothing here.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preload the `SlidingSync` object from the cache.
|
||||
match storage
|
||||
.get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
|
||||
.await?
|
||||
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
|
||||
{
|
||||
// `SlidingSync` has been found and successfully deserialized.
|
||||
Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => {
|
||||
trace!("successfully read the `SlidingSync` from the cache");
|
||||
|
||||
// OK, at this step, everything has been loaded successfully from the cache.
|
||||
|
||||
// Let's update all the `SlidingSyncList`.
|
||||
for (list, FrozenSlidingSyncList { maximum_number_of_rooms, rooms_list, rooms }) in
|
||||
collected_lists_and_frozen_lists
|
||||
{
|
||||
list.set_from_cold(maximum_number_of_rooms, rooms_list);
|
||||
|
||||
for (key, frozen_room) in rooms.into_iter() {
|
||||
rooms_found.entry(key).or_insert_with(|| {
|
||||
SlidingSyncRoom::from_frozen(frozen_room, client.clone())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Let's update the `SlidingSync`.
|
||||
if let Some(since) = to_device_since {
|
||||
if let Some(to_device_ext) =
|
||||
extensions.get_or_insert_with(Default::default).to_device.as_mut()
|
||||
{
|
||||
to_device_ext.since = Some(since);
|
||||
}
|
||||
}
|
||||
|
||||
*delta_token = frozen_delta_token;
|
||||
}
|
||||
|
||||
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
|
||||
// declared as obsolete. The main reason might be that the internal
|
||||
// representation of a `SlidingSync` might have changed.
|
||||
// Instead of considering this as a strong error, we remove
|
||||
// the entry from the cache and keep `SlidingSync` in its initial
|
||||
// state.
|
||||
Some(Err(_)) => {
|
||||
warn!(
|
||||
"failed to deserialize `SlidingSync` from the cache, it is obsolote; removing the cache entry!"
|
||||
);
|
||||
|
||||
// Let's clear everything and stop here.
|
||||
clean_storage(client, storage_key, lists).await;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
None => {
|
||||
trace!("Failed to find the `SlidingSync` object in the cache");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user