feat(sdk): Extract SlidingSync cache functions into their own module.

This commit is contained in:
Ivan Enderlin
2023-04-05 09:05:03 +02:00
parent 690c3b1977
commit fc8b9d7de4
3 changed files with 168 additions and 152 deletions

View File

@@ -12,12 +12,11 @@ use ruma::{
},
assign, OwnedRoomId,
};
use tracing::{trace, warn};
use url::Url;
use super::{
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner,
SlidingSyncList, SlidingSyncListBuilder, SlidingSyncPositionMarkers, SlidingSyncRoom,
cache::restore_sliding_sync_state, Error, SlidingSync, SlidingSyncInner, SlidingSyncList,
SlidingSyncListBuilder, SlidingSyncPositionMarkers, SlidingSyncRoom,
};
use crate::{Client, Result};
@@ -237,7 +236,7 @@ impl SlidingSyncBuilder {
// Load an existing state from the cache.
if let Some(storage_key) = &self.storage_key {
build_from_storage(
restore_sliding_sync_state(
&client,
storage_key,
&mut self.lists,
@@ -272,151 +271,3 @@ impl SlidingSyncBuilder {
}))
}
}
fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
format!("sliding_sync_store::{storage_key}")
}
fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
format!("sliding_sync_store::{storage_key}::{list_name}")
}
/// Clean the storage for everything related to `SlidingSync`.
async fn clean_storage(
client: &Client,
storage_key: &str,
lists: &BTreeMap<String, SlidingSyncList>,
) {
let storage = client.store();
for list_name in lists.keys() {
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
}
let _ = storage
.remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
.await;
}
/// Build the `SlidingSync` and siblings from the storage.
async fn build_from_storage(
client: &Client,
storage_key: &str,
lists: &mut BTreeMap<String, SlidingSyncList>,
delta_token: &mut Option<String>,
rooms_found: &mut BTreeMap<OwnedRoomId, SlidingSyncRoom>,
extensions: &mut Option<ExtensionsConfig>,
) -> Result<()> {
let storage = client.store();
let mut collected_lists_and_frozen_lists = Vec::with_capacity(lists.len());
// Preload the `FrozenSlidingSyncList` objects from the cache.
//
// Even if a cache was detected as obsolete, we go over all of them, so that we
// are sure all obsolete cache entries are removed.
for (list_name, list) in lists.iter_mut() {
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
match storage
.get_custom_value(storage_key_for_list.as_bytes())
.await?
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
{
// List has been found and successfully deserialized.
Some(Ok(frozen_list)) => {
trace!(list_name, "successfully read the list from cache");
// Keep it for later.
collected_lists_and_frozen_lists.push((list, frozen_list));
}
// List has been found, but it wasn't possible to deserialize it. It's declared
// as obsolete. The main reason might be that the internal representation of a
// `SlidingSyncList` might have changed. Instead of considering this as a strong
// error, we remove the entry from the cache and keep the list in its initial
// state.
Some(Err(_)) => {
warn!(
list_name,
"failed to deserialize the list from the cache, it is obsolete; removing the cache entry!"
);
// Let's clear everything and stop here.
clean_storage(client, storage_key, lists).await;
return Ok(());
}
None => {
trace!(list_name, "failed to find the list in the cache");
// A missing cache doesn't make anything obsolete.
// We just do nothing here.
}
}
}
// Preload the `SlidingSync` object from the cache.
match storage
.get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
.await?
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
{
// `SlidingSync` has been found and successfully deserialized.
Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => {
trace!("successfully read the `SlidingSync` from the cache");
// OK, at this step, everything has been loaded successfully from the cache.
// Let's update all the `SlidingSyncList`.
for (list, FrozenSlidingSyncList { maximum_number_of_rooms, room_list, rooms }) in
collected_lists_and_frozen_lists
{
list.set_from_cold(maximum_number_of_rooms, room_list);
for (key, frozen_room) in rooms.into_iter() {
rooms_found.entry(key).or_insert_with(|| {
SlidingSyncRoom::from_frozen(frozen_room, client.clone())
});
}
}
// Let's update the `SlidingSync`.
if let Some(since) = to_device_since {
if let Some(to_device_ext) =
extensions.get_or_insert_with(Default::default).to_device.as_mut()
{
to_device_ext.since = Some(since);
}
}
*delta_token = frozen_delta_token;
}
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
// declared as obsolete. The main reason might be that the internal
// representation of a `SlidingSync` might have changed.
// Instead of considering this as a strong error, we remove
// the entry from the cache and keep `SlidingSync` in its initial
// state.
Some(Err(_)) => {
warn!(
"failed to deserialize `SlidingSync` from the cache, it is obsolote; removing the cache entry!"
);
// Let's clear everything and stop here.
clean_storage(client, storage_key, lists).await;
return Ok(());
}
None => {
trace!("Failed to find the `SlidingSync` object in the cache");
}
}
Ok(())
}

