chore(sdk): Rename SlidingSync.failure_count to .reset_counter.

This patch cleans up the `SlidingSync::stream` method. It renames variables,
improves log messages etc.

This patch also creates a new `MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION`
constant. This value was previously hardcoded and lost in the code, now it's
easier to spot it for further updates.

This patch finally renames the `failure_count` field to `reset_counter`,
because it doesn't count the number of failure, but the number of
`ErrorKind::UnknownPos` exactly, i.e. the number of times we reset the
`SlidingSync` state.
This commit is contained in:
Ivan Enderlin
2023-02-27 12:09:14 +01:00
parent 4dbd1a7db4
commit 16c9845a47
2 changed files with 46 additions and 25 deletions

View File

@@ -298,7 +298,7 @@ impl SlidingSyncBuilder {
extensions: Mutex::new(self.extensions).into(),
sent_extensions: Mutex::new(None).into(),
failure_count: Default::default(),
reset_counter: Default::default(),
pos: Arc::new(StdRwLock::new(Observable::new(None))),
delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))),

View File

@@ -755,6 +755,15 @@ pub struct UpdateSummary {
pub rooms: Vec<OwnedRoomId>,
}
/// Number of times a Sliding Sync session can expire before raising an error.
///
/// A Sliding Sync session can expire. In this case, it is reset. However, to
/// avoid entering an infinite loop of “it's expired, let's reset, it's expired,
/// let's reset…” (maybe if the network has an issue, or the server, or anything
/// else), we defined a maximum times a session can expire before
/// raising a proper error.
const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3;
/// The sliding sync instance
#[derive(Clone, Debug)]
pub struct SlidingSync {
@@ -780,8 +789,8 @@ pub struct SlidingSync {
subscriptions: Arc<StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>>,
unsubscribe: Arc<StdRwLock<Vec<OwnedRoomId>>>,
/// keeping track of retries and failure counts
failure_count: Arc<AtomicU8>,
/// Number of times a Sliding Session session has been reset.
reset_counter: Arc<AtomicU8>,
/// the intended state of the extensions being supplied to sliding /sync
/// calls. May contain the latest next_batch for to_devices, etc.
@@ -1174,66 +1183,78 @@ impl SlidingSync {
Ok(Some(updates))
}
/// Create the inner stream for the view.
/// Create a _new_ Sliding Sync stream.
///
/// Run this stream to receive new updates from the server.
/// This stream will send requests and will handle responses automatically,
/// hence updating the views.
#[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
// Collect all the views that needsto be updated.
let mut views = {
let mut views = BTreeMap::new();
let views_lock = self.views.read().unwrap();
for (name, view) in views_lock.iter() {
let lock = self.views.read().unwrap();
for (name, view) in lock.iter() {
views.insert(name.clone(), view.request_generator());
}
views
};
debug!(?self.extensions, "Setting view stream going");
let stream_span = Span::current();
debug!(?self.extensions, "About to run the sync stream");
let instrument_span = Span::current();
async_stream::stream! {
loop {
let sync_span = info_span!(parent: &stream_span, "sync_once");
let sync_span = info_span!(parent: &instrument_span, "sync_once");
sync_span.in_scope(|| {
debug!(?self.extensions, "Sync loop running");
debug!(?self.extensions, "Sync stream loop is running");
});
match self.sync_once(&mut views).instrument(sync_span.clone()).await {
Ok(Some(updates)) => {
self.failure_count.store(0, Ordering::SeqCst);
self.reset_counter.store(0, Ordering::SeqCst);
yield Ok(updates)
yield Ok(updates);
}
Ok(None) => {
break;
}
Err(e) => {
if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// session expired, let's reset
if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 {
sync_span.in_scope(|| error!("session expired three times in a row"));
yield Err(e.into());
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// The session has expired.
break
// Has it expired too many times?
if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"));
// The session has expired too many times, let's raise an error!
yield Err(error.into());
break;
}
// Let's reset the Sliding Sync session.
sync_span.in_scope(|| {
warn!("Session expired. Restarting sliding sync.");
warn!("Session expired. Restarting Sliding Sync.");
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
Observable::set(&mut self.pos.write().unwrap(), None);
// reset our extensions to the last known good ones.
// We also need to reset our extensions to the last known good ones.
*self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take();
debug!(?self.extensions, "Resetting view stream");
debug!(?self.extensions, "Sliding Sync has been reset");
});
}
yield Err(e.into());
yield Err(error.into());
continue
continue;
}
}
}