diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index f1ee44af7..e45a67934 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -6,9 +6,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate -### Breaking changes: +### Features -- `OAuth::login` now allows requesting additional scopes for the authorization code grant. +- Add support to accept historic room key bundles that arrive out of order, i.e. + the bundle arrives after the invite has already been accepted. + ([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322)) + +- [**breaking**] `OAuth::login` now allows requesting additional scopes for the authorization code grant. ([#5395](https://github.com/matrix-org/matrix-rust-sdk/pull/5395)) ## [0.13.0] - 2025-07-10 diff --git a/crates/matrix-sdk/src/authentication/matrix/mod.rs b/crates/matrix-sdk/src/authentication/matrix/mod.rs index ded0160e4..4ef5115f5 100644 --- a/crates/matrix-sdk/src/authentication/matrix/mod.rs +++ b/crates/matrix-sdk/src/authentication/matrix/mod.rs @@ -802,7 +802,7 @@ impl MatrixAuth { _ => None, }; - self.client.encryption().spawn_initialization_task(auth_data); + self.client.encryption().spawn_initialization_task(auth_data).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/mod.rs b/crates/matrix-sdk/src/authentication/oauth/mod.rs index 4d6b6fb22..dc9663e0c 100644 --- a/crates/matrix-sdk/src/authentication/oauth/mod.rs +++ b/crates/matrix-sdk/src/authentication/oauth/mod.rs @@ -817,7 +817,7 @@ impl OAuth { } #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; Ok(()) } @@ -1031,7 +1031,7 @@ impl OAuth { self.enable_cross_process_lock().await.map_err(OAuthError::from)?; #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs index 0ec6910f7..b4263ea24 100644 --- a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs +++ b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs @@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> { // ourselves see us as verified and the recovery/backup states will // be known. If we did receive all the secrets in the secrets // bundle, then backups will be enabled after this step as well. - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; self.client.encryption().wait_for_e2ee_initialization_tasks().await; trace!("successfully logged in and enabled E2EE."); diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 82c12e7cf..cdf9a7917 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -414,7 +414,7 @@ impl ClientInner { let client = Arc::new(client); #[cfg(feature = "e2e-encryption")] - client.e2ee.initialize_room_key_tasks(&client); + client.e2ee.initialize_tasks(&client); let _ = client .event_cache diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 68632b83d..367311542 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -62,6 +62,7 @@ use ruma::{ #[cfg(feature = "experimental-send-custom-to-device")] use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices}; use serde::Deserialize; +use tasks::BundleReceiverTask; use tokio::sync::{Mutex, RwLockReadGuard}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; @@ -134,7 +135,7 @@ impl EncryptionData { } } - pub fn initialize_room_key_tasks(&self, client: &Arc) { + pub fn initialize_tasks(&self, client: &Arc) { let weak_client = WeakClient::from_inner(client); let mut tasks = self.tasks.lock(); @@ -1685,10 +1686,20 @@ impl Encryption { /// there is a proposal (MSC3967) to remove this requirement, which would /// allow for the initial upload of cross-signing keys without /// authentication, rendering this parameter obsolete. - pub(crate) fn spawn_initialization_task(&self, auth_data: Option) { + pub(crate) async fn spawn_initialization_task(&self, auth_data: Option) { + // It's fine to be async here as we're only getting the lock protecting the + // `OlmMachine`. Since the lock shouldn't be that contested right after logging + // in we won't delay the login or restoration of the Client. + let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite { + Some(BundleReceiverTask::new(&self.client).await) + } else { + None + }; + let mut tasks = self.client.inner.e2ee.tasks.lock(); let this = self.clone(); + tasks.setup_e2ee = Some(spawn(async move { // Update the current state first, so we don't have to wait for the result of // network requests @@ -1707,6 +1718,8 @@ impl Encryption { error!("Couldn't setup and resume recovery {e:?}"); } })); + + tasks.receive_historic_room_key_bundles = bundle_receiver_task; } /// Waits for end-to-end encryption initialization tasks to finish, if any diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index b12bdc5de..29230374d 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -14,23 +14,24 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState}; use matrix_sdk_common::failures_cache::FailuresCache; use ruma::{ events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}, serde::Raw, OwnedEventId, OwnedRoomId, }; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver}, - Mutex, -}; -use tracing::{debug, trace, warn}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{debug, info, instrument, trace, warn}; use crate::{ client::WeakClient, encryption::backups::UploadState, executor::{spawn, JoinHandle}, - Client, + room::shared_room_history, + Client, Room, }; /// A cache of room keys we already downloaded. @@ -41,6 +42,7 @@ pub(crate) struct ClientTasks { pub(crate) upload_room_keys: Option, pub(crate) download_room_keys: Option, pub(crate) update_recovery_state_after_backup: Option>, + pub(crate) receive_historic_room_key_bundles: Option, pub(crate) setup_e2ee: Option>, } @@ -72,7 +74,7 @@ impl BackupUploadingTask { let _ = self.sender.send(()); } - pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) { + pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) { while receiver.recv().await.is_some() { if let Some(client) = client.get() { let upload_progress = &client.inner.e2ee.backup_state.upload_progress; @@ -176,7 +178,10 @@ impl BackupDownloadTask { /// # Arguments /// /// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s. - async fn listen(client: WeakClient, mut receiver: UnboundedReceiver) { + async fn listen( + client: WeakClient, + mut receiver: mpsc::UnboundedReceiver, + ) { let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client))); while let Some(room_key_download_request) = receiver.recv().await { @@ -385,6 +390,68 @@ impl BackupDownloadTaskListenerState { } } +pub(crate) struct BundleReceiverTask { + _handle: JoinHandle<()>, +} + +impl BundleReceiverTask { + pub async fn new(client: &Client) -> Self { + let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine"); + let weak_client = WeakClient::from_client(client); + let handle = spawn(Self::listen_task(weak_client, stream)); + + Self { _handle: handle } + } + + async fn listen_task(client: WeakClient, stream: impl Stream) { + pin_mut!(stream); + + // TODO: Listening to this stream is not enough for iOS due to the NSE killing + // our OlmMachine and thus also this stream. We need to add an event handler + // that will listen for the bundle event. To be able to add an event handler, + // we'll have to implement the bundle event in Ruma. + while let Some(bundle_info) = stream.next().await { + let Some(client) = client.get() else { + // The client was dropped while we were waiting on the stream. Let's end the + // loop, since this means that the application has shut down. + break; + }; + + let Some(room) = client.get_room(&bundle_info.room_id) else { + warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room"); + continue; + }; + + Self::handle_bundle(&room, &bundle_info).await; + } + } + + #[instrument(skip(room), fields(room_id = %room.room_id()))] + async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { + if Self::should_accept_bundle(room, bundle_info) { + info!("Accepting a late key bundle."); + + if let Err(e) = + shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await + { + warn!("Couldn't accept a late room key bundle {e:?}"); + } + } else { + info!("Refusing to accept a historic room key bundle."); + } + } + + fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool { + // TODO: Check that the person that invited us to this room is the same as the + // sender, of the bundle. Otherwise don't ignore the bundle. + // TODO: Check that we joined the room "recently". (How do you do this if you + // accept the invite on another client? I guess we remember when the transition + // from Invited to Joined happened, but can't the server force us into a joined + // state if we do this? + room.state() == RoomState::Joined + } +} + #[cfg(all(test, not(target_family = "wasm")))] mod test { use matrix_sdk_test::async_test;