View File

@@ -0,0 +1,164 @@
//! Cache utilities.
//!
//! A `SlidingSync` instance can be stored in a cache, and restored from the
//! same cache. It helps to define what it sometimes called a “cold start”, or a
//! “fast start”.
use std::collections::BTreeMap;
use ruma::{api::client::sync::sync_events::v4::ExtensionsConfig, OwnedRoomId};
use tracing::{trace, warn};
use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSyncList, SlidingSyncRoom};
use crate::{Client, Result};
fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
format!("sliding_sync_store::{storage_key}")
}
fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
format!("sliding_sync_store::{storage_key}::{list_name}")
}
/// Clean the storage for everything related to `SlidingSync`.
async fn clean_storage(
client: &Client,
storage_key: &str,
lists: &BTreeMap<String, SlidingSyncList>,
) {
let storage = client.store();
for list_name in lists.keys() {
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
}
let _ = storage
.remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
.await;
}
/// Restore the `SlidingSync`'s state from what is stored in the storage.
///
/// If one cache is obsolete (corrupted, and cannot be deserialized or
/// anything), the entire `SlidingSync` cache is removed.
pub(super) async fn restore_sliding_sync_state(
client: &Client,
storage_key: &str,
lists: &mut BTreeMap<String, SlidingSyncList>,
delta_token: &mut Option<String>,
rooms_found: &mut BTreeMap<OwnedRoomId, SlidingSyncRoom>,
extensions: &mut Option<ExtensionsConfig>,
) -> Result<()> {
let storage = client.store();
let mut collected_lists_and_frozen_lists = Vec::with_capacity(lists.len());
// Preload the `FrozenSlidingSyncList` objects from the cache.
//
// Even if a cache was detected as obsolete, we go over all of them, so that we
// are sure all obsolete cache entries are removed.
for (list_name, list) in lists.iter_mut() {
let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
match storage
.get_custom_value(storage_key_for_list.as_bytes())
.await?
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
{
// List has been found and successfully deserialized.
Some(Ok(frozen_list)) => {
trace!(list_name, "successfully read the list from cache");
// Keep it for later.
collected_lists_and_frozen_lists.push((list, frozen_list));
}
// List has been found, but it wasn't possible to deserialize it. It's declared
// as obsolete. The main reason might be that the internal representation of a
// `SlidingSyncList` might have changed. Instead of considering this as a strong
// error, we remove the entry from the cache and keep the list in its initial
// state.
Some(Err(_)) => {
warn!(
list_name,
"failed to deserialize the list from the cache, it is obsolete; removing the cache entry!"
);
// Let's clear everything and stop here.
clean_storage(client, storage_key, lists).await;
return Ok(());
}
None => {
trace!(list_name, "failed to find the list in the cache");
// A missing cache doesn't make anything obsolete.
// We just do nothing here.
}
}
}
// Preload the `SlidingSync` object from the cache.
match storage
.get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes())
.await?
.map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
{
// `SlidingSync` has been found and successfully deserialized.
Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => {
trace!("successfully read the `SlidingSync` from the cache");
// OK, at this step, everything has been loaded successfully from the cache.
// Let's update all the `SlidingSyncList`.
for (list, FrozenSlidingSyncList { maximum_number_of_rooms, room_list, rooms }) in
collected_lists_and_frozen_lists
{
list.set_from_cold(maximum_number_of_rooms, room_list);
for (key, frozen_room) in rooms.into_iter() {
rooms_found.entry(key).or_insert_with(|| {
SlidingSyncRoom::from_frozen(frozen_room, client.clone())
});
}
}
// Let's update the `SlidingSync`.
if let Some(since) = to_device_since {
if let Some(to_device_ext) =
extensions.get_or_insert_with(Default::default).to_device.as_mut()
{
to_device_ext.since = Some(since);
}
}
*delta_token = frozen_delta_token;
}
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
// declared as obsolete. The main reason might be that the internal
// representation of a `SlidingSync` might have changed.
// Instead of considering this as a strong error, we remove
// the entry from the cache and keep `SlidingSync` in its initial
// state.
Some(Err(_)) => {
warn!(
"failed to deserialize `SlidingSync` from the cache, it is obsolote; removing the cache entry!"
);
// Let's clear everything and stop here.
clean_storage(client, storage_key, lists).await;
return Ok(());
}
None => {
trace!("Failed to find the `SlidingSync` object in the cache");
}
}
Ok(())
}

View File

@@ -16,6 +16,7 @@
#![doc = include_str!("README.md")]
mod builder;
mod cache;
mod client;
mod error;
mod list;