From 278c6059ae7fcd511e72a1d87f99b51bb8048333 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Thu, 27 Oct 2022 15:59:02 +0100 Subject: [PATCH] perf(sliding-sync): run outgoing e2ee requests and sync in parallel. And continue the loop if even we hit an error in between --- crates/matrix-sdk/src/sliding_sync.rs | 42 ++++++++++++++++++++------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync.rs index b7666fdec..066d677f9 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync.rs @@ -514,18 +514,13 @@ impl SlidingSync { let extensions = self.extensions.clone(); let client = self.client.clone(); - Ok(async_stream::try_stream! { + Ok(async_stream::stream! { let mut remaining_views = views.clone(); let mut remaining_generators: Vec> = views .iter() .map(SlidingSyncView::request_generator) .collect(); loop { - #[cfg(feature = "e2e-encryption")] - if let Err(e) = client.send_outgoing_requests().await { - tracing::error!(error = ?e, "Error while sending outgoing E2EE requests"); - } - let mut requests = Vec::new(); let mut new_remaining_generators = Vec::new(); let mut new_remaining_views = Vec::new(); @@ -562,14 +557,39 @@ impl SlidingSync { extensions: extensions.lock_mut().take().unwrap_or_default(), // extensions are sticky, we pop them here once }); tracing::debug!("requesting"); - let resp = client - .send_with_homeserver(req, None, self.homeserver.as_ref().map(ToString::to_string)) - .await?; + + let req = client.send_with_homeserver(req, None, self.homeserver.as_ref().map(ToString::to_string)); + + #[cfg(feature = "e2e-encryption")] + let resp_res = { + let (e2ee_uploads, resp) = futures_util::join!(client.send_outgoing_requests(), req); + if let Err(e) = e2ee_uploads { + tracing::error!(error = ?e, "Error while sending outgoing E2EE requests"); + }; + resp + }; + #[cfg(not(feature = "e2e-encryption"))] + let resp_res = req.await; + + let resp = match resp_res { + Ok(r) => r, + Err(e) => { + yield Err(e.into()); + continue + } + }; + tracing::debug!("received"); - let updates = self.handle_response(resp, &remaining_views).await?; + let updates = match self.handle_response(resp, &remaining_views).await { + Ok(r) => r, + Err(e) => { + yield Err(e.into()); + continue + } + }; tracing::debug!("handled"); - yield updates; + yield Ok(updates); } }) }