chore: refactor fields restored by restore_sliding_sync_state

This commit is contained in:
Benjamin Bouvier
2023-08-24 13:50:57 +02:00
parent 719cbee96e
commit 941ecbfe0d
3 changed files with 35 additions and 54 deletions

View File

@@ -229,8 +229,6 @@ impl SlidingSyncBuilder {
pub async fn build(self) -> Result<SlidingSync> {
let client = self.client;
let mut delta_token = None;
let (internal_channel_sender, _internal_channel_receiver) = channel(8);
let mut lists = BTreeMap::new();
@@ -242,14 +240,9 @@ impl SlidingSyncBuilder {
}
// Reload existing state from the cache.
restore_sliding_sync_state(
&client,
&self.storage_key,
&lists,
&mut delta_token,
&mut None, // to_device_token is ignored here
)
.await?;
let restored_fields =
restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
let delta_token = restored_fields.and_then(|fields| fields.delta_token);
let rooms = AsyncRwLock::new(self.rooms);
let lists = AsyncRwLock::new(lists);

View File

@@ -158,6 +158,13 @@ pub(super) async fn restore_sliding_sync_list(
Ok(None)
}
/// Fields restored during `restore_sliding_sync_state`.
#[derive(Default)]
pub(super) struct RestoredFields {
pub delta_token: Option<String>,
pub to_device_token: Option<String>,
}
/// Restore the `SlidingSync`'s state from what is stored in the storage.
///
/// If one cache is obsolete (corrupted, and cannot be deserialized or
@@ -166,16 +173,16 @@ pub(super) async fn restore_sliding_sync_state(
client: &Client,
storage_key: &str,
lists: &BTreeMap<String, SlidingSyncList>,
delta_token: &mut Option<String>,
to_device_token: &mut Option<String>,
) -> Result<()> {
) -> Result<Option<RestoredFields>> {
let _timer = timer!(format!("loading sliding sync {storage_key} state from DB"));
let mut restored_fields = RestoredFields::default();
#[cfg(feature = "e2e-encryption")]
if let Some(olm_machine) = &*client.olm_machine().await {
match olm_machine.store().next_batch_token().await? {
Some(token) => {
*to_device_token = Some(token);
restored_fields.to_device_token = Some(token);
}
None => trace!("No `SlidingSync` in the crypto-store cache"),
}
@@ -194,10 +201,11 @@ pub(super) async fn restore_sliding_sync_state(
trace!("Successfully read the `SlidingSync` from the cache");
// Only update the to-device token if we failed to read it from the crypto store
// above.
if to_device_token.is_none() {
*to_device_token = to_device_since;
if restored_fields.to_device_token.is_none() {
restored_fields.to_device_token = to_device_since;
}
*delta_token = frozen_delta_token;
restored_fields.delta_token = frozen_delta_token;
}
// `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
@@ -214,7 +222,7 @@ pub(super) async fn restore_sliding_sync_state(
// Let's clear everything and stop here.
clean_storage(client, storage_key, lists).await;
return Ok(());
return Ok(None);
}
None => {
@@ -222,7 +230,7 @@ pub(super) async fn restore_sliding_sync_state(
}
}
Ok(())
Ok(Some(restored_fields))
}
#[cfg(test)]
@@ -436,19 +444,12 @@ mod tests {
// Ok, forget about the sliding sync, let's recreate one from scratch.
drop(sliding_sync);
let mut read_delta_token = None;
restore_sliding_sync_state(
&client,
&storage_key_prefix,
&[].into(),
&mut read_delta_token,
&mut None,
)
.await?;
let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
.await?
.expect("must have restored sliding sync fields");
// After restoring, the delta token and to-device token could be read.
assert_eq!(read_delta_token.unwrap(), delta_token);
assert_eq!(restored_fields.delta_token.unwrap(), delta_token);
// Test the "migration" path: assume a missing to-device token in crypto store,
// but present in a former state store.
@@ -474,22 +475,14 @@ mod tests {
)
.await?;
let mut read_delta_token = None;
let mut read_to_device_token = None;
restore_sliding_sync_state(
&client,
&storage_key_prefix,
&[].into(),
&mut read_delta_token,
&mut read_to_device_token,
)
.await?;
let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
.await?
.expect("must have restored fields");
// After restoring, the delta token and to-device token could be read, even if
// they came both from the state store.
assert_eq!(read_delta_token.unwrap(), delta_token);
assert_eq!(read_to_device_token.unwrap(), to_device_token);
assert_eq!(restored_fields.delta_token.unwrap(), delta_token);
assert_eq!(restored_fields.to_device_token.unwrap(), to_device_token);
Ok(())
}

View File

@@ -489,17 +489,12 @@ impl SlidingSync {
if to_device_enabled {
let lists = self.inner.lists.read().await;
let mut to_device_token = None;
restore_sliding_sync_state(
&self.inner.client,
&self.inner.storage_key,
&lists,
&mut None,
&mut to_device_token,
)
.await?;
request.extensions.to_device.since = to_device_token;
if let Some(restored_fields) =
restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists)
.await?
{
request.extensions.to_device.since = restored_fields.to_device_token;
}
}
// Apply the transaction id if one was generated.