perf(sliding-sync): run outgoing e2ee requests and sync in parallel.

And continue the loop if even we hit an error in between
This commit is contained in:
Benjamin Kampmann
2022-10-27 15:59:02 +01:00
parent d164165c7c
commit 278c6059ae

View File

@@ -514,18 +514,13 @@ impl SlidingSync {
let extensions = self.extensions.clone();
let client = self.client.clone();
Ok(async_stream::try_stream! {
Ok(async_stream::stream! {
let mut remaining_views = views.clone();
let mut remaining_generators: Vec<SlidingSyncViewRequestGenerator<'_>> = views
.iter()
.map(SlidingSyncView::request_generator)
.collect();
loop {
#[cfg(feature = "e2e-encryption")]
if let Err(e) = client.send_outgoing_requests().await {
tracing::error!(error = ?e, "Error while sending outgoing E2EE requests");
}
let mut requests = Vec::new();
let mut new_remaining_generators = Vec::new();
let mut new_remaining_views = Vec::new();
@@ -562,14 +557,39 @@ impl SlidingSync {
extensions: extensions.lock_mut().take().unwrap_or_default(), // extensions are sticky, we pop them here once
});
tracing::debug!("requesting");
let resp = client
.send_with_homeserver(req, None, self.homeserver.as_ref().map(ToString::to_string))
.await?;
let req = client.send_with_homeserver(req, None, self.homeserver.as_ref().map(ToString::to_string));
#[cfg(feature = "e2e-encryption")]
let resp_res = {
let (e2ee_uploads, resp) = futures_util::join!(client.send_outgoing_requests(), req);
if let Err(e) = e2ee_uploads {
tracing::error!(error = ?e, "Error while sending outgoing E2EE requests");
};
resp
};
#[cfg(not(feature = "e2e-encryption"))]
let resp_res = req.await;
let resp = match resp_res {
Ok(r) => r,
Err(e) => {
yield Err(e.into());
continue
}
};
tracing::debug!("received");
let updates = self.handle_response(resp, &remaining_views).await?;
let updates = match self.handle_response(resp, &remaining_views).await {
Ok(r) => r,
Err(e) => {
yield Err(e.into());
continue
}
};
tracing::debug!("handled");
yield updates;
yield Ok(updates);
}
})
}