mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-19 14:19:06 -04:00
feat(sdk): Ensure that the task sending E2EE requests cannot be detached.
This commit is contained in:
@@ -39,4 +39,8 @@ pub enum Error {
|
||||
/// The name of the Sliding Sync instance is too long.
|
||||
#[error("The Sliding Sync instance's identifier must be less than 16 chars long")]
|
||||
InvalidSlidingSyncIdentifier,
|
||||
|
||||
/// A task failed to execute to completion.
|
||||
#[error("A task failed to execute to completion; task description: {0}")]
|
||||
JoinError(String),
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ mod error;
|
||||
mod list;
|
||||
mod room;
|
||||
mod sticky_parameters;
|
||||
mod utils;
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
@@ -52,6 +53,7 @@ use tokio::{
|
||||
};
|
||||
use tracing::{debug, error, instrument, warn, Instrument, Span};
|
||||
use url::Url;
|
||||
use utils::JoinHandleExt as _;
|
||||
|
||||
use self::{
|
||||
cache::restore_sliding_sync_state,
|
||||
@@ -490,18 +492,28 @@ impl SlidingSync {
|
||||
// Also, we don't want an interruption in the sliding sync request to abort
|
||||
// processing of the e2ee response.
|
||||
//
|
||||
// For those reasons, start the e2ee requests in a background task.
|
||||
// For those reasons, start the e2ee requests in a background task. If this is
|
||||
// aborted while everything is running, the same data might be sent later, which
|
||||
// is fine.
|
||||
|
||||
let client = self.inner.client.clone();
|
||||
let e2ee_uploads = spawn(async move { client.send_outgoing_requests().await });
|
||||
let e2ee_uploads = spawn(async move {
|
||||
if let Err(error) = client.send_outgoing_requests().await {
|
||||
error!(?error, "Error while sending outoging E2EE requests");
|
||||
}
|
||||
})
|
||||
// Ensure that the task is not running in detached mode. It is aborted when it's
|
||||
// dropped.
|
||||
.abort_on_drop();
|
||||
|
||||
// Wait on the sliding sync request success or failure early.
|
||||
let response = request.await?;
|
||||
|
||||
// Then only log the e2ee response, if needs be.
|
||||
if let Err(error) = e2ee_uploads.await {
|
||||
error!(?error, "Error while sending outgoing E2EE requests");
|
||||
}
|
||||
// At this point, if `request` has been resolved successfully, we wait on
|
||||
// `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
|
||||
// long. Otherwise —if `request` has failed— `e2ee_uploads` has
|
||||
// been dropped, so aborted.
|
||||
e2ee_uploads.await.map_err(|_| Error::JoinError("e2ee_uploads".to_string()))?;
|
||||
|
||||
response
|
||||
} else {
|
||||
|
||||
43
crates/matrix-sdk/src/sliding_sync/utils.rs
Normal file
43
crates/matrix-sdk/src/sliding_sync/utils.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
//! Moaaar features for Sliding Sync.
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use tokio::task::{JoinError, JoinHandle};
|
||||
|
||||
/// Private type to ensure a task is aborted on drop.
|
||||
pub(crate) struct AbortOnDrop<T>(JoinHandle<T>);
|
||||
|
||||
impl<T> AbortOnDrop<T> {
|
||||
pub fn new(join_handle: JoinHandle<T>) -> Self {
|
||||
Self(join_handle)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for AbortOnDrop<T> {
|
||||
fn drop(&mut self) {
|
||||
self.0.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for AbortOnDrop<T> {
|
||||
type Output = Result<T, JoinError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.0).poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Private trait to create a `AbortOnDrop` from a `JoinHandle`.
|
||||
pub(crate) trait JoinHandleExt<T> {
|
||||
fn abort_on_drop(self) -> AbortOnDrop<T>;
|
||||
}
|
||||
|
||||
impl<T> JoinHandleExt<T> for JoinHandle<T> {
|
||||
fn abort_on_drop(self) -> AbortOnDrop<T> {
|
||||
AbortOnDrop::new(self)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user