From fcfdf5c57cde840d80fe82ccb094a8f7347a4609 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:09:00 +0100 Subject: [PATCH 1/9] chore(sdk): Rename a variable in `SlidingSync::handle_response`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 3292ce7ea..7bb5c409f 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1002,31 +1002,37 @@ impl SlidingSync { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.write().unwrap(); - for (id, mut room_data) in sliding_sync_response.rooms.into_iter() { + for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() { // `sync_response` contains the rooms with decrypted events if any, so look at // the timeline events here first if the room exists. // Otherwise, let's look at the timeline inside the `sliding_sync_response`. - let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) { + let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) + { joined_room.timeline.events } else { room_data.timeline.drain(..).map(Into::into).collect() }; - if let Some(mut room) = rooms_map.remove(&id) { + if let Some(mut room) = rooms_map.remove(&room_id) { // The room existed before, let's update it. room.update(room_data, timeline); - rooms_map.insert(id.clone(), room); + rooms_map.insert(room_id.clone(), room); } else { // First time we need this room, let's create it. rooms_map.insert( - id.clone(), - SlidingSyncRoom::new(self.client.clone(), id.clone(), room_data, timeline), + room_id.clone(), + SlidingSyncRoom::new( + self.client.clone(), + room_id.clone(), + room_data, + timeline, + ), ); } - rooms.push(id); + rooms.push(room_id); } let mut updated_views = Vec::new(); From 23b3e3aa10b2fb4fda8ad5f1727454f6176a3e8a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:34:07 +0100 Subject: [PATCH 2/9] chore(sdk): Format code a little bit. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 7bb5c409f..3a6cdaa32 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1039,25 +1039,27 @@ impl SlidingSync { for (name, updates) in sliding_sync_response.lists { let Some(generator) = views.get_mut(&name) else { - error!("Response for view {name} - unknown to us. skipping"); + error!("Response for view `{name}` - unknown to us; skipping"); + continue }; + let count: u32 = updates.count.try_into().expect("the list total count convertible into u32"); + if generator.handle_response(count, &updates.ops, &rooms)? { updated_views.push(name.clone()); } } // Update the `to-device` next-batch if found. - if let Some(to_device_since) = - sliding_sync_response.extensions.to_device.map(|t| t.next_batch) - { - self.update_to_device_since(to_device_since) - } + sliding_sync_response + .extensions + .to_device + .map(|to_device| self.update_to_device_since(to_device.next_batch)); - // track the most recently successfully sent extensions (needed for sticky - // semantics) + // Track the most recently successfully sent extensions (needed for sticky + // semantics). if extensions.is_some() { *self.sent_extensions.lock().unwrap() = extensions; } From f803e58e367c7d83c3525ca7a1c94f6ce93e0b84 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:53:16 +0100 Subject: [PATCH 3/9] chore(sdk): Keep `to_remove` inside its own scope. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 3a6cdaa32..5fd3da093 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1077,18 +1077,21 @@ impl SlidingSync { views: &mut BTreeMap, ) -> Result> { let mut requests = BTreeMap::new(); - let mut to_remove = Vec::new(); - for (name, generator) in views.iter_mut() { - if let Some(request) = generator.next() { - requests.insert(name.clone(), request); - } else { - to_remove.push(name.clone()); + { + let mut views_to_remove = Vec::new(); + + for (name, generator) in views.iter_mut() { + if let Some(request) = generator.next() { + requests.insert(name.clone(), request); + } else { + views_to_remove.push(name.clone()); + } } - } - for n in to_remove { - views.remove(&n); + for view_name in views_to_remove { + views.remove(&view_name); + } } if views.is_empty() { From 98ea3f096b33905b2f6b55fa6c858672df5bba82 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 11:53:15 +0100 Subject: [PATCH 4/9] chore(sdk): Clean up code of `SlidingSync::cache_to_storage`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 40 +++++++++++++++-------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 5fd3da093..bce751bcb 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -815,32 +815,44 @@ impl From<&SlidingSync> for FrozenSlidingSync { } impl SlidingSync { - async fn cache_to_storage(&self) -> Result<()> { + async fn cache_to_storage(&self) -> Result<(), crate::Error> { let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) }; trace!(storage_key, "saving to storage for later use"); - let v = serde_json::to_vec(&FrozenSlidingSync::from(self))?; - self.client.store().set_custom_value(storage_key.as_bytes(), v).await?; + + let store = self.client.store(); + + // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside + // the client store. + store + .set_custom_value( + storage_key.as_bytes(), + serde_json::to_vec(&FrozenSlidingSync::from(self))?, + ) + .await?; + + // Write every `SlidingSyncView` inside the client the store. let frozen_views = { let rooms_lock = self.rooms.read().unwrap(); + self.views .read() .unwrap() .iter() .map(|(name, view)| { - (name.clone(), FrozenSlidingSyncView::freeze(view, &rooms_lock)) + Ok(( + format!("{storage_key}::{name}"), + serde_json::to_vec(&FrozenSlidingSyncView::freeze(view, &rooms_lock))?, + )) }) - .collect::>() + .collect::, crate::Error>>()? }; - for (name, frozen) in frozen_views { - trace!(storage_key, name, "saving to view for later use"); - self.client - .store() - .set_custom_value( - format!("{storage_key}::{name}").as_bytes(), - serde_json::to_vec(&frozen)?, - ) - .await?; // FIXME: parallelize? + + for (storage_key, frozen_view) in frozen_views { + trace!(storage_key, "Saving the frozen Sliding Sync View"); + + store.set_custom_value(storage_key.as_bytes(), frozen_view).await?; } + Ok(()) } From e5c85410a8d09014924001458afae3ad941f091a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 11:54:48 +0100 Subject: [PATCH 5/9] doc(sdk): Fix documentation, add links etc. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index bce751bcb..d2098f562 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -861,19 +861,16 @@ impl SlidingSync { SlidingSyncBuilder::new() } - /// Generate a new SlidingSyncBuilder with the same inner settings and views - /// but without the current state + /// Generate a new [`SlidingSyncBuilder`] with the same inner settings and + /// views but without the current state. pub fn new_builder_copy(&self) -> SlidingSyncBuilder { let mut builder = Self::builder() .client(self.client.clone()) .subscriptions(self.subscriptions.read().unwrap().to_owned()); - for view in self - .views - .read() - .unwrap() - .values() - .map(|v| v.new_builder().build().expect("builder worked before, builder works now")) - { + + for view in self.views.read().unwrap().values().map(|view| { + view.new_builder().build().expect("builder worked before, builder works now") + }) { builder = builder.add_view(view); } @@ -904,10 +901,11 @@ impl SlidingSync { } } - /// Add the common extensions if not already configured + /// Add the common extensions if not already configured. pub fn add_common_extensions(&self) { let mut lock = self.extensions.lock().unwrap(); let mut cfg = lock.get_or_insert_with(Default::default); + if cfg.to_device.is_none() { cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); } @@ -955,7 +953,7 @@ impl SlidingSync { /// /// Note: Remember that this change will only be applicable for any new /// stream created after this. The old stream will still continue to use the - /// previous set of views + /// previous set of views. pub fn pop_view(&self, view_name: &String) -> Option { self.views.write().unwrap().remove(view_name) } @@ -968,7 +966,7 @@ impl SlidingSync { /// /// Note: Remember that this change will only be applicable for any new /// stream created after this. The old stream will still continue to use the - /// previous set of views + /// previous set of views. pub fn add_view(&self, view: SlidingSyncView) -> Option { self.views.write().unwrap().insert(view.name.clone(), view) } @@ -979,6 +977,7 @@ impl SlidingSync { room_ids: I, ) -> Vec> { let rooms = self.rooms.read().unwrap(); + room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect() } @@ -1116,10 +1115,11 @@ impl SlidingSync { let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); - // implement stickiness by only sending extensions if they have + // Implement stickiness by only sending extensions if they have // changed since the last time we sent them let extensions = { let extensions = self.extensions.lock().unwrap(); + if *extensions == *self.sent_extensions.lock().unwrap() { None } else { From 4dbd1a7db4fd7a83a6f5229815dfbc440b61548a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 11:56:16 +0100 Subject: [PATCH 6/9] fix(sdk): Fix `join` in `SlidingSync::sync_once`. This patch cleans up the `SlidingSync::sync_once` method by renaming variables, adding more comments, simplifying the code by reducing the number of variables etc. This patch also changes `futures_util::join!` to `futures_util::future::join`. It does the same but the macro needs the `async-await-macros` feature to be turned on, while the second works without any features. Finally, this patch improves the log messages by making them more clear for a new reader. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 51 +++++++++++++---------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index d2098f562..4a927a92c 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1087,14 +1087,14 @@ impl SlidingSync { &self, views: &mut BTreeMap, ) -> Result> { - let mut requests = BTreeMap::new(); + let mut lists_of_requests = BTreeMap::new(); { let mut views_to_remove = Vec::new(); for (name, generator) in views.iter_mut() { if let Some(request) = generator.next() { - requests.insert(name.clone(), request); + lists_of_requests.insert(name.clone(), request); } else { views_to_remove.push(name.clone()); } @@ -1127,40 +1127,49 @@ impl SlidingSync { } }; - let request = assign!(v4::Request::new(), { - lists: requests, - pos, - delta_token, - timeout: Some(timeout), - room_subscriptions, - unsubscribe_rooms, - extensions: extensions.clone().unwrap_or_default(), - }); - debug!("requesting"); + debug!("Sending the sliding sync request"); - // 30s for the long poll + 30s for network delays + // Configure long-polling. We need 30 seconds for the long-poll itself, in + // addition to 30 more extra seconds for the network delays. let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30)); + + // Prepare the request. let request = self.client.send_with_homeserver( - request, + assign!(v4::Request::new(), { + lists: lists_of_requests, + pos, + delta_token, + timeout: Some(timeout), + room_subscriptions, + unsubscribe_rooms, + extensions: extensions.clone().unwrap_or_default(), + }), Some(request_config), self.homeserver.as_ref().map(ToString::to_string), ); + // Send the request and get a response with end-to-end encryption support. #[cfg(feature = "e2e-encryption")] let response = { - let (e2ee_uploads, resp) = - futures_util::join!(self.client.send_outgoing_requests(), request); - if let Err(e) = e2ee_uploads { - error!(error = ?e, "Error while sending outgoing E2EE requests"); + let (e2ee_uploads, response) = + futures_util::future::join(self.client.send_outgoing_requests(), request).await; + + if let Err(error) = e2ee_uploads { + error!(?error, "Error while sending outgoing E2EE requests"); } - resp + + response }?; + + // Send the request and get a response _without_ end-to-end encryption support. #[cfg(not(feature = "e2e-encryption"))] let response = request.await?; - debug!("received"); + + debug!("Sliding sync response received"); let updates = self.handle_response(response, extensions, views).await?; - debug!("handled"); + + debug!("Sliding sync response has been handled"); Ok(Some(updates)) } From 16c9845a4784eedf52ca58e3997c0deccfeebb43 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 12:09:14 +0100 Subject: [PATCH 7/9] chore(sdk): Rename `SlidingSync.failure_count` to `.reset_counter`. This patch cleans up the `SlidingSync::stream` method. It renames variables, improves log messages etc. This patch also creates a new `MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION` constant. This value was previously hardcoded and lost in the code, now it's easier to spot it for further updates. This patch finally renames the `failure_count` field to `reset_counter`, because it doesn't count the number of failure, but the number of `ErrorKind::UnknownPos` exactly, i.e. the number of times we reset the `SlidingSync` state. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 69 ++++++++++++------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 39a5af3a0..d6cddc8c2 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -298,7 +298,7 @@ impl SlidingSyncBuilder { extensions: Mutex::new(self.extensions).into(), sent_extensions: Mutex::new(None).into(), - failure_count: Default::default(), + reset_counter: Default::default(), pos: Arc::new(StdRwLock::new(Observable::new(None))), delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))), diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 4a927a92c..088948e14 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -755,6 +755,15 @@ pub struct UpdateSummary { pub rooms: Vec, } +/// Number of times a Sliding Sync session can expire before raising an error. +/// +/// A Sliding Sync session can expire. In this case, it is reset. However, to +/// avoid entering an infinite loop of “it's expired, let's reset, it's expired, +/// let's reset…” (maybe if the network has an issue, or the server, or anything +/// else), we defined a maximum times a session can expire before +/// raising a proper error. +const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; + /// The sliding sync instance #[derive(Clone, Debug)] pub struct SlidingSync { @@ -780,8 +789,8 @@ pub struct SlidingSync { subscriptions: Arc>>, unsubscribe: Arc>>, - /// keeping track of retries and failure counts - failure_count: Arc, + /// Number of times a Sliding Session session has been reset. + reset_counter: Arc, /// the intended state of the extensions being supplied to sliding /sync /// calls. May contain the latest next_batch for to_devices, etc. @@ -1174,66 +1183,78 @@ impl SlidingSync { Ok(Some(updates)) } - /// Create the inner stream for the view. + /// Create a _new_ Sliding Sync stream. /// - /// Run this stream to receive new updates from the server. + /// This stream will send requests and will handle responses automatically, + /// hence updating the views. #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { + // Collect all the views that needsto be updated. let mut views = { let mut views = BTreeMap::new(); - let views_lock = self.views.read().unwrap(); - for (name, view) in views_lock.iter() { + let lock = self.views.read().unwrap(); + + for (name, view) in lock.iter() { views.insert(name.clone(), view.request_generator()); } + views }; - debug!(?self.extensions, "Setting view stream going"); - let stream_span = Span::current(); + debug!(?self.extensions, "About to run the sync stream"); + + let instrument_span = Span::current(); async_stream::stream! { loop { - let sync_span = info_span!(parent: &stream_span, "sync_once"); + let sync_span = info_span!(parent: &instrument_span, "sync_once"); sync_span.in_scope(|| { - debug!(?self.extensions, "Sync loop running"); + debug!(?self.extensions, "Sync stream loop is running"); }); match self.sync_once(&mut views).instrument(sync_span.clone()).await { Ok(Some(updates)) => { - self.failure_count.store(0, Ordering::SeqCst); + self.reset_counter.store(0, Ordering::SeqCst); - yield Ok(updates) + yield Ok(updates); } Ok(None) => { break; } - Err(e) => { - if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { - // session expired, let's reset - if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 { - sync_span.in_scope(|| error!("session expired three times in a row")); - yield Err(e.into()); + Err(error) => { + if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { + // The session has expired. - break + // Has it expired too many times? + if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { + sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row")); + + // The session has expired too many times, let's raise an error! + yield Err(error.into()); + + break; } + // Let's reset the Sliding Sync session. sync_span.in_scope(|| { - warn!("Session expired. Restarting sliding sync."); + warn!("Session expired. Restarting Sliding Sync."); + + // To “restart” a Sliding Sync session, we set `pos` to its initial value. Observable::set(&mut self.pos.write().unwrap(), None); - // reset our extensions to the last known good ones. + // We also need to reset our extensions to the last known good ones. *self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take(); - debug!(?self.extensions, "Resetting view stream"); + debug!(?self.extensions, "Sliding Sync has been reset"); }); } - yield Err(e.into()); + yield Err(error.into()); - continue + continue; } } } From f51b51b19e4de42d01542ad25b46d84440ce8bd5 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 12:14:01 +0100 Subject: [PATCH 8/9] chore(sdk): Avoid mapping to a function returning `()`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 088948e14..005e3d000 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1072,11 +1072,10 @@ impl SlidingSync { } } - // Update the `to-device` next-batch if found. - sliding_sync_response - .extensions - .to_device - .map(|to_device| self.update_to_device_since(to_device.next_batch)); + // Update the `to-device` next-batch if any. + if let Some(to_device) = sliding_sync_response.extensions.to_device { + self.update_to_device_since(to_device.next_batch); + } // Track the most recently successfully sent extensions (needed for sticky // semantics). From ac1d5afac3bf0ec32dde09e47ddc18201e08899b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 13:29:03 +0100 Subject: [PATCH 9/9] doc(sdk): Improve inline documentation with link to an issue. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 005e3d000..ba3b6ef6b 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1157,6 +1157,14 @@ impl SlidingSync { ); // Send the request and get a response with end-to-end encryption support. + // + // Sending the `/sync` request out when end-to-end encryption is enabled means + // that we need to also send out any outgoing e2ee related request out + // coming from the `OlmMachine::outgoing_requests()` method. + // + // FIXME: Processing outgiong requests at the same time while a `/sync` is in + // flight is currently not supported. + // More info: [#1386](https://github.com/matrix-org/matrix-rust-sdk/issues/1386). #[cfg(feature = "e2e-encryption")] let response = { let (e2ee_uploads, response) =