fix+chore(sdk): A few fixes and clean-ups of `SlidingSync` itself
Ivan Enderlin
2023-02-27 13:50:48 +01:00
committed by GitHub
2 changed files with 156 additions and 96 deletions


@@ -298,7 +298,7 @@ impl SlidingSyncBuilder {
extensions: Mutex::new(self.extensions).into(),
sent_extensions: Mutex::new(None).into(),
failure_count: Default::default(),
reset_counter: Default::default(),
pos: Arc::new(StdRwLock::new(Observable::new(None))),
delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))),


@@ -755,6 +755,15 @@ pub struct UpdateSummary {
pub rooms: Vec<OwnedRoomId>,
}
/// Number of times a Sliding Sync session can expire before raising an error.
///
/// A Sliding Sync session can expire. In this case, it is reset. However, to
/// avoid entering an infinite loop of “it's expired, let's reset, it's expired,
/// let's reset…” (maybe if the network has an issue, or the server, or anything
/// else), we define a maximum number of times a session can expire before
/// raising a proper error.
const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3;
/// The sliding sync instance
#[derive(Clone, Debug)]
pub struct SlidingSync {
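
In practice the cap and the counter cooperate like this; a minimal, standalone sketch (the `SessionGuard` type and its methods are hypothetical, not the SDK's API):

use std::sync::atomic::{AtomicU8, Ordering};

const MAXIMUM_EXPIRATIONS: u8 = 3;

struct SessionGuard {
    reset_counter: AtomicU8,
}

impl SessionGuard {
    /// `fetch_add` returns the previous value, so with a cap of 3 the error
    /// fires on the fourth consecutive expiration.
    fn on_expired(&self) -> Result<(), &'static str> {
        if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_EXPIRATIONS {
            return Err("session expired too many times in a row");
        }
        Ok(())
    }

    /// Any successful sync clears the streak.
    fn on_success(&self) {
        self.reset_counter.store(0, Ordering::SeqCst);
    }
}

fn main() {
    let guard = SessionGuard { reset_counter: AtomicU8::new(0) };
    assert!(guard.on_expired().is_ok()); // first expiration: reset and retry
    guard.on_success(); // a successful sync clears the streak
}
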
@@ -780,8 +789,8 @@ pub struct SlidingSync {
subscriptions: Arc<StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>>,
unsubscribe: Arc<StdRwLock<Vec<OwnedRoomId>>>,
/// keeping track of retries and failure counts
failure_count: Arc<AtomicU8>,
/// Number of times a Sliding Sync session has been reset.
reset_counter: Arc<AtomicU8>,
/// the intended state of the extensions being supplied to sliding /sync
/// calls. May contain the latest next_batch for to_devices, etc.
@@ -815,32 +824,44 @@ impl From<&SlidingSync> for FrozenSlidingSync {
}
impl SlidingSync {
async fn cache_to_storage(&self) -> Result<()> {
async fn cache_to_storage(&self) -> Result<(), crate::Error> {
let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) };
trace!(storage_key, "saving to storage for later use");
let v = serde_json::to_vec(&FrozenSlidingSync::from(self))?;
self.client.store().set_custom_value(storage_key.as_bytes(), v).await?;
let store = self.client.store();
// Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
// the client store.
store
.set_custom_value(
storage_key.as_bytes(),
serde_json::to_vec(&FrozenSlidingSync::from(self))?,
)
.await?;
// Write every `SlidingSyncView` inside the client store.
let frozen_views = {
let rooms_lock = self.rooms.read().unwrap();
self.views
.read()
.unwrap()
.iter()
.map(|(name, view)| {
(name.clone(), FrozenSlidingSyncView::freeze(view, &rooms_lock))
Ok((
format!("{storage_key}::{name}"),
serde_json::to_vec(&FrozenSlidingSyncView::freeze(view, &rooms_lock))?,
))
})
.collect::<Vec<_>>()
.collect::<Result<Vec<_>, crate::Error>>()?
};
for (name, frozen) in frozen_views {
trace!(storage_key, name, "saving to view for later use");
self.client
.store()
.set_custom_value(
format!("{storage_key}::{name}").as_bytes(),
serde_json::to_vec(&frozen)?,
)
.await?; // FIXME: parallelize?
for (storage_key, frozen_view) in frozen_views {
trace!(storage_key, "Saving the frozen Sliding Sync View");
store.set_custom_value(storage_key.as_bytes(), frozen_view).await?;
}
Ok(())
}
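
The refactor also changes when serialization happens relative to the locks: the frozen views are serialized while the `std` read locks are held, and the async store writes run only after the guards are dropped (a `std` lock guard must not live across an `.await`). A condensed sketch of that pattern, with `FakeStore` standing in for the client store:

use std::collections::BTreeMap;
use std::sync::RwLock;

struct FakeStore;

impl FakeStore {
    // Stand-in for the client store's async `set_custom_value`.
    async fn set(&self, _key: &str, _value: Vec<u8>) {}
}

async fn save_views(
    store: &FakeStore,
    storage_key: &str,
    views: &RwLock<BTreeMap<String, u32>>,
) -> serde_json::Result<()> {
    // 1. Serialize while holding the `std` lock, collecting owned bytes;
    //    the guard is dropped at the end of this block, before any `.await`.
    let blobs = {
        let views = views.read().unwrap();
        views
            .iter()
            .map(|(name, view)| {
                Ok((format!("{storage_key}::{name}"), serde_json::to_vec(view)?))
            })
            .collect::<serde_json::Result<Vec<_>>>()?
    };

    // 2. Perform the async writes without holding the lock.
    for (key, blob) in blobs {
        store.set(&key, blob).await;
    }

    Ok(())
}
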
@@ -849,19 +870,16 @@ impl SlidingSync {
SlidingSyncBuilder::new()
}
/// Generate a new SlidingSyncBuilder with the same inner settings and views
/// but without the current state
/// Generate a new [`SlidingSyncBuilder`] with the same inner settings and
/// views but without the current state.
pub fn new_builder_copy(&self) -> SlidingSyncBuilder {
let mut builder = Self::builder()
.client(self.client.clone())
.subscriptions(self.subscriptions.read().unwrap().to_owned());
for view in self
.views
.read()
.unwrap()
.values()
.map(|v| v.new_builder().build().expect("builder worked before, builder works now"))
{
for view in self.views.read().unwrap().values().map(|view| {
view.new_builder().build().expect("builder worked before, builder works now")
}) {
builder = builder.add_view(view);
}
@@ -892,10 +910,11 @@ impl SlidingSync {
}
}
/// Add the common extensions if not already configured
/// Add the common extensions if not already configured.
pub fn add_common_extensions(&self) {
let mut lock = self.extensions.lock().unwrap();
let mut cfg = lock.get_or_insert_with(Default::default);
if cfg.to_device.is_none() {
cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) }));
}
@@ -943,7 +962,7 @@ impl SlidingSync {
///
/// Note: Remember that this change will only be applicable for any new
/// stream created after this. The old stream will still continue to use the
/// previous set of views
/// previous set of views.
pub fn pop_view(&self, view_name: &String) -> Option<SlidingSyncView> {
self.views.write().unwrap().remove(view_name)
}
@@ -956,7 +975,7 @@ impl SlidingSync {
///
/// Note: Remember that this change will only be applicable for any new
/// stream created after this. The old stream will still continue to use the
/// previous set of views
/// previous set of views.
pub fn add_view(&self, view: SlidingSyncView) -> Option<SlidingSyncView> {
self.views.write().unwrap().insert(view.name.clone(), view)
}
@@ -967,6 +986,7 @@ impl SlidingSync {
room_ids: I,
) -> Vec<Option<SlidingSyncRoom>> {
let rooms = self.rooms.read().unwrap();
room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
}
@@ -1000,56 +1020,63 @@ impl SlidingSync {
let mut rooms = Vec::new();
let mut rooms_map = self.rooms.write().unwrap();
for (id, mut room_data) in sliding_sync_response.rooms.into_iter() {
for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
// `sync_response` contains the rooms with decrypted events if any, so look at
// the timeline events here first if the room exists.
// Otherwise, let's look at the timeline inside the `sliding_sync_response`.
let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) {
let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&room_id)
{
joined_room.timeline.events
} else {
room_data.timeline.drain(..).map(Into::into).collect()
};
if let Some(mut room) = rooms_map.remove(&id) {
if let Some(mut room) = rooms_map.remove(&room_id) {
// The room existed before, let's update it.
room.update(room_data, timeline);
rooms_map.insert(id.clone(), room);
rooms_map.insert(room_id.clone(), room);
} else {
// First time we need this room, let's create it.
rooms_map.insert(
id.clone(),
SlidingSyncRoom::new(self.client.clone(), id.clone(), room_data, timeline),
room_id.clone(),
SlidingSyncRoom::new(
self.client.clone(),
room_id.clone(),
room_data,
timeline,
),
);
}
rooms.push(id);
rooms.push(room_id);
}
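
The remove-then-insert dance above keeps ownership simple for `room.update`; for comparison, the same update-or-insert shape can be written with the map's entry API. A sketch with simplified stand-in types (not the SDK's `SlidingSyncRoom`):

use std::collections::btree_map::{BTreeMap, Entry};

struct Room {
    timeline: Vec<String>,
}

impl Room {
    fn update(&mut self, events: Vec<String>) {
        self.timeline.extend(events);
    }
}

fn upsert_room(rooms: &mut BTreeMap<String, Room>, room_id: String, events: Vec<String>) {
    match rooms.entry(room_id) {
        // The room existed before: update it in place.
        Entry::Occupied(mut entry) => entry.get_mut().update(events),
        // First time we need this room: create it.
        Entry::Vacant(entry) => {
            entry.insert(Room { timeline: events });
        }
    }
}
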
let mut updated_views = Vec::new();
for (name, updates) in sliding_sync_response.lists {
let Some(generator) = views.get_mut(&name) else {
error!("Response for view {name} - unknown to us. skipping");
error!("Response for view `{name}` - unknown to us; skipping");
continue
};
let count: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
if generator.handle_response(count, &updates.ops, &rooms)? {
updated_views.push(name.clone());
}
}
// Update the `to-device` next-batch if found.
if let Some(to_device_since) =
sliding_sync_response.extensions.to_device.map(|t| t.next_batch)
{
self.update_to_device_since(to_device_since)
// Update the `to-device` next-batch if any.
if let Some(to_device) = sliding_sync_response.extensions.to_device {
self.update_to_device_since(to_device.next_batch);
}
// track the most recently successfully sent extensions (needed for sticky
// semantics)
// Track the most recently successfully sent extensions (needed for sticky
// semantics).
if extensions.is_some() {
*self.sent_extensions.lock().unwrap() = extensions;
}
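
Both halves of the sticky-extensions bookkeeping (deciding what to send in `sync_once`, and recording it here after a successful response) fit in a small pattern. A simplified sketch, where `ExtensionsConfig` and `StickyExtensions` are stand-ins for the real types:

use std::sync::Mutex;

#[derive(Clone, Default, PartialEq)]
struct ExtensionsConfig {
    to_device_since: Option<String>,
}

struct StickyExtensions {
    // What we want the server to use.
    wanted: Mutex<Option<ExtensionsConfig>>,
    // What the server last acknowledged.
    sent: Mutex<Option<ExtensionsConfig>>,
}

impl StickyExtensions {
    /// Only include extensions in a request when they differ from the last
    /// successfully sent ones: the server remembers them (sticky semantics).
    fn to_send(&self) -> Option<ExtensionsConfig> {
        let wanted = self.wanted.lock().unwrap();
        if *wanted == *self.sent.lock().unwrap() {
            None
        } else {
            wanted.clone()
        }
    }

    /// After a successful response, record what was actually sent.
    fn mark_sent(&self, sent: Option<ExtensionsConfig>) {
        if sent.is_some() {
            *self.sent.lock().unwrap() = sent;
        }
    }
}
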
@@ -1066,19 +1093,22 @@ impl SlidingSync {
&self,
views: &mut BTreeMap<String, SlidingSyncViewRequestGenerator>,
) -> Result<Option<UpdateSummary>> {
let mut requests = BTreeMap::new();
let mut to_remove = Vec::new();
let mut lists_of_requests = BTreeMap::new();
for (name, generator) in views.iter_mut() {
if let Some(request) = generator.next() {
requests.insert(name.clone(), request);
} else {
to_remove.push(name.clone());
{
let mut views_to_remove = Vec::new();
for (name, generator) in views.iter_mut() {
if let Some(request) = generator.next() {
lists_of_requests.insert(name.clone(), request);
} else {
views_to_remove.push(name.clone());
}
}
}
for n in to_remove {
views.remove(&n);
for view_name in views_to_remove {
views.remove(&view_name);
}
}
if views.is_empty() {
@@ -1091,10 +1121,11 @@ impl SlidingSync {
let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap());
let timeout = Duration::from_secs(30);
// implement stickiness by only sending extensions if they have
// Implement stickiness by only sending extensions if they have
// changed since the last time we sent them.
let extensions = {
let extensions = self.extensions.lock().unwrap();
if *extensions == *self.sent_extensions.lock().unwrap() {
None
} else {
@@ -1102,104 +1133,133 @@ impl SlidingSync {
}
};
let request = assign!(v4::Request::new(), {
lists: requests,
pos,
delta_token,
timeout: Some(timeout),
room_subscriptions,
unsubscribe_rooms,
extensions: extensions.clone().unwrap_or_default(),
});
debug!("requesting");
debug!("Sending the sliding sync request");
// 30s for the long poll + 30s for network delays
// Configure long-polling. We need 30 seconds for the long-poll itself, in
// addition to 30 extra seconds for network delays.
let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30));
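
Spelling out the timeout arithmetic from the two comments above:

use std::time::Duration;

fn main() {
    let long_poll = Duration::from_secs(30); // sent to the server as `timeout`
    let network_margin = Duration::from_secs(30); // slack for network delays
    // The HTTP client must wait longer than the server-side long-poll,
    // otherwise it would abort requests the server is still holding open.
    assert_eq!(long_poll + network_margin, Duration::from_secs(60));
}
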
// Prepare the request.
let request = self.client.send_with_homeserver(
request,
assign!(v4::Request::new(), {
lists: lists_of_requests,
pos,
delta_token,
timeout: Some(timeout),
room_subscriptions,
unsubscribe_rooms,
extensions: extensions.clone().unwrap_or_default(),
}),
Some(request_config),
self.homeserver.as_ref().map(ToString::to_string),
);
// Send the request and get a response with end-to-end encryption support.
//
// Sending the `/sync` request out when end-to-end encryption is enabled means
// that we also need to send out any outgoing E2EE-related requests coming
// from the `OlmMachine::outgoing_requests()` method.
//
// FIXME: Processing outgoing requests while a `/sync` is in flight is
// currently not supported.
// More info: [#1386](https://github.com/matrix-org/matrix-rust-sdk/issues/1386).
#[cfg(feature = "e2e-encryption")]
let response = {
let (e2ee_uploads, resp) =
futures_util::join!(self.client.send_outgoing_requests(), request);
if let Err(e) = e2ee_uploads {
error!(error = ?e, "Error while sending outgoing E2EE requests");
let (e2ee_uploads, response) =
futures_util::future::join(self.client.send_outgoing_requests(), request).await;
if let Err(error) = e2ee_uploads {
error!(?error, "Error while sending outgoing E2EE requests");
}
resp
response
}?;
// Send the request and get a response _without_ end-to-end encryption support.
#[cfg(not(feature = "e2e-encryption"))]
let response = request.await?;
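
The `e2e-encryption` branch runs both requests concurrently and deliberately logs, rather than propagates, an E2EE failure. A self-contained sketch of that pattern, with placeholder functions instead of the real client calls:

use futures_util::future::join;

async fn send_e2ee_requests() -> Result<(), String> {
    Ok(()) // placeholder for `Client::send_outgoing_requests`
}

async fn send_sync_request() -> Result<u64, String> {
    Ok(0) // placeholder for the `/sync` request future
}

async fn sync_with_e2ee() -> Result<u64, String> {
    // Run both futures concurrently; `join` waits for both to finish.
    let (e2ee_uploads, response) = join(send_e2ee_requests(), send_sync_request()).await;

    // An E2EE upload failure is only logged, never propagated: the sync
    // response is still valuable on its own.
    if let Err(error) = e2ee_uploads {
        eprintln!("Error while sending outgoing E2EE requests: {error}");
    }

    response
}
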
debug!("received");
debug!("Sliding sync response received");
let updates = self.handle_response(response, extensions, views).await?;
debug!("handled");
debug!("Sliding sync response has been handled");
Ok(Some(updates))
}
/// Create the inner stream for the view.
/// Create a _new_ Sliding Sync stream.
///
/// Run this stream to receive new updates from the server.
/// This stream will send requests and will handle responses automatically,
/// hence updating the views.
#[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
// Collect all the views that need to be updated.
let mut views = {
let mut views = BTreeMap::new();
let views_lock = self.views.read().unwrap();
for (name, view) in views_lock.iter() {
let lock = self.views.read().unwrap();
for (name, view) in lock.iter() {
views.insert(name.clone(), view.request_generator());
}
views
};
debug!(?self.extensions, "Setting view stream going");
let stream_span = Span::current();
debug!(?self.extensions, "About to run the sync stream");
let instrument_span = Span::current();
async_stream::stream! {
loop {
let sync_span = info_span!(parent: &stream_span, "sync_once");
let sync_span = info_span!(parent: &instrument_span, "sync_once");
sync_span.in_scope(|| {
debug!(?self.extensions, "Sync loop running");
debug!(?self.extensions, "Sync stream loop is running");
});
match self.sync_once(&mut views).instrument(sync_span.clone()).await {
Ok(Some(updates)) => {
self.failure_count.store(0, Ordering::SeqCst);
self.reset_counter.store(0, Ordering::SeqCst);
yield Ok(updates)
yield Ok(updates);
}
Ok(None) => {
break;
}
Err(e) => {
if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// session expired, let's reset
if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 {
sync_span.in_scope(|| error!("session expired three times in a row"));
yield Err(e.into());
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// The session has expired.
break
// Has it expired too many times?
if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"));
// The session has expired too many times, let's raise an error!
yield Err(error.into());
break;
}
// Let's reset the Sliding Sync session.
sync_span.in_scope(|| {
warn!("Session expired. Restarting sliding sync.");
warn!("Session expired. Restarting Sliding Sync.");
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
Observable::set(&mut self.pos.write().unwrap(), None);
// reset our extensions to the last known good ones.
// We also need to reset our extensions to the last known good ones.
*self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take();
debug!(?self.extensions, "Resetting view stream");
debug!(?self.extensions, "Sliding Sync has been reset");
});
}
yield Err(e.into());
yield Err(error.into());
continue
continue;
}
}
}
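
A hedged caller-side sketch of driving this stream (assuming a configured `SlidingSync` handle; not taken from the SDK's docs):

use futures_util::{pin_mut, StreamExt};

async fn run(sliding_sync: &SlidingSync) -> Result<(), crate::Error> {
    let stream = sliding_sync.stream();
    pin_mut!(stream);

    while let Some(update) = stream.next().await {
        match update {
            Ok(summary) => println!("Rooms updated: {:?}", summary.rooms),
            // After `MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION` consecutive
            // resets, the expiration error surfaces here and ends the loop.
            Err(error) => return Err(error),
        }
    }

    Ok(())
}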