From 8f6f20bbe7848724eb17318a688f3d4c8184d637 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 1 Mar 2023 16:50:36 +0100 Subject: [PATCH 1/9] fix(bindings): `SlidingSync.sync` returns an immediately cancellable `TaskHandle`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this patch, `SlidingSync::sync` was returning a callback-based `TaskHandle`. It was waiting on the “stream loop” to finish (since it's a long- poll, it means waiting 2*30s, cf. our code), before checking an atomic flag has some value to decide whether it was time to leave the loop or not. So when the user is cancelling this `TaskHandle`, a response (if any) was always handled. But in the meantime, it was possible to start a new `sync`, and it seems like it creates bugs. After this patch, `SlidingSync::sync` now returns a handle-based `TaskHandle`. It means that cancelling it will cancel the “stream loop” immediately. If no response was in-flight from the server, that's perfect, no problem. If a response was in-flight, the inner `pos` of the `SlidingSync` instance won't be updated as the response won't be handled. So the server will re-send the same response with the next sync request. I guess it's better this way. Thoughts? --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 30 ++++++++------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index a8c8ad0fe..986398b07 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -723,43 +723,35 @@ impl SlidingSync { let inner = self.inner.clone(); let client = self.client.clone(); let observer = self.observer.clone(); - let stop_loop = Arc::new(AtomicBool::new(false)); - let remote_stopper = stop_loop.clone(); - let stoppable = Arc::new(TaskHandle::with_callback(Box::new(move || { - remote_stopper.store(true, Ordering::Relaxed); - }))); - - RUNTIME.spawn(async move { + Arc::new(TaskHandle::with_handle(RUNTIME.spawn(async move { let stream = inner.stream(); pin_mut!(stream); + loop { - let update = match stream.next().await { - Some(Ok(u)) => u, - Some(Err(e)) => { - if client.process_sync_error(e) == LoopCtrl::Break { + let update_summary = match stream.next().await { + Some(Ok(update_summary)) => update_summary, + + Some(Err(error)) => { + if client.process_sync_error(error) == LoopCtrl::Break { warn!("loop was stopped by client error processing"); break; } else { continue; } } + None => { warn!("Inner streaming loop ended unexpectedly"); break; } }; + if let Some(ref observer) = *observer.read().unwrap() { - observer.did_receive_sync_update(update.into()); - } - if stop_loop.load(Ordering::Relaxed) { - trace!("stopped sync loop after cancellation"); - break; + observer.did_receive_sync_update(update_summary.into()); } } - }); - - stoppable + }))) } } From 07f6a3b3450c8c52c8ecb56dff8c3197b726444c Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 1 Mar 2023 16:56:32 +0100 Subject: [PATCH 2/9] chore: Remove warnings. --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 986398b07..78e49b1b8 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -1,7 +1,4 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, -}; +use std::sync::{Arc, RwLock}; use anyhow::Context; use eyeball::Observable; @@ -20,7 +17,7 @@ pub use matrix_sdk::{ SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncMode, SlidingSyncState, }; use tokio::{sync::broadcast::error::RecvError, task::JoinHandle}; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, warn}; use url::Url; use super::{Client, Room, RUNTIME}; @@ -41,6 +38,7 @@ impl TaskHandle { Self { handle: Some(handle), callback: Default::default() } } + #[allow(dead_code)] fn with_callback(callback: TaskHandleCallback) -> Self { Self { handle: Default::default(), callback: RwLock::new(Some(callback)) } } From 7369010964127f258fe6537fe1e328d24e15977b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 09:41:29 +0100 Subject: [PATCH 3/9] feat(sdk): Make `SlidingSync::handle_response` _sync_, no more _async_. The idea is to group all async and await points in `sync_once`. It's easier to think about it. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 62 +++++++++++------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 0f3bee748..d9bc7b8cf 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -613,6 +613,7 @@ pub use error::*; use eyeball::Observable; use futures_core::stream::Stream; pub use list::*; +use matrix_sdk_base::sync::SyncResponse; pub use room::*; use ruma::{ api::client::{ @@ -912,38 +913,12 @@ impl SlidingSync { /// Handle the HTTP response. #[instrument(skip_all, fields(lists = list_generators.len()))] - async fn handle_response( + fn handle_response( &self, sliding_sync_response: v4::Response, - stream_id: &str, + mut sync_response: SyncResponse, list_generators: &mut BTreeMap, ) -> Result { - match &sliding_sync_response.txn_id { - None => { - error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing"); - } - - Some(txn_id) if txn_id != stream_id => { - error!( - stream_id, - txn_id, - "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ" - ); - } - - _ => {} - } - - // Handle and transform a Sliding Sync Response to a `SyncResponse`. - // - // We may not need the `sync_response` in the future (once `SyncResponse` will - // move to Sliding Sync, i.e. to `v4::Response`), but processing the - // `sliding_sync_response` is vital, so it must be done somewhere; for now it - // happens here. - let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?; - - debug!("Sliding sync response has been processed"); - Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); @@ -1009,8 +984,6 @@ impl SlidingSync { UpdateSummary { lists: updated_lists, rooms } }; - self.cache_to_storage().await?; - Ok(update_summary) } @@ -1099,7 +1072,34 @@ impl SlidingSync { debug!("Sliding sync response received"); - let updates = self.handle_response(response, stream_id, list_generators).await?; + match &response.txn_id { + None => { + error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing"); + } + + Some(txn_id) if txn_id != stream_id => { + error!( + stream_id, + txn_id, + "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ" + ); + } + + _ => {} + } + + debug!("Sliding sync response has been processed"); + + // Handle and transform a Sliding Sync Response to a `SyncResponse`. + // + // We may not need the `sync_response` in the future (once `SyncResponse` will + // move to Sliding Sync, i.e. to `v4::Response`), but processing the + // `sliding_sync_response` is vital, so it must be done somewhere; for now it + // happens here. + let sync_response = self.client.process_sliding_sync(&response).await?; + let updates = self.handle_response(response, sync_response, list_generators)?; + + self.cache_to_storage().await?; debug!("Sliding sync response has been handled"); From 66d4ced90fc4739bbc318135a997d2dbdacc79db Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 12:01:57 +0100 Subject: [PATCH 4/9] chore: Add some inline documentation. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index d9bc7b8cf..67aa4c521 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -1070,6 +1070,15 @@ impl SlidingSync { #[cfg(not(feature = "e2e-encryption"))] let response = request.await?; + // At this point, the request has been sent, and a response has been received. + // + // We must ensure the handling of the response cannot be stopped/ + // cancelled. It must be done entirely, otherwise we can have + // corrupted/incomplete states for Sliding Sync and other parts of + // the code. + // + // That's why we are running the handling of the response in a blocking + // mode since it cannot be cancelled abruptly. debug!("Sliding sync response received"); match &response.txn_id { @@ -1088,8 +1097,6 @@ impl SlidingSync { _ => {} } - debug!("Sliding sync response has been processed"); - // Handle and transform a Sliding Sync Response to a `SyncResponse`. // // We may not need the `sync_response` in the future (once `SyncResponse` will @@ -1097,6 +1104,9 @@ impl SlidingSync { // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. let sync_response = self.client.process_sliding_sync(&response).await?; + + debug!("Sliding sync response has been processed"); + let updates = self.handle_response(response, sync_response, list_generators)?; self.cache_to_storage().await?; From 832146b43dd17fede7ee5e2c53beec33aa294263 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 12:12:58 +0100 Subject: [PATCH 5/9] feat(sdk): Create `SlidingSyncInner`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move all `SlidingSync`'s fields into a new `SlidingSyncInner` type, and then update `SlidingSync` to contain a single `inner: Arc` field. First off, it's much simpler to understand what's behind an `Arc` for “clonability” of `SlidingSync`, and what's not. We don't need to worry about adding a new field that is not behind an `Arc` for a specific reason or anything. The pattern is clear now. Second, this is required for next commits. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 32 ++-- crates/matrix-sdk/src/sliding_sync/mod.rs | 159 ++++++++++-------- 2 files changed, 103 insertions(+), 88 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 0eb35a262..6feaa3909 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -16,8 +16,8 @@ use tracing::trace; use url::Url; use super::{ - Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, - SlidingSyncListBuilder, SlidingSyncRoom, + Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner, + SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom, }; use crate::{Client, Result}; @@ -286,24 +286,26 @@ impl SlidingSyncBuilder { trace!(len = rooms_found.len(), "rooms unfrozen"); - let rooms = Arc::new(StdRwLock::new(rooms_found)); - let lists = Arc::new(StdRwLock::new(self.lists)); + let rooms = StdRwLock::new(rooms_found); + let lists = StdRwLock::new(self.lists); Ok(SlidingSync { - homeserver: self.homeserver, - client, - storage_key: self.storage_key, + inner: Arc::new(SlidingSyncInner { + homeserver: self.homeserver, + client, + storage_key: self.storage_key, - lists, - rooms, + lists, + rooms, - extensions: Mutex::new(self.extensions).into(), - reset_counter: Default::default(), + extensions: Mutex::new(self.extensions).into(), + reset_counter: Default::default(), - pos: Arc::new(StdRwLock::new(Observable::new(None))), - delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))), - subscriptions: Arc::new(StdRwLock::new(self.subscriptions)), - unsubscribe: Default::default(), + pos: StdRwLock::new(Observable::new(None)), + delta_token: StdRwLock::new(Observable::new(delta_token_inner)), + subscriptions: StdRwLock::new(self.subscriptions), + unsubscribe: Default::default(), + }), }) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 67aa4c521..725a0aec0 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -643,6 +643,11 @@ const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; /// The Sliding Sync instance. #[derive(Clone, Debug)] pub struct SlidingSync { + inner: Arc, +} + +#[derive(Debug)] +pub(super) struct SlidingSyncInner { /// Customize the homeserver for sliding sync only homeserver: Option, @@ -653,55 +658,33 @@ pub struct SlidingSync { storage_key: Option, /// The `pos` marker. - pos: Arc>>>, + pos: StdRwLock>>, - delta_token: Arc>>>, + delta_token: StdRwLock>>, /// The lists of this Sliding Sync instance. - lists: Arc>>, + lists: StdRwLock>, /// The rooms details - rooms: Arc>>, + rooms: StdRwLock>, - subscriptions: Arc>>, - unsubscribe: Arc>>, + subscriptions: StdRwLock>, + unsubscribe: StdRwLock>, /// Number of times a Sliding Session session has been reset. - reset_counter: Arc, + reset_counter: AtomicU8, /// the intended state of the extensions being supplied to sliding /sync /// calls. May contain the latest next_batch for to_devices, etc. - extensions: Arc>>, -} - -#[derive(Serialize, Deserialize)] -struct FrozenSlidingSync { - #[serde(skip_serializing_if = "Option::is_none")] - to_device_since: Option, - #[serde(skip_serializing_if = "Option::is_none")] - delta_token: Option, -} - -impl From<&SlidingSync> for FrozenSlidingSync { - fn from(v: &SlidingSync) -> Self { - FrozenSlidingSync { - delta_token: v.delta_token.read().unwrap().clone(), - to_device_since: v - .extensions - .lock() - .unwrap() - .as_ref() - .and_then(|ext| ext.to_device.as_ref()?.since.clone()), - } - } + extensions: Mutex>, } impl SlidingSync { async fn cache_to_storage(&self) -> Result<(), crate::Error> { - let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) }; + let Some(storage_key) = self.inner.storage_key.as_ref() else { return Ok(()) }; trace!(storage_key, "Saving to storage for later use"); - let store = self.client.store(); + let store = self.inner.client.store(); // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside // the client store. @@ -714,9 +697,10 @@ impl SlidingSync { // Write every `SlidingSyncList` inside the client the store. let frozen_lists = { - let rooms_lock = self.rooms.read().unwrap(); + let rooms_lock = self.inner.rooms.read().unwrap(); - self.lists + self.inner + .lists .read() .unwrap() .iter() @@ -747,16 +731,16 @@ impl SlidingSync { /// lists but without the current state. pub fn new_builder_copy(&self) -> SlidingSyncBuilder { let mut builder = Self::builder() - .client(self.client.clone()) - .subscriptions(self.subscriptions.read().unwrap().to_owned()); + .client(self.inner.client.clone()) + .subscriptions(self.inner.subscriptions.read().unwrap().to_owned()); - for list in self.lists.read().unwrap().values().map(|list| { + for list in self.inner.lists.read().unwrap().values().map(|list| { list.new_builder().build().expect("builder worked before, builder works now") }) { builder = builder.add_list(list); } - if let Some(homeserver) = &self.homeserver { + if let Some(homeserver) = &self.inner.homeserver { builder.homeserver(homeserver.clone()) } else { builder @@ -769,7 +753,7 @@ impl SlidingSync { /// poll the stream after you've altered this. If you do that during, it /// might take one round trip to take effect. pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option) { - self.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default()); + self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default()); } /// Unsubscribe from a given room. @@ -778,14 +762,14 @@ impl SlidingSync { /// poll the stream after you've altered this. If you do that during, it /// might take one round trip to take effect. pub fn unsubscribe(&self, room_id: OwnedRoomId) { - if self.subscriptions.write().unwrap().remove(&room_id).is_some() { - self.unsubscribe.write().unwrap().push(room_id); + if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() { + self.inner.unsubscribe.write().unwrap().push(room_id); } } /// Add the common extensions if not already configured. pub fn add_common_extensions(&self) { - let mut lock = self.extensions.lock().unwrap(); + let mut lock = self.inner.extensions.lock().unwrap(); let mut cfg = lock.get_or_insert_with(Default::default); if cfg.to_device.is_none() { @@ -803,18 +787,19 @@ impl SlidingSync { /// Lookup a specific room pub fn get_room(&self, room_id: &RoomId) -> Option { - self.rooms.read().unwrap().get(room_id).cloned() + self.inner.rooms.read().unwrap().get(room_id).cloned() } /// Check the number of rooms. pub fn get_number_of_rooms(&self) -> usize { - self.rooms.read().unwrap().len() + self.inner.rooms.read().unwrap().len() } fn update_to_device_since(&self, since: String) { // FIXME: Find a better place where the to-device since token should be // persisted. - self.extensions + self.inner + .extensions .lock() .unwrap() .get_or_insert_with(Default::default) @@ -829,7 +814,7 @@ impl SlidingSync { /// listening to the stream and is therefor not necessarily up to date /// with the lists used for the stream. pub fn list(&self, list_name: &str) -> Option { - self.lists.read().unwrap().get(list_name).cloned() + self.inner.lists.read().unwrap().get(list_name).cloned() } /// Remove the SlidingSyncList named `list_name` from the lists list if @@ -839,7 +824,7 @@ impl SlidingSync { /// stream created after this. The old stream will still continue to use the /// previous set of lists. pub fn pop_list(&self, list_name: &String) -> Option { - self.lists.write().unwrap().remove(list_name) + self.inner.lists.write().unwrap().remove(list_name) } /// Add the list to the list of lists. @@ -852,7 +837,7 @@ impl SlidingSync { /// stream created after this. The old stream will still continue to use the /// previous set of lists. pub fn add_list(&self, list: SlidingSyncList) -> Option { - self.lists.write().unwrap().insert(list.name.clone(), list) + self.inner.lists.write().unwrap().insert(list.name.clone(), list) } /// Lookup a set of rooms @@ -860,14 +845,14 @@ impl SlidingSync { &self, room_ids: I, ) -> Vec> { - let rooms = self.rooms.read().unwrap(); + let rooms = self.inner.rooms.read().unwrap(); room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect() } /// Get all rooms. pub fn get_all_rooms(&self) -> Vec { - self.rooms.read().unwrap().values().cloned().collect() + self.inner.rooms.read().unwrap().values().cloned().collect() } fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig { @@ -875,7 +860,7 @@ impl SlidingSync { // The pos is `None`, it's either our initial sync or the proxy forgot about us // and sent us an `UnknownPos` error. We need to send out the config for our // extensions. - let mut extensions = self.extensions.lock().unwrap().clone().unwrap_or_default(); + let mut extensions = self.inner.extensions.lock().unwrap().clone().unwrap_or_default(); // Always enable to-device events and the e2ee-extension on the initial request, // no matter what the caller wants. @@ -898,6 +883,7 @@ impl SlidingSync { // We already enabled all the things, just fetch out the to-device since token // out of self.extensions and set it in a new, and empty, `ExtensionsConfig`. let since = self + .inner .extensions .lock() .unwrap() @@ -919,12 +905,15 @@ impl SlidingSync { mut sync_response: SyncResponse, list_generators: &mut BTreeMap, ) -> Result { - Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); - Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); + Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos)); + Observable::set( + &mut self.inner.delta_token.write().unwrap(), + sliding_sync_response.delta_token, + ); let update_summary = { let mut rooms = Vec::new(); - let mut rooms_map = self.rooms.write().unwrap(); + let mut rooms_map = self.inner.rooms.write().unwrap(); for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() { // `sync_response` contains the rooms with decrypted events if any, so look at @@ -948,7 +937,7 @@ impl SlidingSync { rooms_map.insert( room_id.clone(), SlidingSyncRoom::new( - self.client.clone(), + self.inner.client.clone(), room_id.clone(), room_data, timeline, @@ -1014,10 +1003,10 @@ impl SlidingSync { return Ok(None); } - let pos = self.pos.read().unwrap().clone(); - let delta_token = self.delta_token.read().unwrap().clone(); - let room_subscriptions = self.subscriptions.read().unwrap().clone(); - let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap()); + let pos = self.inner.pos.read().unwrap().clone(); + let delta_token = self.inner.delta_token.read().unwrap().clone(); + let room_subscriptions = self.inner.subscriptions.read().unwrap().clone(); + let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); let extensions = self.prepare_extension_config(pos.as_deref()); @@ -1028,7 +1017,7 @@ impl SlidingSync { let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30)); // Prepare the request. - let request = self.client.send_with_homeserver( + let request = self.inner.client.send_with_homeserver( assign!(v4::Request::new(), { pos, delta_token, @@ -1042,7 +1031,7 @@ impl SlidingSync { extensions, }), Some(request_config), - self.homeserver.as_ref().map(ToString::to_string), + self.inner.homeserver.as_ref().map(ToString::to_string), ); // Send the request and get a response with end-to-end encryption support. @@ -1057,7 +1046,8 @@ impl SlidingSync { #[cfg(feature = "e2e-encryption")] let response = { let (e2ee_uploads, response) = - futures_util::future::join(self.client.send_outgoing_requests(), request).await; + futures_util::future::join(self.inner.client.send_outgoing_requests(), request) + .await; if let Err(error) = e2ee_uploads { error!(?error, "Error while sending outgoing E2EE requests"); @@ -1103,7 +1093,7 @@ impl SlidingSync { // move to Sliding Sync, i.e. to `v4::Response`), but processing the // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. - let sync_response = self.client.process_sliding_sync(&response).await?; + let sync_response = self.inner.client.process_sliding_sync(&response).await?; debug!("Sliding sync response has been processed"); @@ -1120,12 +1110,12 @@ impl SlidingSync { /// /// This stream will send requests and will handle responses automatically, /// hence updating the lists. - #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] + #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { // Collect all the lists that need to be updated. let mut list_generators = { let mut list_generators = BTreeMap::new(); - let lock = self.lists.read().unwrap(); + let lock = self.inner.lists.read().unwrap(); for (name, lists) in lock.iter() { list_generators.insert(name.clone(), lists.request_generator()); @@ -1136,7 +1126,7 @@ impl SlidingSync { let stream_id = Uuid::new_v4().to_string(); - debug!(?self.extensions, stream_id, "About to run the sync stream"); + debug!(?self.inner.extensions, stream_id, "About to run the sync stream"); let instrument_span = Span::current(); @@ -1145,12 +1135,12 @@ impl SlidingSync { let sync_span = info_span!(parent: &instrument_span, "sync_once"); sync_span.in_scope(|| { - debug!(?self.extensions, "Sync stream loop is running"); + debug!(?self.inner.extensions, "Sync stream loop is running"); }); match self.sync_once(&stream_id, &mut list_generators).instrument(sync_span.clone()).await { Ok(Some(updates)) => { - self.reset_counter.store(0, Ordering::SeqCst); + self.inner.reset_counter.store(0, Ordering::SeqCst); yield Ok(updates); } @@ -1164,7 +1154,7 @@ impl SlidingSync { // The session has expired. // Has it expired too many times? - if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { + if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row")); // The session has expired too many times, let's raise an error! @@ -1178,9 +1168,9 @@ impl SlidingSync { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. - Observable::set(&mut self.pos.write().unwrap(), None); + Observable::set(&mut self.inner.pos.write().unwrap(), None); - debug!(?self.extensions, "Sliding Sync has been reset"); + debug!(?self.inner.extensions, "Sliding Sync has been reset"); }); } @@ -1198,12 +1188,35 @@ impl SlidingSync { impl SlidingSync { /// Get a copy of the `pos` value. pub fn pos(&self) -> Option { - self.pos.read().unwrap().clone() + self.inner.pos.read().unwrap().clone() } /// Set a new value for `pos`. pub fn set_pos(&self, new_pos: String) { - Observable::set(&mut self.pos.write().unwrap(), Some(new_pos)); + Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos)); + } +} + +#[derive(Serialize, Deserialize)] +struct FrozenSlidingSync { + #[serde(skip_serializing_if = "Option::is_none")] + to_device_since: Option, + #[serde(skip_serializing_if = "Option::is_none")] + delta_token: Option, +} + +impl From<&SlidingSync> for FrozenSlidingSync { + fn from(sliding_sync: &SlidingSync) -> Self { + FrozenSlidingSync { + delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(), + to_device_since: sliding_sync + .inner + .extensions + .lock() + .unwrap() + .as_ref() + .and_then(|ext| ext.to_device.as_ref()?.since.clone()), + } } } From 64ae77ec70f30c8a1e26db2c6d336536520ccf80 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 15:05:04 +0100 Subject: [PATCH 6/9] feat(sdk): Run SS response handling in a single non-cancellable block. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 30 +++--- crates/matrix-sdk/src/sliding_sync/mod.rs | 98 ++++++++++++------- 2 files changed, 78 insertions(+), 50 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 6feaa3909..6f680399a 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -1,7 +1,7 @@ use std::{ collections::BTreeMap, fmt::Debug, - sync::{Arc, Mutex, RwLock as StdRwLock}, + sync::{Mutex, RwLock as StdRwLock}, }; use eyeball::Observable; @@ -289,23 +289,21 @@ impl SlidingSyncBuilder { let rooms = StdRwLock::new(rooms_found); let lists = StdRwLock::new(self.lists); - Ok(SlidingSync { - inner: Arc::new(SlidingSyncInner { - homeserver: self.homeserver, - client, - storage_key: self.storage_key, + Ok(SlidingSync::new(SlidingSyncInner { + homeserver: self.homeserver, + client, + storage_key: self.storage_key, - lists, - rooms, + lists, + rooms, - extensions: Mutex::new(self.extensions).into(), - reset_counter: Default::default(), + extensions: Mutex::new(self.extensions), + reset_counter: Default::default(), - pos: StdRwLock::new(Observable::new(None)), - delta_token: StdRwLock::new(Observable::new(delta_token_inner)), - subscriptions: StdRwLock::new(self.subscriptions), - unsubscribe: Default::default(), - }), - }) + pos: StdRwLock::new(Observable::new(None)), + delta_token: StdRwLock::new(Observable::new(delta_token_inner)), + subscriptions: StdRwLock::new(self.subscriptions), + unsubscribe: Default::default(), + })) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 725a0aec0..567c52ac4 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -597,6 +597,7 @@ mod list; mod room; use std::{ + borrow::BorrowMut, collections::BTreeMap, fmt::Debug, mem, @@ -614,6 +615,7 @@ use eyeball::Observable; use futures_core::stream::Stream; pub use list::*; use matrix_sdk_base::sync::SyncResponse; +use matrix_sdk_common::locks::Mutex as AsyncMutex; pub use room::*; use ruma::{ api::client::{ @@ -625,6 +627,7 @@ use ruma::{ assign, OwnedRoomId, RoomId, }; use serde::{Deserialize, Serialize}; +use tokio::spawn; use tracing::{debug, error, info_span, instrument, trace, warn, Instrument, Span}; use url::Url; use uuid::Uuid; @@ -641,9 +644,15 @@ use crate::{config::RequestConfig, Client, Result}; const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; /// The Sliding Sync instance. +/// +/// It is OK to clone this type as much as you need: cloning it is cheap. #[derive(Clone, Debug)] pub struct SlidingSync { + /// The Sliding Sync data. inner: Arc, + + /// A lock to ensure that responses are handled one at a time. + response_handling_lock: Arc>, } #[derive(Debug)] @@ -680,6 +689,10 @@ pub(super) struct SlidingSyncInner { } impl SlidingSync { + pub(super) fn new(inner: SlidingSyncInner) -> Self { + Self { inner: Arc::new(inner), response_handling_lock: Arc::new(AsyncMutex::new(())) } + } + async fn cache_to_storage(&self) -> Result<(), crate::Error> { let Some(storage_key) = self.inner.storage_key.as_ref() else { return Ok(()) }; trace!(storage_key, "Saving to storage for later use"); @@ -979,11 +992,13 @@ impl SlidingSync { async fn sync_once( &self, stream_id: &str, - list_generators: &mut BTreeMap, + list_generators: Arc>>, ) -> Result> { let mut lists = BTreeMap::new(); { + let mut list_generators_lock = list_generators.lock().unwrap(); + let list_generators = list_generators_lock.borrow_mut(); let mut lists_to_remove = Vec::new(); for (name, generator) in list_generators.iter_mut() { @@ -997,10 +1012,10 @@ impl SlidingSync { for list_name in lists_to_remove { list_generators.remove(&list_name); } - } - if list_generators.is_empty() { - return Ok(None); + if list_generators.is_empty() { + return Ok(None); + } } let pos = self.inner.pos.read().unwrap().clone(); @@ -1067,43 +1082,57 @@ impl SlidingSync { // corrupted/incomplete states for Sliding Sync and other parts of // the code. // - // That's why we are running the handling of the response in a blocking - // mode since it cannot be cancelled abruptly. - debug!("Sliding sync response received"); + // That's why we are running the handling of the response in a spawned + // future that cannot be cancelled by anything. + let this = self.clone(); + let stream_id = stream_id.to_owned(); - match &response.txn_id { - None => { - error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing"); + // Spawn a new future to ensure that the code inside this future cannot be + // cancelled if this method is cancelled. + spawn(async move { + debug!("Sliding sync response received"); + + // In case the task running this future is detached, we must be + // ensure responses are handled one at a time, hence we lock the + // `response_handling_lock`. + let global_lock = this.response_handling_lock.lock().await; + + match &response.txn_id { + None => { + error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing"); + } + + Some(txn_id) if txn_id != &stream_id => { + error!( + stream_id, + txn_id, + "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ" + ); + } + + _ => {} } - Some(txn_id) if txn_id != stream_id => { - error!( - stream_id, - txn_id, - "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ" - ); - } + // Handle and transform a Sliding Sync Response to a `SyncResponse`. + // + // We may not need the `sync_response` in the future (once `SyncResponse` will + // move to Sliding Sync, i.e. to `v4::Response`), but processing the + // `sliding_sync_response` is vital, so it must be done somewhere; for now it + // happens here. + let sync_response = this.inner.client.process_sliding_sync(&response).await?; - _ => {} - } + debug!("Sliding sync response has been processed"); - // Handle and transform a Sliding Sync Response to a `SyncResponse`. - // - // We may not need the `sync_response` in the future (once `SyncResponse` will - // move to Sliding Sync, i.e. to `v4::Response`), but processing the - // `sliding_sync_response` is vital, so it must be done somewhere; for now it - // happens here. - let sync_response = self.inner.client.process_sliding_sync(&response).await?; + let updates = this.handle_response(response, sync_response, list_generators.lock().unwrap().borrow_mut())?; - debug!("Sliding sync response has been processed"); + this.cache_to_storage().await?; - let updates = self.handle_response(response, sync_response, list_generators)?; + drop(global_lock); - self.cache_to_storage().await?; + debug!("Sliding sync response has been handled"); - debug!("Sliding sync response has been handled"); - - Ok(Some(updates)) + Ok(Some(updates)) + }).await.unwrap() } /// Create a _new_ Sliding Sync stream. @@ -1113,7 +1142,7 @@ impl SlidingSync { #[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { // Collect all the lists that need to be updated. - let mut list_generators = { + let list_generators = { let mut list_generators = BTreeMap::new(); let lock = self.inner.lists.read().unwrap(); @@ -1129,6 +1158,7 @@ impl SlidingSync { debug!(?self.inner.extensions, stream_id, "About to run the sync stream"); let instrument_span = Span::current(); + let list_generators = Arc::new(Mutex::new(list_generators)); async_stream::stream! { loop { @@ -1138,7 +1168,7 @@ impl SlidingSync { debug!(?self.inner.extensions, "Sync stream loop is running"); }); - match self.sync_once(&stream_id, &mut list_generators).instrument(sync_span.clone()).await { + match self.sync_once(&stream_id, list_generators.clone()).instrument(sync_span.clone()).await { Ok(Some(updates)) => { self.inner.reset_counter.store(0, Ordering::SeqCst); From 2f637cda6f33536990cb074d73e438fa8310da87 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 15:33:40 +0100 Subject: [PATCH 7/9] doc(sdk): Add note about fairness of the SS lock. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 567c52ac4..f502949e0 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -652,6 +652,8 @@ pub struct SlidingSync { inner: Arc, /// A lock to ensure that responses are handled one at a time. + /// [The lock][AsyncMutex] is fair, and this fairness property is important + /// to ensure responses are handled in the correct order. response_handling_lock: Arc>, } From 5dd404d2f1238ffddb6d17cc3e27e95b51171062 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 16:09:05 +0100 Subject: [PATCH 8/9] doc(sdk): Precise in which order SS responses will be handled. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index f502949e0..ec1a5d0bd 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -653,7 +653,7 @@ pub struct SlidingSync { /// A lock to ensure that responses are handled one at a time. /// [The lock][AsyncMutex] is fair, and this fairness property is important - /// to ensure responses are handled in the correct order. + /// to ensure responses are handled by their arrival order. response_handling_lock: Arc>, } From ac863409bb27db8c4726778cb304508916b6b7b8 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 Mar 2023 16:20:50 +0100 Subject: [PATCH 9/9] doc(sdk): Remove notion of fairness. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index ec1a5d0bd..567c52ac4 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -652,8 +652,6 @@ pub struct SlidingSync { inner: Arc, /// A lock to ensure that responses are handled one at a time. - /// [The lock][AsyncMutex] is fair, and this fairness property is important - /// to ensure responses are handled by their arrival order. response_handling_lock: Arc>, }