From 2bbf6fc711d047a57749e95c5b4f8a07da55e8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 13 Jun 2025 12:53:49 +0200 Subject: [PATCH 1/8] feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order Historic room key bundles are uploaded as an encrypted file to the media repo and the key to decrypt the file is sent as a to-device message to the recipient device. In the nominal case, the invite and this to-device message should arrive at the same time and accepting the invite would download and import the bundle. If the to-device message arrives after the invite has already been accepted we would never download and import the bundle. To mitigate this problem, this patch introduces a task that listens for bundles that arrive. If the bundle is for a room that we have joined we will consider importing the bundle. --- crates/matrix-sdk/CHANGELOG.md | 8 +- .../src/authentication/matrix/mod.rs | 2 +- .../src/authentication/oauth/mod.rs | 4 +- .../src/authentication/oauth/qrcode/login.rs | 2 +- crates/matrix-sdk/src/client/mod.rs | 2 +- crates/matrix-sdk/src/encryption/mod.rs | 17 +++- crates/matrix-sdk/src/encryption/tasks.rs | 83 +++++++++++++++++-- 7 files changed, 101 insertions(+), 17 deletions(-) diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index f1ee44af7..e45a67934 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -6,9 +6,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate -### Breaking changes: +### Features -- `OAuth::login` now allows requesting additional scopes for the authorization code grant. +- Add support to accept historic room key bundles that arrive out of order, i.e. + the bundle arrives after the invite has already been accepted. + ([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322)) + +- [**breaking**] `OAuth::login` now allows requesting additional scopes for the authorization code grant. ([#5395](https://github.com/matrix-org/matrix-rust-sdk/pull/5395)) ## [0.13.0] - 2025-07-10 diff --git a/crates/matrix-sdk/src/authentication/matrix/mod.rs b/crates/matrix-sdk/src/authentication/matrix/mod.rs index ded0160e4..4ef5115f5 100644 --- a/crates/matrix-sdk/src/authentication/matrix/mod.rs +++ b/crates/matrix-sdk/src/authentication/matrix/mod.rs @@ -802,7 +802,7 @@ impl MatrixAuth { _ => None, }; - self.client.encryption().spawn_initialization_task(auth_data); + self.client.encryption().spawn_initialization_task(auth_data).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/mod.rs b/crates/matrix-sdk/src/authentication/oauth/mod.rs index 4d6b6fb22..dc9663e0c 100644 --- a/crates/matrix-sdk/src/authentication/oauth/mod.rs +++ b/crates/matrix-sdk/src/authentication/oauth/mod.rs @@ -817,7 +817,7 @@ impl OAuth { } #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; Ok(()) } @@ -1031,7 +1031,7 @@ impl OAuth { self.enable_cross_process_lock().await.map_err(OAuthError::from)?; #[cfg(feature = "e2e-encryption")] - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; } Ok(()) diff --git a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs index 0ec6910f7..b4263ea24 100644 --- a/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs +++ b/crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs @@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> { // ourselves see us as verified and the recovery/backup states will // be known. If we did receive all the secrets in the secrets // bundle, then backups will be enabled after this step as well. - self.client.encryption().spawn_initialization_task(None); + self.client.encryption().spawn_initialization_task(None).await; self.client.encryption().wait_for_e2ee_initialization_tasks().await; trace!("successfully logged in and enabled E2EE."); diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 82c12e7cf..cdf9a7917 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -414,7 +414,7 @@ impl ClientInner { let client = Arc::new(client); #[cfg(feature = "e2e-encryption")] - client.e2ee.initialize_room_key_tasks(&client); + client.e2ee.initialize_tasks(&client); let _ = client .event_cache diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 68632b83d..367311542 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -62,6 +62,7 @@ use ruma::{ #[cfg(feature = "experimental-send-custom-to-device")] use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices}; use serde::Deserialize; +use tasks::BundleReceiverTask; use tokio::sync::{Mutex, RwLockReadGuard}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; @@ -134,7 +135,7 @@ impl EncryptionData { } } - pub fn initialize_room_key_tasks(&self, client: &Arc) { + pub fn initialize_tasks(&self, client: &Arc) { let weak_client = WeakClient::from_inner(client); let mut tasks = self.tasks.lock(); @@ -1685,10 +1686,20 @@ impl Encryption { /// there is a proposal (MSC3967) to remove this requirement, which would /// allow for the initial upload of cross-signing keys without /// authentication, rendering this parameter obsolete. - pub(crate) fn spawn_initialization_task(&self, auth_data: Option) { + pub(crate) async fn spawn_initialization_task(&self, auth_data: Option) { + // It's fine to be async here as we're only getting the lock protecting the + // `OlmMachine`. Since the lock shouldn't be that contested right after logging + // in we won't delay the login or restoration of the Client. + let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite { + Some(BundleReceiverTask::new(&self.client).await) + } else { + None + }; + let mut tasks = self.client.inner.e2ee.tasks.lock(); let this = self.clone(); + tasks.setup_e2ee = Some(spawn(async move { // Update the current state first, so we don't have to wait for the result of // network requests @@ -1707,6 +1718,8 @@ impl Encryption { error!("Couldn't setup and resume recovery {e:?}"); } })); + + tasks.receive_historic_room_key_bundles = bundle_receiver_task; } /// Waits for end-to-end encryption initialization tasks to finish, if any diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index b12bdc5de..29230374d 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -14,23 +14,24 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState}; use matrix_sdk_common::failures_cache::FailuresCache; use ruma::{ events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}, serde::Raw, OwnedEventId, OwnedRoomId, }; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver}, - Mutex, -}; -use tracing::{debug, trace, warn}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{debug, info, instrument, trace, warn}; use crate::{ client::WeakClient, encryption::backups::UploadState, executor::{spawn, JoinHandle}, - Client, + room::shared_room_history, + Client, Room, }; /// A cache of room keys we already downloaded. @@ -41,6 +42,7 @@ pub(crate) struct ClientTasks { pub(crate) upload_room_keys: Option, pub(crate) download_room_keys: Option, pub(crate) update_recovery_state_after_backup: Option>, + pub(crate) receive_historic_room_key_bundles: Option, pub(crate) setup_e2ee: Option>, } @@ -72,7 +74,7 @@ impl BackupUploadingTask { let _ = self.sender.send(()); } - pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) { + pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) { while receiver.recv().await.is_some() { if let Some(client) = client.get() { let upload_progress = &client.inner.e2ee.backup_state.upload_progress; @@ -176,7 +178,10 @@ impl BackupDownloadTask { /// # Arguments /// /// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s. - async fn listen(client: WeakClient, mut receiver: UnboundedReceiver) { + async fn listen( + client: WeakClient, + mut receiver: mpsc::UnboundedReceiver, + ) { let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client))); while let Some(room_key_download_request) = receiver.recv().await { @@ -385,6 +390,68 @@ impl BackupDownloadTaskListenerState { } } +pub(crate) struct BundleReceiverTask { + _handle: JoinHandle<()>, +} + +impl BundleReceiverTask { + pub async fn new(client: &Client) -> Self { + let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine"); + let weak_client = WeakClient::from_client(client); + let handle = spawn(Self::listen_task(weak_client, stream)); + + Self { _handle: handle } + } + + async fn listen_task(client: WeakClient, stream: impl Stream) { + pin_mut!(stream); + + // TODO: Listening to this stream is not enough for iOS due to the NSE killing + // our OlmMachine and thus also this stream. We need to add an event handler + // that will listen for the bundle event. To be able to add an event handler, + // we'll have to implement the bundle event in Ruma. + while let Some(bundle_info) = stream.next().await { + let Some(client) = client.get() else { + // The client was dropped while we were waiting on the stream. Let's end the + // loop, since this means that the application has shut down. + break; + }; + + let Some(room) = client.get_room(&bundle_info.room_id) else { + warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room"); + continue; + }; + + Self::handle_bundle(&room, &bundle_info).await; + } + } + + #[instrument(skip(room), fields(room_id = %room.room_id()))] + async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { + if Self::should_accept_bundle(room, bundle_info) { + info!("Accepting a late key bundle."); + + if let Err(e) = + shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await + { + warn!("Couldn't accept a late room key bundle {e:?}"); + } + } else { + info!("Refusing to accept a historic room key bundle."); + } + } + + fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool { + // TODO: Check that the person that invited us to this room is the same as the + // sender, of the bundle. Otherwise don't ignore the bundle. + // TODO: Check that we joined the room "recently". (How do you do this if you + // accept the invite on another client? I guess we remember when the transition + // from Invited to Joined happened, but can't the server force us into a joined + // state if we do this? + room.state() == RoomState::Joined + } +} + #[cfg(all(test, not(target_family = "wasm")))] mod test { use matrix_sdk_test::async_test; From ce66ee4a168aa03e1171d4859dfeb8309115d651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 13 Jun 2025 12:53:49 +0200 Subject: [PATCH 2/8] test(sdk): Test the conditions under which we accept a historic room key bundle --- crates/matrix-sdk/src/encryption/tasks.rs | 46 +++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 29230374d..839e7fe76 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -454,13 +454,13 @@ impl BundleReceiverTask { #[cfg(all(test, not(target_family = "wasm")))] mod test { - use matrix_sdk_test::async_test; - use ruma::{event_id, room_id}; + use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder}; + use ruma::{event_id, room_id, user_id}; use serde_json::json; use wiremock::MockServer; use super::*; - use crate::test_utils::logged_in_client; + use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; // Test that, if backups are not enabled, we don't incorrectly mark a room key // as downloaded. @@ -518,4 +518,44 @@ mod test { ) } } + + /// Test that ensures that we only accept a bundle if a certain set of + /// conditions is met. + #[async_test] + async fn test_should_accept_bundle() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().logged_in_with_oauth().build().await; + + let user_id = user_id!("@alice:localhost"); + let joined_room_id = room_id!("!joined:localhost"); + let invited_rom_id = room_id!("!invited:localhost"); + + server + .mock_sync() + .ok_and_run(&client, |builder| { + builder + .add_joined_room(JoinedRoomBuilder::new(joined_room_id)) + .add_invited_room(InvitedRoomBuilder::new(invited_rom_id)); + }) + .await; + + let room = + client.get_room(joined_room_id).expect("We should have access to our joined room now"); + + let bundle_info = + RoomKeyBundleInfo { sender: user_id.to_owned(), room_id: joined_room_id.to_owned() }; + + assert!(BundleReceiverTask::should_accept_bundle(&room, &bundle_info)); + + let invited_room = + client.get_room(invited_rom_id).expect("We should have access to our invited room now"); + + assert!( + !BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info), + "We should not accept a bundle if we didn't join the room." + ); + + // TODO: Add more cases here once we figure out the correct acceptance + // rules. + } } From b3c1ca1577407d2af8edcc417c37edaf711b61ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 3/8] Use the invite details when deciding if we should accept a bundle --- crates/matrix-sdk/src/encryption/tasks.rs | 99 +++++++++++++++---- .../src/sync_builder/invited_room.rs | 3 +- 2 files changed, 80 insertions(+), 22 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 839e7fe76..660cabe23 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -16,7 +16,9 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; -use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState}; +use matrix_sdk_base::{ + crypto::store::types::RoomKeyBundleInfo, InviteAcceptanceDetails, RoomState, +}; use matrix_sdk_common::failures_cache::FailuresCache; use ruma::{ events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}, @@ -441,20 +443,40 @@ impl BundleReceiverTask { } } - fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool { - // TODO: Check that the person that invited us to this room is the same as the - // sender, of the bundle. Otherwise don't ignore the bundle. - // TODO: Check that we joined the room "recently". (How do you do this if you - // accept the invite on another client? I guess we remember when the transition - // from Invited to Joined happened, but can't the server force us into a joined - // state if we do this? - room.state() == RoomState::Joined + fn should_accept_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool { + // We accept historic room key bundles up to one day after we have accepted an + // invite. + const DAY: Duration = Duration::from_secs(24 * 60 * 60); + + // If we don't have any invite acceptance details, then this client wasn't the + // one that accepted the invite. + let Some(InviteAcceptanceDetails { invite_accepted_at, inviter }) = + room.invite_acceptance_details() + else { + return false; + }; + + let state = room.state(); + let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok()); + let bundle_sender = &bundle_info.sender; + + match (state, elapsed_since_join) { + (RoomState::Joined, Some(elapsed_since_join)) => { + elapsed_since_join < DAY && bundle_sender == &inviter + } + (RoomState::Joined, None) => false, + (RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => { + false + } + } } } #[cfg(all(test, not(target_family = "wasm")))] mod test { - use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder}; + use matrix_sdk_test::{ + async_test, event_factory::EventFactory, InvitedRoomBuilder, JoinedRoomBuilder, + }; use ruma::{event_id, room_id, user_id}; use serde_json::json; use wiremock::MockServer; @@ -524,28 +546,51 @@ mod test { #[async_test] async fn test_should_accept_bundle() { let server = MatrixMockServer::new().await; - let client = server.client_builder().logged_in_with_oauth().build().await; - let user_id = user_id!("@alice:localhost"); + let alice_user_id = user_id!("@alice:localhost"); + let bob_user_id = user_id!("@bob:localhost"); let joined_room_id = room_id!("!joined:localhost"); let invited_rom_id = room_id!("!invited:localhost"); + let client = server + .client_builder() + .logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into()) + .build() + .await; + + let event_factory = EventFactory::new().room(invited_rom_id); + let bob_member_event = event_factory.member(bob_user_id).into_raw_timeline(); + let alice_member_event = + event_factory.member(bob_user_id).invited(alice_user_id).into_raw_timeline(); + server .mock_sync() .ok_and_run(&client, |builder| { - builder - .add_joined_room(JoinedRoomBuilder::new(joined_room_id)) - .add_invited_room(InvitedRoomBuilder::new(invited_rom_id)); + builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room( + InvitedRoomBuilder::new(invited_rom_id) + .add_state_event(bob_member_event.cast()) + .add_state_event(alice_member_event.cast()), + ); }) .await; let room = client.get_room(joined_room_id).expect("We should have access to our joined room now"); - let bundle_info = - RoomKeyBundleInfo { sender: user_id.to_owned(), room_id: joined_room_id.to_owned() }; + assert!( + room.invite_acceptance_details().is_none(), + "We shouldn't have any invite acceptance details if we didn't join the room on this Client" + ); - assert!(BundleReceiverTask::should_accept_bundle(&room, &bundle_info)); + let bundle_info = RoomKeyBundleInfo { + sender: bob_user_id.to_owned(), + room_id: joined_room_id.to_owned(), + }; + + assert!( + !BundleReceiverTask::should_accept_bundle(&room, &bundle_info), + "We should not acceept a bundle if we did not join the room from this Client" + ); let invited_room = client.get_room(invited_rom_id).expect("We should have access to our invited room now"); @@ -555,7 +600,21 @@ mod test { "We should not accept a bundle if we didn't join the room." ); - // TODO: Add more cases here once we figure out the correct acceptance - // rules. + server.mock_room_join(invited_rom_id).ok().mock_once().mount().await; + + let room = client + .join_room_by_id(invited_rom_id) + .await + .expect("We should be able to join the invited room"); + + let details = room + .invite_acceptance_details() + .expect("We should have stored the invite acceptance details"); + assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us"); + + assert!( + BundleReceiverTask::should_accept_bundle(&room, &bundle_info), + "We should accept a bundle if we just joined the room and did so from this very Client object" + ); } } diff --git a/testing/matrix-sdk-test/src/sync_builder/invited_room.rs b/testing/matrix-sdk-test/src/sync_builder/invited_room.rs index 67475bf32..842a86be3 100644 --- a/testing/matrix-sdk-test/src/sync_builder/invited_room.rs +++ b/testing/matrix-sdk-test/src/sync_builder/invited_room.rs @@ -3,7 +3,6 @@ use ruma::{ events::AnyStrippedStateEvent, serde::Raw, }; -use super::StrippedStateTestEvent; use crate::DEFAULT_TEST_ROOM_ID; pub struct InvitedRoomBuilder { @@ -26,7 +25,7 @@ impl InvitedRoomBuilder { } /// Add an event to the state. - pub fn add_state_event(mut self, event: StrippedStateTestEvent) -> Self { + pub fn add_state_event(mut self, event: impl Into>) -> Self { self.inner.invite_state.events.push(event.into()); self } From a6868386d069f1021c972a4ab1efdb930d42c2ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 4/8] test(sdk): Allow the test client to enable shared history --- crates/matrix-sdk/src/test_utils/client.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/matrix-sdk/src/test_utils/client.rs b/crates/matrix-sdk/src/test_utils/client.rs index e8fffaced..3c6b3ac84 100644 --- a/crates/matrix-sdk/src/test_utils/client.rs +++ b/crates/matrix-sdk/src/test_utils/client.rs @@ -63,6 +63,23 @@ impl MockClientBuilder { self } + /// Enable the share history on invite feature for the Client. + #[cfg(feature = "e2e-encryption")] + pub fn enable_share_history_on_invite(mut self) -> Self { + self.builder = self.builder.with_enable_share_history_on_invite(true); + self + } + + /// Use the given encryption settings with the test client. + #[cfg(feature = "e2e-encryption")] + pub fn with_encryption_settings( + mut self, + settings: crate::encryption::EncryptionSettings, + ) -> Self { + self.builder = self.builder.with_encryption_settings(settings); + self + } + /// Set the cached server versions in the client. pub fn server_versions(mut self, versions: Vec) -> Self { self.server_versions = ServerVersions::Custom(versions); From 65aec7ee7f99ff26d3ce14da1e3106e8269f53a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 5/8] test(sdk): Add a method for two test clients to exchange E2EE identities --- .../src/test_utils/mocks/encryption.rs | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/matrix-sdk/src/test_utils/mocks/encryption.rs b/crates/matrix-sdk/src/test_utils/mocks/encryption.rs index da9dcfcbc..d3706badf 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/encryption.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/encryption.rs @@ -110,6 +110,29 @@ impl MatrixMockServer { known_otks.entry(user_id).or_default().entry(device_id).or_default().clear(); } + /// Ensure that the given clients are aware of each others public + /// identities. + pub async fn exchange_e2ee_identities(&self, alice: &Client, bob: &Client) { + let alice_user_id = alice.user_id().expect("Alice should have a user ID configured"); + let bob_user_id = bob.user_id().expect("Bob should have a user ID configured"); + + // Have Alice track Bob, so she queries his keys later. + alice.update_tracked_users_for_testing([bob_user_id]).await; + + // let bob be aware of Alice keys in order to be able to decrypt custom + // to-device (the device keys check are deferred for `m.room.key` so this is not + // needed for sending room messages for example). + bob.update_tracked_users_for_testing([alice_user_id]).await; + + // Have Alice and Bob upload their signed device keys. + self.mock_sync().ok_and_run(alice, |_x| {}).await; + self.mock_sync().ok_and_run(bob, |_x| {}).await; + + // Run a sync so we do send outgoing requests, including the /keys/query for + // getting bob's identity. + self.mock_sync().ok_and_run(alice, |_x| {}).await; + } + /// Utility to properly setup two clients. These two clients will know about /// each others (alice will have downloaded bob device keys). pub async fn set_up_alice_and_bob_for_encryption(&self) -> (Client, Client) { @@ -126,21 +149,7 @@ impl MatrixMockServer { let bob = self.client_builder_for_crypto_end_to_end(&bob_user_id, &bob_device_id).build().await; - // Have Alice track Bob, so she queries his keys later. - alice.update_tracked_users_for_testing([bob_user_id.as_ref()]).await; - - // let bob be aware of Alice keys in order to be able to decrypt custom - // to-device (the device keys check are deferred for `m.room.key` so this is not - // needed for sending room messages for example). - bob.update_tracked_users_for_testing([alice_user_id.as_ref()]).await; - - // Have Alice and Bob upload their signed device keys. - self.mock_sync().ok_and_run(&alice, |_x| {}).await; - self.mock_sync().ok_and_run(&bob, |_x| {}).await; - - // Run a sync so we do send outgoing requests, including the /keys/query for - // getting bob's identity. - self.mock_sync().ok_and_run(&alice, |_x| {}).await; + self.exchange_e2ee_identities(&alice, &bob).await; (alice, bob) } From e4849d5cabe3c76b77f690ca02dd859b5e4cf73d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 6/8] test(sdk): Don't expect a default access token in a bunch of methods The mocks can be configured to expect a default access token separately, this seems to have been a copy/paste error. --- crates/matrix-sdk/src/test_utils/mocks/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index b0a35ef64..9d0c7f7ba 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -382,7 +382,7 @@ impl MatrixMockServer { pub fn mock_room_send(&self) -> MockEndpoint<'_, RoomSendEndpoint> { let mock = Mock::given(method("PUT")) .and(path_regex(r"^/_matrix/client/v3/rooms/.*/send/.*".to_owned())); - self.mock_endpoint(mock, RoomSendEndpoint).expect_default_access_token() + self.mock_endpoint(mock, RoomSendEndpoint) } /// Creates a prebuilt mock for sending a state event in a room. @@ -561,7 +561,7 @@ impl MatrixMockServer { /// Create a prebuilt mock for uploading media. pub fn mock_upload(&self) -> MockEndpoint<'_, UploadEndpoint> { let mock = Mock::given(method("POST")).and(path("/_matrix/media/v3/upload")); - self.mock_endpoint(mock, UploadEndpoint).expect_default_access_token() + self.mock_endpoint(mock, UploadEndpoint) } /// Create a prebuilt mock for resolving room aliases. @@ -1197,7 +1197,7 @@ impl MatrixMockServer { &self, ) -> MockEndpoint<'_, AuthenticatedMediaConfigEndpoint> { let mock = Mock::given(method("GET")).and(path("/_matrix/client/v1/media/config")); - self.mock_endpoint(mock, AuthenticatedMediaConfigEndpoint).expect_default_access_token() + self.mock_endpoint(mock, AuthenticatedMediaConfigEndpoint) } /// Create a prebuilt mock for the endpoint used to log into a session. From d42d0f3e175afc6dca9b25be9df94704b41f86db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 7/8] test(sdk): Add a bunch more useful mock helpers --- crates/matrix-sdk/src/test_utils/mocks/mod.rs | 217 +++++++++++++++++- 1 file changed, 209 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index 9d0c7f7ba..80abc2996 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -35,17 +35,19 @@ use ruma::{ directory::PublicRoomsChunk, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::{ - receipt::ReceiptThread, room::member::RoomMemberEvent, AnyStateEvent, AnyTimelineEvent, - GlobalAccountDataEventType, MessageLikeEventType, RoomAccountDataEventType, StateEventType, + receipt::ReceiptThread, room::member::RoomMemberEvent, AnyStateEvent, AnySyncTimelineEvent, + AnyTimelineEvent, GlobalAccountDataEventType, MessageLikeEventType, + RoomAccountDataEventType, StateEventType, }, media::Method, serde::Raw, time::Duration, - DeviceId, EventId, MxcUri, OwnedDeviceId, OwnedEventId, OwnedOneTimeKeyId, OwnedRoomId, - OwnedUserId, RoomId, ServerName, UserId, + DeviceId, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedDeviceId, OwnedEventId, + OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, }; use serde::Deserialize; -use serde_json::{json, Value}; +use serde_json::{from_value, json, Value}; +use tokio::sync::oneshot::{self, Receiver}; use wiremock::{ matchers::{body_json, body_partial_json, header, method, path, path_regex, query_param}, Mock, MockBuilder, MockGuard, MockServer, Request, Respond, ResponseTemplate, Times, @@ -342,6 +344,36 @@ impl MatrixMockServer { .expect_default_access_token() } + /// Creates a prebuilt mock for joining a room. + /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async { + /// use matrix_sdk::{ruma::{room_id, event_id}, test_utils::mocks::MatrixMockServer}; + /// use serde_json::json; + /// + /// let mock_server = MatrixMockServer::new().await; + /// let client = mock_server.client_builder().build().await; + /// let room_id = room_id!("!test:localhost"); + /// + /// mock_server.mock_room_join(room_id).ok().mount(); + /// + /// let room = client.join_room_by_id(room_id).await?; + /// + /// assert_eq!( + /// room_id, + /// room.room_id(), + /// "The room ID we mocked should match the one we received when we joined the room" + /// ); + /// # anyhow::Ok(()) }); + /// ``` + pub fn mock_room_join(&self, room_id: &RoomId) -> MockEndpoint<'_, JoinRoomEndpoint> { + let mock = Mock::given(method("POST")) + .and(path_regex(format!("^/_matrix/client/v3/rooms/{room_id}/join"))); + self.mock_endpoint(mock, JoinRoomEndpoint { room_id: room_id.to_owned() }) + } + /// Creates a prebuilt mock for sending an event in a room. /// /// Note: works with *any* room. @@ -1832,6 +1864,102 @@ impl<'a> MockEndpoint<'a, RoomSendEndpoint> { pub fn ok(self, returned_event_id: impl Into) -> MatrixMock<'a> { self.ok_with_event_id(returned_event_id.into()) } + + /// Returns a send endpoint that emulates success, i.e. the event has been + /// sent with the given event id. + /// + /// The sent event is captured and can be accessed using the returned + /// [`Receiver`]. The [`Receiver`] is valid only for a send call. The given + /// `event_sender` are added to the event JSON. + /// + /// # Examples + /// + /// ```no_run + /// # tokio_test::block_on(async { + /// use matrix_sdk::{ + /// ruma::{ + /// event_id, events::room::message::RoomMessageEventContent, room_id, + /// }, + /// test_utils::mocks::MatrixMockServer, + /// }; + /// use matrix_sdk_test::JoinedRoomBuilder; + /// + /// let room_id = room_id!("!room_id:localhost"); + /// let event_id = event_id!("$some_id"); + /// + /// let server = MatrixMockServer::new().await; + /// let client = server.client_builder().build().await; + /// + /// let user_id = client.user_id().expect("We should have a user ID by now"); + /// + /// let (receiver, mock) = + /// server.mock_room_send().ok_with_capture(event_id, user_id); + /// + /// server + /// .mock_sync() + /// .ok_and_run(&client, |builder| { + /// builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + /// }) + /// .await; + /// + /// // Mock any additional endpoints that might be needed to send the message. + /// + /// let room = client + /// .get_room(room_id) + /// .expect("We should have access to our room now"); + /// + /// let event_id = room + /// .send(RoomMessageEventContent::text_plain("It's a secret to everybody")) + /// .await + /// .expect("We should be able to send an initial message") + /// .event_id; + /// + /// let event = receiver.await?; + /// # anyhow::Ok(()) }); + /// ``` + pub fn ok_with_capture( + self, + returned_event_id: impl Into, + event_sender: impl Into, + ) -> (Receiver>, MatrixMock<'a>) { + let event_id = returned_event_id.into(); + let event_sender = event_sender.into(); + + let (sender, receiver) = oneshot::channel(); + let sender = Arc::new(Mutex::new(Some(sender))); + + let ret = self.respond_with(move |request: &Request| { + if let Some(sender) = sender.lock().unwrap().take() { + let uri = &request.url; + let path_segments = uri.path_segments(); + let maybe_event_type = path_segments.and_then(|mut s| s.nth_back(1)); + let event_type = maybe_event_type + .as_ref() + .map(|&e| e.to_owned()) + .unwrap_or("m.room.message".to_owned()); + + let body: Value = + request.body_json().expect("The received body should be valid JSON"); + + let event = json!({ + "event_id": event_id.clone(), + "sender": event_sender, + "type": event_type, + "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), + "content": body, + }); + + let event: Raw = from_value(event) + .expect("We should be able to create a raw event from the content"); + + sender.send(event).expect("We should be able to send the event to the receiver"); + } + + ResponseTemplate::new(200).set_body_json(json!({ "event_id": event_id.clone() })) + }); + + (receiver, ret) + } } /// A prebuilt mock for sending a state event in a room. @@ -2444,8 +2572,58 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> { Self { mock: self.mock.and(header("content-type", content_type)), ..self } } - /// Returns a redact endpoint that emulates success, i.e. the redaction - /// event has been sent with the given event id. + /// Returns a upload endpoint that emulates success, i.e. the media has been + /// uploaded to the media server and can be accessed using the given + /// event has been sent with the given [`MxcUri`]. + /// + /// The uploaded content is captured and can be accessed using the returned + /// [`Receiver`]. The [`Receiver`] is valid only for a single media + /// upload. + /// + /// # Examples + /// + /// ```no_run + /// # tokio_test::block_on(async { + /// use matrix_sdk::{ + /// ruma::{event_id, mxc_uri, room_id}, + /// test_utils::mocks::MatrixMockServer, + /// }; + /// + /// let mxid = mxc_uri!("mxc://localhost/12345"); + /// + /// let server = MatrixMockServer::new().await; + /// let (receiver, upload_mock) = server.mock_upload().ok_with_capture(mxid); + /// let client = server.client_builder().build().await; + /// + /// client.media().upload(&mime::TEXT_PLAIN, vec![1, 2, 3, 4, 5], None).await?; + /// + /// let uploaded = receiver.await?; + /// + /// assert_eq!(uploaded, vec![1, 2, 3, 4, 5]); + /// # anyhow::Ok(()) }); + /// ``` + pub fn ok_with_capture(self, mxc_id: &MxcUri) -> (Receiver>, MatrixMock<'a>) { + let (sender, receiver) = oneshot::channel(); + let sender = Arc::new(Mutex::new(Some(sender))); + let response_body = json!({"content_uri": mxc_id}); + + let ret = self.respond_with(move |request: &Request| { + let maybe_sender = sender.lock().unwrap().take(); + + if let Some(sender) = maybe_sender { + let body = request.body.clone(); + let _ = sender.send(body); + } + + ResponseTemplate::new(200).set_body_json(response_body.clone()) + }); + + (receiver, ret) + } + + /// Returns a upload endpoint that emulates success, i.e. the media has been + /// uploaded to the media server and can be accessed using the given + /// event has been sent with the given [`MxcUri`]. pub fn ok(self, mxc_id: &MxcUri) -> MatrixMock<'a> { self.respond_with(ResponseTemplate::new(200).set_body_json(json!({ "content_uri": mxc_id @@ -3269,7 +3447,7 @@ impl<'a> MockEndpoint<'a, LoginEndpoint> { /// /// # Examples /// - /// ```rust + /// ``` /// use matrix_sdk::test_utils::mocks::{ /// LoginResponseTemplate200, MatrixMockServer, /// }; @@ -3483,6 +3661,13 @@ impl<'a> MockEndpoint<'a, MediaDownloadEndpoint> { self.respond_with(ResponseTemplate::new(200).set_body_string("Hello, World!")) } + /// Returns a successful response with the given bytes. + pub fn ok_bytes(self, bytes: Vec) -> MatrixMock<'a> { + self.respond_with( + ResponseTemplate::new(200).set_body_raw(bytes, "application/octet-stream"), + ) + } + /// Returns a successful response with a fake image content. pub fn ok_image(self) -> MatrixMock<'a> { self.respond_with( @@ -3531,3 +3716,19 @@ impl<'a> MockEndpoint<'a, AuthedMediaThumbnailEndpoint> { ) } } + +/// A prebuilt mock for `GET /client/v3/rooms/{room_id}/join` requests. +pub struct JoinRoomEndpoint { + room_id: OwnedRoomId, +} + +impl<'a> MockEndpoint<'a, JoinRoomEndpoint> { + /// Returns a successful response using the provided [`RoomId`]. + pub fn ok(self) -> MatrixMock<'a> { + let room_id = self.endpoint.room_id.to_owned(); + + self.respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "room_id": room_id, + }))) + } +} From f14994baa91e3cda3566c0e28815d0da06d88ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 7 Jul 2025 12:44:41 +0200 Subject: [PATCH 8/8] test(sdk): Test if we accept historic room key bundles arriving out of order --- .../tests/integration/encryption.rs | 1 + .../integration/encryption/shared_history.rs | 183 ++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 crates/matrix-sdk/tests/integration/encryption/shared_history.rs diff --git a/crates/matrix-sdk/tests/integration/encryption.rs b/crates/matrix-sdk/tests/integration/encryption.rs index 7460433b9..bf43db6ea 100644 --- a/crates/matrix-sdk/tests/integration/encryption.rs +++ b/crates/matrix-sdk/tests/integration/encryption.rs @@ -2,6 +2,7 @@ mod backups; mod cross_signing; mod recovery; mod secret_storage; +mod shared_history; mod to_device; mod verification; diff --git a/crates/matrix-sdk/tests/integration/encryption/shared_history.rs b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs new file mode 100644 index 000000000..dd79bb0ce --- /dev/null +++ b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs @@ -0,0 +1,183 @@ +use futures_util::{FutureExt, StreamExt}; +use matrix_sdk::{ + assert_decrypted_message_eq, assert_next_matches_with_timeout, + deserialized_responses::{TimelineEvent, UnableToDecryptInfo, UnableToDecryptReason}, + encryption::EncryptionSettings, + test_utils::mocks::MatrixMockServer, +}; +use matrix_sdk_test::{ + async_test, event_factory::EventFactory, InvitedRoomBuilder, JoinedRoomBuilder, StateTestEvent, +}; +use ruma::{ + device_id, event_id, events::room::message::RoomMessageEventContent, mxc_uri, room_id, user_id, +}; + +#[async_test] +async fn test_shared_history_out_of_order() { + let room_id = room_id!("!test:localhost"); + let mxid = mxc_uri!("mxc://localhost/12345"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + matrix_mock_server.mock_invite_user_by_id().ok().mock_once().mount().await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .enable_share_history_on_invite() + .with_encryption_settings(encryption_settings) + .build() + .await; + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .enable_share_history_on_invite() + .with_encryption_settings(encryption_settings) + .build() + .await; + + matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + let event_factory = EventFactory::new().room(room_id); + let alice_member_event = event_factory.member(alice_user_id).into_raw_timeline(); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_state_event(StateTestEvent::Create) + .add_state_event(StateTestEvent::Encryption), + ); + }) + .await; + + let room = + alice.get_room(room_id).expect("Alice should have access to the room now that we synced"); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + + mock.mock_once().mount().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone().cast()]) + .mock_once() + .mount() + .await; + + let event_id = room + .send(RoomMessageEventContent::text_plain("It's a secret to everybody")) + .await + .expect("We should be able to send an initial message") + .event_id; + + matrix_mock_server.mock_authenticated_media_config().ok_default().mock_once().mount().await; + + let (receiver, upload_mock) = matrix_mock_server.mock_upload().ok_with_capture(mxid); + upload_mock.mock_once().mount().await; + + let (_guard, bundle_info) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob"); + let bundle = receiver.await.expect("We should have received a bundle now."); + + let mut bundle_stream = bob + .encryption() + .historic_room_key_stream() + .await + .expect("We should be able to get the bundle stream"); + + let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id); + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_invited_room( + InvitedRoomBuilder::new(room_id) + .add_state_event(alice_member_event.cast()) + .add_state_event(bob_member_event.into_raw_timeline().cast()), + ); + }) + .await; + + let bob_room = bob.get_room(room_id).expect("Bob should have access to the invited room"); + + matrix_mock_server.mock_room_join(room_id).ok().mock_once().mount().await; + bob_room.join().await.expect("Bob should be able to join the room"); + + let details = bob_room + .invite_acceptance_details() + .expect("We should have stored invite acceptance details"); + + assert_eq!( + details.inviter, + alice.user_id().unwrap(), + "We should have recorded that Alice has invited us" + ); + + let bundle_info = bundle_info.await; + matrix_mock_server.mock_media_download().ok_bytes(bundle).mock_once().mount().await; + + let mut room_key_stream = bob + .encryption() + .room_keys_received_stream() + .await + .expect("We should be able to listen to received room keys"); + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + bundle_info + .deserialize_as() + .expect("We should be able to deserialize the bundle info"), + ); + }) + .await; + + let bundle_notification = bundle_stream + .next() + .now_or_never() + .flatten() + .expect("We should have been notiifed about the received bundle"); + + assert_eq!(bundle_notification.sender, alice_user_id); + assert_eq!(bundle_notification.room_id, room_id); + + assert_next_matches_with_timeout!(room_key_stream, 1000, Ok(room_key_infos) => assert_eq!(room_key_infos.len(), 1)); + + let event = event_receiver.await.expect("We should have received Alice's event"); + + matrix_mock_server + .mock_room_event() + .room(room_id) + .match_event_id() + .ok(TimelineEvent::from_utd( + event, + UnableToDecryptInfo { session_id: None, reason: UnableToDecryptReason::Unknown }, + )) + .mock_once() + .mount() + .await; + + let event = bob_room + .event(&event_id, None) + .await + .expect("Bob should be able to fetch the event Alice has sent"); + + assert_decrypted_message_eq!( + event, + "It's a secret to everybody", + "The decrypted event should match the message Alice has sent" + ); +}