From 16c9845a4784eedf52ca58e3997c0deccfeebb43 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 12:09:14 +0100 Subject: [PATCH] chore(sdk): Rename `SlidingSync.failure_count` to `.reset_counter`. This patch cleans up the `SlidingSync::stream` method. It renames variables, improves log messages etc. This patch also creates a new `MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION` constant. This value was previously hardcoded and lost in the code, now it's easier to spot it for further updates. This patch finally renames the `failure_count` field to `reset_counter`, because it doesn't count the number of failure, but the number of `ErrorKind::UnknownPos` exactly, i.e. the number of times we reset the `SlidingSync` state. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 69 ++++++++++++------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 39a5af3a0..d6cddc8c2 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -298,7 +298,7 @@ impl SlidingSyncBuilder { extensions: Mutex::new(self.extensions).into(), sent_extensions: Mutex::new(None).into(), - failure_count: Default::default(), + reset_counter: Default::default(), pos: Arc::new(StdRwLock::new(Observable::new(None))), delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))), diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 4a927a92c..088948e14 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -755,6 +755,15 @@ pub struct UpdateSummary { pub rooms: Vec, } +/// Number of times a Sliding Sync session can expire before raising an error. +/// +/// A Sliding Sync session can expire. In this case, it is reset. However, to +/// avoid entering an infinite loop of “it's expired, let's reset, it's expired, +/// let's reset…” (maybe if the network has an issue, or the server, or anything +/// else), we defined a maximum times a session can expire before +/// raising a proper error. +const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3; + /// The sliding sync instance #[derive(Clone, Debug)] pub struct SlidingSync { @@ -780,8 +789,8 @@ pub struct SlidingSync { subscriptions: Arc>>, unsubscribe: Arc>>, - /// keeping track of retries and failure counts - failure_count: Arc, + /// Number of times a Sliding Session session has been reset. + reset_counter: Arc, /// the intended state of the extensions being supplied to sliding /sync /// calls. May contain the latest next_batch for to_devices, etc. @@ -1174,66 +1183,78 @@ impl SlidingSync { Ok(Some(updates)) } - /// Create the inner stream for the view. + /// Create a _new_ Sliding Sync stream. /// - /// Run this stream to receive new updates from the server. + /// This stream will send requests and will handle responses automatically, + /// hence updating the views. #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { + // Collect all the views that needsto be updated. let mut views = { let mut views = BTreeMap::new(); - let views_lock = self.views.read().unwrap(); - for (name, view) in views_lock.iter() { + let lock = self.views.read().unwrap(); + + for (name, view) in lock.iter() { views.insert(name.clone(), view.request_generator()); } + views }; - debug!(?self.extensions, "Setting view stream going"); - let stream_span = Span::current(); + debug!(?self.extensions, "About to run the sync stream"); + + let instrument_span = Span::current(); async_stream::stream! { loop { - let sync_span = info_span!(parent: &stream_span, "sync_once"); + let sync_span = info_span!(parent: &instrument_span, "sync_once"); sync_span.in_scope(|| { - debug!(?self.extensions, "Sync loop running"); + debug!(?self.extensions, "Sync stream loop is running"); }); match self.sync_once(&mut views).instrument(sync_span.clone()).await { Ok(Some(updates)) => { - self.failure_count.store(0, Ordering::SeqCst); + self.reset_counter.store(0, Ordering::SeqCst); - yield Ok(updates) + yield Ok(updates); } Ok(None) => { break; } - Err(e) => { - if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { - // session expired, let's reset - if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 { - sync_span.in_scope(|| error!("session expired three times in a row")); - yield Err(e.into()); + Err(error) => { + if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { + // The session has expired. - break + // Has it expired too many times? + if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { + sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row")); + + // The session has expired too many times, let's raise an error! + yield Err(error.into()); + + break; } + // Let's reset the Sliding Sync session. sync_span.in_scope(|| { - warn!("Session expired. Restarting sliding sync."); + warn!("Session expired. Restarting Sliding Sync."); + + // To “restart” a Sliding Sync session, we set `pos` to its initial value. Observable::set(&mut self.pos.write().unwrap(), None); - // reset our extensions to the last known good ones. + // We also need to reset our extensions to the last known good ones. *self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take(); - debug!(?self.extensions, "Resetting view stream"); + debug!(?self.extensions, "Sliding Sync has been reset"); }); } - yield Err(e.into()); + yield Err(error.into()); - continue + continue; } } }