diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 39a5af3a0..d6cddc8c2 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -298,7 +298,7 @@ impl SlidingSyncBuilder { extensions: Mutex::new(self.extensions).into(), sent_extensions: Mutex::new(None).into(), - failure_count: Default::default(), + reset_counter: Default::default(), pos: Arc::new(StdRwLock::new(Observable::new(None))), delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))), diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 5f85b72f5..f108a76a1 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -755,6 +755,15 @@ pub struct UpdateSummary { pub rooms: Vec, } +/// Number of times a Sliding Sync session can expire before raising an error. +/// +/// A Sliding Sync session can expire. In this case, it is reset. However, to +/// avoid entering an infinite loop of “it's expired, let's reset, it's expired, +/// let's reset…” (maybe if the network has an issue, or the server, or anything +/// else), we defined a maximum number of times a session can expire before +/// raising a proper error. +const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; + /// The sliding sync instance #[derive(Clone, Debug)] pub struct SlidingSync { @@ -780,8 +789,8 @@ pub struct SlidingSync { subscriptions: Arc>>, unsubscribe: Arc>>, - /// keeping track of retries and failure counts - failure_count: Arc, + /// Number of times a Sliding Sync session has been reset. + reset_counter: Arc, /// the intended state of the extensions being supplied to sliding /sync /// calls. May contain the latest next_batch for to_devices, etc.
@@ -815,32 +824,44 @@ impl From<&SlidingSync> for FrozenSlidingSync { } impl SlidingSync { - async fn cache_to_storage(&self) -> Result<()> { + async fn cache_to_storage(&self) -> Result<(), crate::Error> { let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) }; trace!(storage_key, "saving to storage for later use"); - let v = serde_json::to_vec(&FrozenSlidingSync::from(self))?; - self.client.store().set_custom_value(storage_key.as_bytes(), v).await?; + + let store = self.client.store(); + + // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside + // the client store. + store + .set_custom_value( + storage_key.as_bytes(), + serde_json::to_vec(&FrozenSlidingSync::from(self))?, + ) + .await?; + + // Write every `SlidingSyncView` inside the client store. let frozen_views = { let rooms_lock = self.rooms.read().unwrap(); + self.views .read() .unwrap() .iter() .map(|(name, view)| { - (name.clone(), FrozenSlidingSyncView::freeze(view, &rooms_lock)) + Ok(( + format!("{storage_key}::{name}"), + serde_json::to_vec(&FrozenSlidingSyncView::freeze(view, &rooms_lock))?, + )) }) - .collect::>() + .collect::, crate::Error>>()? }; - for (name, frozen) in frozen_views { - trace!(storage_key, name, "saving to view for later use"); - self.client - .store() - .set_custom_value( - format!("{storage_key}::{name}").as_bytes(), - serde_json::to_vec(&frozen)?, - ) - .await?; // FIXME: parallelize? + + for (storage_key, frozen_view) in frozen_views { + trace!(storage_key, "Saving the frozen Sliding Sync View"); + + store.set_custom_value(storage_key.as_bytes(), frozen_view).await?; } + Ok(()) } @@ -849,19 +870,16 @@ impl SlidingSync { SlidingSyncBuilder::new() } - /// Generate a new SlidingSyncBuilder with the same inner settings and views - /// but without the current state + /// Generate a new [`SlidingSyncBuilder`] with the same inner settings and + /// views but without the current state.
pub fn new_builder_copy(&self) -> SlidingSyncBuilder { let mut builder = Self::builder() .client(self.client.clone()) .subscriptions(self.subscriptions.read().unwrap().to_owned()); - for view in self - .views - .read() - .unwrap() - .values() - .map(|v| v.new_builder().build().expect("builder worked before, builder works now")) - { + + for view in self.views.read().unwrap().values().map(|view| { + view.new_builder().build().expect("builder worked before, builder works now") + }) { builder = builder.add_view(view); } @@ -892,10 +910,11 @@ impl SlidingSync { } } - /// Add the common extensions if not already configured + /// Add the common extensions if not already configured. pub fn add_common_extensions(&self) { let mut lock = self.extensions.lock().unwrap(); let mut cfg = lock.get_or_insert_with(Default::default); + if cfg.to_device.is_none() { cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); } @@ -943,7 +962,7 @@ impl SlidingSync { /// /// Note: Remember that this change will only be applicable for any new /// stream created after this. The old stream will still continue to use the - /// previous set of views + /// previous set of views. pub fn pop_view(&self, view_name: &String) -> Option { self.views.write().unwrap().remove(view_name) } @@ -956,7 +975,7 @@ impl SlidingSync { /// /// Note: Remember that this change will only be applicable for any new /// stream created after this. The old stream will still continue to use the - /// previous set of views + /// previous set of views. 
pub fn add_view(&self, view: SlidingSyncView) -> Option { self.views.write().unwrap().insert(view.name.clone(), view) } @@ -967,6 +986,7 @@ impl SlidingSync { room_ids: I, ) -> Vec> { let rooms = self.rooms.read().unwrap(); + room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect() } @@ -1000,56 +1020,63 @@ impl SlidingSync { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.write().unwrap(); - for (id, mut room_data) in sliding_sync_response.rooms.into_iter() { + for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() { // `sync_response` contains the rooms with decrypted events if any, so look at // the timeline events here first if the room exists. // Otherwise, let's look at the timeline inside the `sliding_sync_response`. - let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) { + let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) + { joined_room.timeline.events } else { room_data.timeline.drain(..).map(Into::into).collect() }; - if let Some(mut room) = rooms_map.remove(&id) { + if let Some(mut room) = rooms_map.remove(&room_id) { // The room existed before, let's update it. room.update(room_data, timeline); - rooms_map.insert(id.clone(), room); + rooms_map.insert(room_id.clone(), room); } else { // First time we need this room, let's create it. rooms_map.insert( - id.clone(), - SlidingSyncRoom::new(self.client.clone(), id.clone(), room_data, timeline), + room_id.clone(), + SlidingSyncRoom::new( + self.client.clone(), + room_id.clone(), + room_data, + timeline, + ), ); } - rooms.push(id); + rooms.push(room_id); } let mut updated_views = Vec::new(); for (name, updates) in sliding_sync_response.lists { let Some(generator) = views.get_mut(&name) else { - error!("Response for view {name} - unknown to us. 
skipping"); + error!("Response for view `{name}` - unknown to us; skipping"); + continue }; + let count: u32 = updates.count.try_into().expect("the list total count convertible into u32"); + if generator.handle_response(count, &updates.ops, &rooms)? { updated_views.push(name.clone()); } } - // Update the `to-device` next-batch if found. - if let Some(to_device_since) = - sliding_sync_response.extensions.to_device.map(|t| t.next_batch) - { - self.update_to_device_since(to_device_since) + // Update the `to-device` next-batch if any. + if let Some(to_device) = sliding_sync_response.extensions.to_device { + self.update_to_device_since(to_device.next_batch); } - // track the most recently successfully sent extensions (needed for sticky - // semantics) + // Track the most recently successfully sent extensions (needed for sticky + // semantics). if extensions.is_some() { *self.sent_extensions.lock().unwrap() = extensions; } @@ -1066,19 +1093,22 @@ impl SlidingSync { &self, views: &mut BTreeMap, ) -> Result> { - let mut requests = BTreeMap::new(); - let mut to_remove = Vec::new(); + let mut lists_of_requests = BTreeMap::new(); - for (name, generator) in views.iter_mut() { - if let Some(request) = generator.next() { - requests.insert(name.clone(), request); - } else { - to_remove.push(name.clone()); + { + let mut views_to_remove = Vec::new(); + + for (name, generator) in views.iter_mut() { + if let Some(request) = generator.next() { + lists_of_requests.insert(name.clone(), request); + } else { + views_to_remove.push(name.clone()); + } } - } - for n in to_remove { - views.remove(&n); + for view_name in views_to_remove { + views.remove(&view_name); + } } if views.is_empty() { @@ -1091,10 +1121,11 @@ impl SlidingSync { let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap()); let timeout = Duration::from_secs(30); - // implement stickiness by only sending extensions if they have + // Implement stickiness by only sending extensions if they have // changed 
since the last time we sent them let extensions = { let extensions = self.extensions.lock().unwrap(); + if *extensions == *self.sent_extensions.lock().unwrap() { None } else { @@ -1102,104 +1133,133 @@ impl SlidingSync { } }; - let request = assign!(v4::Request::new(), { - lists: requests, - pos, - delta_token, - timeout: Some(timeout), - room_subscriptions, - unsubscribe_rooms, - extensions: extensions.clone().unwrap_or_default(), - }); - debug!("requesting"); + debug!("Sending the sliding sync request"); - // 30s for the long poll + 30s for network delays + // Configure long-polling. We need 30 seconds for the long-poll itself, in + // addition to 30 extra seconds for the network delays. let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30)); + + // Prepare the request. let request = self.client.send_with_homeserver( - request, + assign!(v4::Request::new(), { + lists: lists_of_requests, + pos, + delta_token, + timeout: Some(timeout), + room_subscriptions, + unsubscribe_rooms, + extensions: extensions.clone().unwrap_or_default(), + }), Some(request_config), self.homeserver.as_ref().map(ToString::to_string), ); + // Send the request and get a response with end-to-end encryption support. + // + // Sending the `/sync` request out when end-to-end encryption is enabled means + // that we need to also send out any outgoing e2ee related request + // coming from the `OlmMachine::outgoing_requests()` method. + // + // FIXME: Processing outgoing requests at the same time while a `/sync` is in + // flight is currently not supported. + // More info: [#1386](https://github.com/matrix-org/matrix-rust-sdk/issues/1386).
#[cfg(feature = "e2e-encryption")] let response = { - let (e2ee_uploads, resp) = - futures_util::join!(self.client.send_outgoing_requests(), request); - if let Err(e) = e2ee_uploads { - error!(error = ?e, "Error while sending outgoing E2EE requests"); + let (e2ee_uploads, response) = + futures_util::future::join(self.client.send_outgoing_requests(), request).await; + + if let Err(error) = e2ee_uploads { + error!(?error, "Error while sending outgoing E2EE requests"); } - resp + + response }?; + + // Send the request and get a response _without_ end-to-end encryption support. #[cfg(not(feature = "e2e-encryption"))] let response = request.await?; - debug!("received"); + + debug!("Sliding sync response received"); let updates = self.handle_response(response, extensions, views).await?; - debug!("handled"); + + debug!("Sliding sync response has been handled"); Ok(Some(updates)) } - /// Create the inner stream for the view. + /// Create a _new_ Sliding Sync stream. /// - /// Run this stream to receive new updates from the server. + /// This stream will send requests and will handle responses automatically, + /// hence updating the views. #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { + // Collect all the views that needs to be updated. let mut views = { let mut views = BTreeMap::new(); - let views_lock = self.views.read().unwrap(); - for (name, view) in views_lock.iter() { + let lock = self.views.read().unwrap(); + + for (name, view) in lock.iter() { views.insert(name.clone(), view.request_generator()); } + views }; - debug!(?self.extensions, "Setting view stream going"); - let stream_span = Span::current(); + debug!(?self.extensions, "About to run the sync stream"); + + let instrument_span = Span::current(); async_stream::stream!
{ loop { - let sync_span = info_span!(parent: &stream_span, "sync_once"); + let sync_span = info_span!(parent: &instrument_span, "sync_once"); sync_span.in_scope(|| { - debug!(?self.extensions, "Sync loop running"); + debug!(?self.extensions, "Sync stream loop is running"); }); match self.sync_once(&mut views).instrument(sync_span.clone()).await { Ok(Some(updates)) => { - self.failure_count.store(0, Ordering::SeqCst); + self.reset_counter.store(0, Ordering::SeqCst); - yield Ok(updates) + yield Ok(updates); } Ok(None) => { break; } - Err(e) => { - if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { - // session expired, let's reset - if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 { - sync_span.in_scope(|| error!("session expired three times in a row")); - yield Err(e.into()); + Err(error) => { + if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { + // The session has expired. - break + // Has it expired too many times? + if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { + sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row")); + + // The session has expired too many times, let's raise an error! + yield Err(error.into()); + + break; } + // Let's reset the Sliding Sync session. sync_span.in_scope(|| { - warn!("Session expired. Restarting sliding sync."); + warn!("Session expired. Restarting Sliding Sync."); + + // To “restart” a Sliding Sync session, we set `pos` to its initial value. Observable::set(&mut self.pos.write().unwrap(), None); - // reset our extensions to the last known good ones. + // We also need to reset our extensions to the last known good ones. *self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take(); - debug!(?self.extensions, "Resetting view stream"); + debug!(?self.extensions, "Sliding Sync has been reset"); }); } - yield Err(e.into()); + yield Err(error.into()); - continue + continue; } } }