From 832146b43dd17fede7ee5e2c53beec33aa294263 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 12:12:58 +0100 Subject: [PATCH] feat(sdk): Create `SlidingSyncInner`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move all `SlidingSync`'s fields into a new `SlidingSyncInner` type, and then update `SlidingSync` to contain a single `inner: Arc` field. First off, it's much simpler to understand what's behind an `Arc` for “clonability” of `SlidingSync`, and what's not. We don't need to worry about adding a new field that is not behind an `Arc` for a specific reason or anything. The pattern is clear now. Second, this is required for next commits. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 32 ++-- crates/matrix-sdk/src/sliding_sync/mod.rs | 159 ++++++++++-------- 2 files changed, 103 insertions(+), 88 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 0eb35a262..6feaa3909 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -16,8 +16,8 @@ use tracing::trace; use url::Url; use super::{ - Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, - SlidingSyncListBuilder, SlidingSyncRoom, + Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner, + SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom, }; use crate::{Client, Result}; @@ -286,24 +286,26 @@ impl SlidingSyncBuilder { trace!(len = rooms_found.len(), "rooms unfrozen"); - let rooms = Arc::new(StdRwLock::new(rooms_found)); - let lists = Arc::new(StdRwLock::new(self.lists)); + let rooms = StdRwLock::new(rooms_found); + let lists = StdRwLock::new(self.lists); Ok(SlidingSync { - homeserver: self.homeserver, - client, - storage_key: self.storage_key, + inner: Arc::new(SlidingSyncInner { + homeserver: self.homeserver, + client, + storage_key: self.storage_key, - lists, - rooms, + lists, + rooms, - extensions: Mutex::new(self.extensions).into(), - reset_counter: Default::default(), + extensions: Mutex::new(self.extensions).into(), + reset_counter: Default::default(), - pos: Arc::new(StdRwLock::new(Observable::new(None))), - delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))), - subscriptions: Arc::new(StdRwLock::new(self.subscriptions)), - unsubscribe: Default::default(), + pos: StdRwLock::new(Observable::new(None)), + delta_token: StdRwLock::new(Observable::new(delta_token_inner)), + subscriptions: StdRwLock::new(self.subscriptions), + unsubscribe: Default::default(), + }), }) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 67aa4c521..725a0aec0 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -643,6 +643,11 @@ const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; /// The Sliding Sync instance. #[derive(Clone, Debug)] pub struct SlidingSync { + inner: Arc, +} + +#[derive(Debug)] +pub(super) struct SlidingSyncInner { /// Customize the homeserver for sliding sync only homeserver: Option, @@ -653,55 +658,33 @@ pub struct SlidingSync { storage_key: Option, /// The `pos` marker. - pos: Arc>>>, + pos: StdRwLock>>, - delta_token: Arc>>>, + delta_token: StdRwLock>>, /// The lists of this Sliding Sync instance. - lists: Arc>>, + lists: StdRwLock>, /// The rooms details - rooms: Arc>>, + rooms: StdRwLock>, - subscriptions: Arc>>, - unsubscribe: Arc>>, + subscriptions: StdRwLock>, + unsubscribe: StdRwLock>, /// Number of times a Sliding Session session has been reset. - reset_counter: Arc, + reset_counter: AtomicU8, /// the intended state of the extensions being supplied to sliding /sync /// calls. May contain the latest next_batch for to_devices, etc. - extensions: Arc>>, -} - -#[derive(Serialize, Deserialize)] -struct FrozenSlidingSync { - #[serde(skip_serializing_if = "Option::is_none")] - to_device_since: Option, - #[serde(skip_serializing_if = "Option::is_none")] - delta_token: Option, -} - -impl From<&SlidingSync> for FrozenSlidingSync { - fn from(v: &SlidingSync) -> Self { - FrozenSlidingSync { - delta_token: v.delta_token.read().unwrap().clone(), - to_device_since: v - .extensions - .lock() - .unwrap() - .as_ref() - .and_then(|ext| ext.to_device.as_ref()?.since.clone()), - } - } + extensions: Mutex>, } impl SlidingSync { async fn cache_to_storage(&self) -> Result<(), crate::Error> { - let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) }; + let Some(storage_key) = self.inner.storage_key.as_ref() else { return Ok(()) }; trace!(storage_key, "Saving to storage for later use"); - let store = self.client.store(); + let store = self.inner.client.store(); // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside // the client store. @@ -714,9 +697,10 @@ impl SlidingSync { // Write every `SlidingSyncList` inside the client the store. let frozen_lists = { - let rooms_lock = self.rooms.read().unwrap(); + let rooms_lock = self.inner.rooms.read().unwrap(); - self.lists + self.inner + .lists .read() .unwrap() .iter() @@ -747,16 +731,16 @@ impl SlidingSync { /// lists but without the current state. pub fn new_builder_copy(&self) -> SlidingSyncBuilder { let mut builder = Self::builder() - .client(self.client.clone()) - .subscriptions(self.subscriptions.read().unwrap().to_owned()); + .client(self.inner.client.clone()) + .subscriptions(self.inner.subscriptions.read().unwrap().to_owned()); - for list in self.lists.read().unwrap().values().map(|list| { + for list in self.inner.lists.read().unwrap().values().map(|list| { list.new_builder().build().expect("builder worked before, builder works now") }) { builder = builder.add_list(list); } - if let Some(homeserver) = &self.homeserver { + if let Some(homeserver) = &self.inner.homeserver { builder.homeserver(homeserver.clone()) } else { builder @@ -769,7 +753,7 @@ impl SlidingSync { /// poll the stream after you've altered this. If you do that during, it /// might take one round trip to take effect. pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option) { - self.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default()); + self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default()); } /// Unsubscribe from a given room. @@ -778,14 +762,14 @@ impl SlidingSync { /// poll the stream after you've altered this. If you do that during, it /// might take one round trip to take effect. pub fn unsubscribe(&self, room_id: OwnedRoomId) { - if self.subscriptions.write().unwrap().remove(&room_id).is_some() { - self.unsubscribe.write().unwrap().push(room_id); + if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() { + self.inner.unsubscribe.write().unwrap().push(room_id); } } /// Add the common extensions if not already configured. pub fn add_common_extensions(&self) { - let mut lock = self.extensions.lock().unwrap(); + let mut lock = self.inner.extensions.lock().unwrap(); let mut cfg = lock.get_or_insert_with(Default::default); if cfg.to_device.is_none() { @@ -803,18 +787,19 @@ impl SlidingSync { /// Lookup a specific room pub fn get_room(&self, room_id: &RoomId) -> Option { - self.rooms.read().unwrap().get(room_id).cloned() + self.inner.rooms.read().unwrap().get(room_id).cloned() } /// Check the number of rooms. pub fn get_number_of_rooms(&self) -> usize { - self.rooms.read().unwrap().len() + self.inner.rooms.read().unwrap().len() } fn update_to_device_since(&self, since: String) { // FIXME: Find a better place where the to-device since token should be // persisted. - self.extensions + self.inner + .extensions .lock() .unwrap() .get_or_insert_with(Default::default) @@ -829,7 +814,7 @@ impl SlidingSync { /// listening to the stream and is therefor not necessarily up to date /// with the lists used for the stream. pub fn list(&self, list_name: &str) -> Option { - self.lists.read().unwrap().get(list_name).cloned() + self.inner.lists.read().unwrap().get(list_name).cloned() } /// Remove the SlidingSyncList named `list_name` from the lists list if @@ -839,7 +824,7 @@ impl SlidingSync { /// stream created after this. The old stream will still continue to use the /// previous set of lists. pub fn pop_list(&self, list_name: &String) -> Option { - self.lists.write().unwrap().remove(list_name) + self.inner.lists.write().unwrap().remove(list_name) } /// Add the list to the list of lists. @@ -852,7 +837,7 @@ impl SlidingSync { /// stream created after this. The old stream will still continue to use the /// previous set of lists. pub fn add_list(&self, list: SlidingSyncList) -> Option { - self.lists.write().unwrap().insert(list.name.clone(), list) + self.inner.lists.write().unwrap().insert(list.name.clone(), list) } /// Lookup a set of rooms @@ -860,14 +845,14 @@ impl SlidingSync { &self, room_ids: I, ) -> Vec> { - let rooms = self.rooms.read().unwrap(); + let rooms = self.inner.rooms.read().unwrap(); room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect() } /// Get all rooms. pub fn get_all_rooms(&self) -> Vec { - self.rooms.read().unwrap().values().cloned().collect() + self.inner.rooms.read().unwrap().values().cloned().collect() } fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig { @@ -875,7 +860,7 @@ impl SlidingSync { // The pos is `None`, it's either our initial sync or the proxy forgot about us // and sent us an `UnknownPos` error. We need to send out the config for our // extensions. - let mut extensions = self.extensions.lock().unwrap().clone().unwrap_or_default(); + let mut extensions = self.inner.extensions.lock().unwrap().clone().unwrap_or_default(); // Always enable to-device events and the e2ee-extension on the initial request, // no matter what the caller wants. @@ -898,6 +883,7 @@ impl SlidingSync { // We already enabled all the things, just fetch out the to-device since token // out of self.extensions and set it in a new, and empty, `ExtensionsConfig`. let since = self + .inner .extensions .lock() .unwrap() @@ -919,12 +905,15 @@ impl SlidingSync { mut sync_response: SyncResponse, list_generators: &mut BTreeMap, ) -> Result { - Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); - Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); + Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos)); + Observable::set( + &mut self.inner.delta_token.write().unwrap(), + sliding_sync_response.delta_token, + ); let update_summary = { let mut rooms = Vec::new(); - let mut rooms_map = self.rooms.write().unwrap(); + let mut rooms_map = self.inner.rooms.write().unwrap(); for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() { // `sync_response` contains the rooms with decrypted events if any, so look at @@ -948,7 +937,7 @@ impl SlidingSync { rooms_map.insert( room_id.clone(), SlidingSyncRoom::new( - self.client.clone(), + self.inner.client.clone(), room_id.clone(), room_data, timeline, @@ -1014,10 +1003,10 @@ impl SlidingSync { return Ok(None); } - let pos = self.pos.read().unwrap().clone(); - let delta_token = self.delta_token.read().unwrap().clone(); - let room_subscriptions = self.subscriptions.read().unwrap().clone(); - let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap()); + let pos = self.inner.pos.read().unwrap().clone(); + let delta_token = self.inner.delta_token.read().unwrap().clone(); + let room_subscriptions = self.inner.subscriptions.read().unwrap().clone(); + let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); let extensions = self.prepare_extension_config(pos.as_deref()); @@ -1028,7 +1017,7 @@ impl SlidingSync { let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30)); // Prepare the request. - let request = self.client.send_with_homeserver( + let request = self.inner.client.send_with_homeserver( assign!(v4::Request::new(), { pos, delta_token, @@ -1042,7 +1031,7 @@ impl SlidingSync { extensions, }), Some(request_config), - self.homeserver.as_ref().map(ToString::to_string), + self.inner.homeserver.as_ref().map(ToString::to_string), ); // Send the request and get a response with end-to-end encryption support. @@ -1057,7 +1046,8 @@ impl SlidingSync { #[cfg(feature = "e2e-encryption")] let response = { let (e2ee_uploads, response) = - futures_util::future::join(self.client.send_outgoing_requests(), request).await; + futures_util::future::join(self.inner.client.send_outgoing_requests(), request) + .await; if let Err(error) = e2ee_uploads { error!(?error, "Error while sending outgoing E2EE requests"); @@ -1103,7 +1093,7 @@ impl SlidingSync { // move to Sliding Sync, i.e. to `v4::Response`), but processing the // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. - let sync_response = self.client.process_sliding_sync(&response).await?; + let sync_response = self.inner.client.process_sliding_sync(&response).await?; debug!("Sliding sync response has been processed"); @@ -1120,12 +1110,12 @@ impl SlidingSync { /// /// This stream will send requests and will handle responses automatically, /// hence updating the lists. - #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] + #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { // Collect all the lists that need to be updated. let mut list_generators = { let mut list_generators = BTreeMap::new(); - let lock = self.lists.read().unwrap(); + let lock = self.inner.lists.read().unwrap(); for (name, lists) in lock.iter() { list_generators.insert(name.clone(), lists.request_generator()); @@ -1136,7 +1126,7 @@ impl SlidingSync { let stream_id = Uuid::new_v4().to_string(); - debug!(?self.extensions, stream_id, "About to run the sync stream"); + debug!(?self.inner.extensions, stream_id, "About to run the sync stream"); let instrument_span = Span::current(); @@ -1145,12 +1135,12 @@ impl SlidingSync { let sync_span = info_span!(parent: &instrument_span, "sync_once"); sync_span.in_scope(|| { - debug!(?self.extensions, "Sync stream loop is running"); + debug!(?self.inner.extensions, "Sync stream loop is running"); }); match self.sync_once(&stream_id, &mut list_generators).instrument(sync_span.clone()).await { Ok(Some(updates)) => { - self.reset_counter.store(0, Ordering::SeqCst); + self.inner.reset_counter.store(0, Ordering::SeqCst); yield Ok(updates); } @@ -1164,7 +1154,7 @@ impl SlidingSync { // The session has expired. // Has it expired too many times? - if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { + if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row")); // The session has expired too many times, let's raise an error! @@ -1178,9 +1168,9 @@ impl SlidingSync { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. - Observable::set(&mut self.pos.write().unwrap(), None); + Observable::set(&mut self.inner.pos.write().unwrap(), None); - debug!(?self.extensions, "Sliding Sync has been reset"); + debug!(?self.inner.extensions, "Sliding Sync has been reset"); }); } @@ -1198,12 +1188,35 @@ impl SlidingSync { impl SlidingSync { /// Get a copy of the `pos` value. pub fn pos(&self) -> Option { - self.pos.read().unwrap().clone() + self.inner.pos.read().unwrap().clone() } /// Set a new value for `pos`. pub fn set_pos(&self, new_pos: String) { - Observable::set(&mut self.pos.write().unwrap(), Some(new_pos)); + Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos)); + } +} + +#[derive(Serialize, Deserialize)] +struct FrozenSlidingSync { + #[serde(skip_serializing_if = "Option::is_none")] + to_device_since: Option, + #[serde(skip_serializing_if = "Option::is_none")] + delta_token: Option, +} + +impl From<&SlidingSync> for FrozenSlidingSync { + fn from(sliding_sync: &SlidingSync) -> Self { + FrozenSlidingSync { + delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(), + to_device_since: sliding_sync + .inner + .extensions + .lock() + .unwrap() + .as_ref() + .and_then(|ext| ext.to_device.as_ref()?.since.clone()), + } } }