From 3511a4a053a95f9936ce6e26b345d46871170406 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 9 Mar 2023 16:40:26 +0100 Subject: [PATCH] fix(sdk): `SlidingSync` has a lock for both `pos` _and_ `delta_token`. This patch updates `SlidingSync.pos` and `.delta_token`. Each field has their own lock. It's annoying because they must updated at the same time to not be out-of-sync. So a new field `SlidingSync.position` of type `SlidingSyncPositionMarkers` is created. It holds `pos` and `delta_token. This new field is behind a single `RwLock`. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 26 ++++++----- crates/matrix-sdk/src/sliding_sync/mod.rs | 46 +++++++++++++------ 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 62d839741..27e203c12 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -17,7 +17,7 @@ use url::Url; use super::{ Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner, - SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom, + SlidingSyncList, SlidingSyncListBuilder, SlidingSyncPositionMarkers, SlidingSyncRoom, }; use crate::{Client, Result}; @@ -232,7 +232,7 @@ impl SlidingSyncBuilder { pub async fn build(mut self) -> Result { let client = self.client.ok_or(Error::BuildMissingField("client"))?; - let mut delta_token_inner = None; + let mut delta_token = None; let mut rooms_found: BTreeMap = BTreeMap::new(); if let Some(storage_key) = &self.storage_key { @@ -261,12 +261,13 @@ impl SlidingSyncBuilder { } } - if let Some(FrozenSlidingSync { to_device_since, delta_token }) = client - .store() - .get_custom_value(storage_key.as_bytes()) - .await? - .map(|v| serde_json::from_slice::(&v)) - .transpose()? + if let Some(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token }) = + client + .store() + .get_custom_value(storage_key.as_bytes()) + .await? + .map(|v| serde_json::from_slice::(&v)) + .transpose()? { trace!("frozen for generic found"); @@ -278,7 +279,7 @@ impl SlidingSyncBuilder { } } - delta_token_inner = delta_token; + delta_token = frozen_delta_token; } trace!("sync unfrozen done"); @@ -300,8 +301,11 @@ impl SlidingSyncBuilder { extensions: Mutex::new(self.extensions), reset_counter: Default::default(), - pos: StdRwLock::new(Observable::new(None)), - delta_token: StdRwLock::new(Observable::new(delta_token_inner)), + position: StdRwLock::new(SlidingSyncPositionMarkers { + pos: Observable::new(None), + delta_token: Observable::new(delta_token), + }), + subscriptions: StdRwLock::new(self.subscriptions), unsubscribe: Default::default(), })) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 567c52ac4..b19ececa9 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -666,10 +666,8 @@ pub(super) struct SlidingSyncInner { /// The storage key to keep this cache at and load it from storage_key: Option, - /// The `pos` marker. - pos: StdRwLock>>, - - delta_token: StdRwLock>>, + /// The `pos` and `delta_token` markers. + position: StdRwLock, /// The lists of this Sliding Sync instance. lists: StdRwLock>, @@ -918,11 +916,11 @@ impl SlidingSync { mut sync_response: SyncResponse, list_generators: &mut BTreeMap, ) -> Result { - Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos)); - Observable::set( - &mut self.inner.delta_token.write().unwrap(), - sliding_sync_response.delta_token, - ); + { + let mut position_lock = self.inner.position.write().unwrap(); + Observable::set(&mut position_lock.pos, Some(sliding_sync_response.pos)); + Observable::set(&mut position_lock.delta_token, sliding_sync_response.delta_token); + } let update_summary = { let mut rooms = Vec::new(); @@ -1018,8 +1016,12 @@ impl SlidingSync { } } - let pos = self.inner.pos.read().unwrap().clone(); - let delta_token = self.inner.delta_token.read().unwrap().clone(); + let (pos, delta_token) = { + let position_lock = self.inner.position.read().unwrap(); + + (position_lock.pos.clone(), position_lock.delta_token.clone()) + }; + let room_subscriptions = self.inner.subscriptions.read().unwrap().clone(); let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); @@ -1198,7 +1200,11 @@ impl SlidingSync { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. - Observable::set(&mut self.inner.pos.write().unwrap(), None); + { + let mut position_lock = self.inner.position.write().unwrap(); + + Observable::set(&mut position_lock.pos, None); + } debug!(?self.inner.extensions, "Sliding Sync has been reset"); }); @@ -1218,15 +1224,25 @@ impl SlidingSync { impl SlidingSync { /// Get a copy of the `pos` value. pub fn pos(&self) -> Option { - self.inner.pos.read().unwrap().clone() + let position_lock = self.inner.position.read().unwrap(); + + position_lock.pos.clone() } /// Set a new value for `pos`. pub fn set_pos(&self, new_pos: String) { - Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos)); + let mut position_lock = self.inner.position.write().unwrap(); + + Observable::set(&mut position_lock.pos, Some(new_pos)); } } +#[derive(Debug)] +pub(super) struct SlidingSyncPositionMarkers { + pos: Observable>, + delta_token: Observable>, +} + #[derive(Serialize, Deserialize)] struct FrozenSlidingSync { #[serde(skip_serializing_if = "Option::is_none")] @@ -1238,7 +1254,7 @@ struct FrozenSlidingSync { impl From<&SlidingSync> for FrozenSlidingSync { fn from(sliding_sync: &SlidingSync) -> Self { FrozenSlidingSync { - delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(), + delta_token: sliding_sync.inner.position.read().unwrap().delta_token.clone(), to_device_since: sliding_sync .inner .extensions