From ac6ccd3384dd106d99a5de40a8aea8034339b969 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Wed, 25 Feb 2026 15:21:24 +0000 Subject: [PATCH 1/9] refactor(sdk): Move `should_accept_bundle` into `shared_room_history` Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/tasks.rs | 130 +--------------- .../src/room/shared_room_history.rs | 147 +++++++++++++++++- 2 files changed, 150 insertions(+), 127 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 6aa39203e..1977c8f07 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -16,14 +16,11 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; +use matrix_sdk_base::crypto::store::types::RoomKeyBundleInfo; #[cfg(feature = "experimental-encrypted-state-events")] use matrix_sdk_base::crypto::types::events::room::encrypted::{ EncryptedEvent, RoomEventEncryptionScheme, }; -use matrix_sdk_base::{ - RoomState, - crypto::store::types::{RoomKeyBundleInfo, RoomPendingKeyBundleDetails}, -}; use matrix_sdk_common::failures_cache::FailuresCache; #[cfg(not(feature = "experimental-encrypted-state-events"))] use ruma::events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent}; @@ -488,7 +485,7 @@ impl BundleReceiverTask { /// thread will process it. #[instrument(skip(room), fields(room_id = %room.room_id()))] async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { - if Self::should_accept_bundle(room, bundle_info).await { + if shared_room_history::should_accept_key_bundle(room, bundle_info).await { info!("Accepting a late key bundle."); if let Err(e) = @@ -500,51 +497,19 @@ impl BundleReceiverTask { info!("Refusing to accept a historic room key bundle."); } } - - async fn should_accept_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool { - // We accept historic room key bundles up to one day after we have accepted an - // invite. - const DAY: Duration = Duration::from_secs(24 * 60 * 60); - - // If we don't have any invite acceptance details, then this client wasn't the - // one that accepted the invite. - let Ok(Some(RoomPendingKeyBundleDetails { invite_accepted_at, inviter, .. })) = - room.client.base_client().get_pending_key_bundle_details_for_room(room.room_id()).await - else { - debug!("Not accepting key bundle as there are no recorded invite acceptance details"); - return false; - }; - - let state = room.state(); - let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok()); - let bundle_sender = &bundle_info.sender; - - match (state, elapsed_since_join) { - (RoomState::Joined, Some(elapsed_since_join)) => { - elapsed_since_join < DAY && bundle_sender == &inviter - } - (RoomState::Joined, None) => false, - (RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => { - false - } - } - } } #[cfg(all(test, not(target_family = "wasm")))] mod test { - use matrix_sdk_test::{ - InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory, - }; + use matrix_sdk_test::async_test; #[cfg(not(feature = "experimental-encrypted-state-events"))] use ruma::events::room::encrypted::OriginalSyncRoomEncryptedEvent; - use ruma::{event_id, room_id, user_id}; + use ruma::{event_id, room_id}; use serde_json::json; - use vodozemac::Curve25519PublicKey; use wiremock::MockServer; use super::*; - use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; + use crate::test_utils::logged_in_client; // Test that, if backups are not enabled, we don't incorrectly mark a room key // as downloaded. @@ -606,89 +571,4 @@ mod test { ) } } - - /// Test that ensures that we only accept a bundle if a certain set of - /// conditions is met. - #[async_test] - async fn test_should_accept_bundle() { - let server = MatrixMockServer::new().await; - - let alice_user_id = user_id!("@alice:localhost"); - let bob_user_id = user_id!("@bob:localhost"); - let joined_room_id = room_id!("!joined:localhost"); - let invited_rom_id = room_id!("!invited:localhost"); - - let client = server - .client_builder() - .logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into()) - .build() - .await; - - let event_factory = EventFactory::new().room(invited_rom_id); - let bob_member_event = event_factory.member(bob_user_id); - let alice_member_event = event_factory.member(bob_user_id).invited(alice_user_id); - - server - .mock_sync() - .ok_and_run(&client, |builder| { - builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room( - InvitedRoomBuilder::new(invited_rom_id) - .add_state_event(bob_member_event) - .add_state_event(alice_member_event), - ); - }) - .await; - - let room = - client.get_room(joined_room_id).expect("We should have access to our joined room now"); - - assert!( - client - .base_client() - .get_pending_key_bundle_details_for_room(room.room_id()) - .await - .unwrap() - .is_none(), - "We shouldn't have any invite acceptance details if we didn't join the room on this Client" - ); - - let bundle_info = RoomKeyBundleInfo { - sender: bob_user_id.to_owned(), - sender_key: Curve25519PublicKey::from_bytes([0u8; 32]), - room_id: joined_room_id.to_owned(), - }; - - assert!( - !BundleReceiverTask::should_accept_bundle(&room, &bundle_info).await, - "We should not accept a bundle if we did not join the room from this Client" - ); - - let invited_room = - client.get_room(invited_rom_id).expect("We should have access to our invited room now"); - - assert!( - !BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info).await, - "We should not accept a bundle if we didn't join the room." - ); - - server.mock_room_join(invited_rom_id).ok().mock_once().mount().await; - - let room = client - .join_room_by_id(invited_rom_id) - .await - .expect("We should be able to join the invited room"); - - let details = client - .base_client() - .get_pending_key_bundle_details_for_room(room.room_id()) - .await - .unwrap() - .expect("We should have stored the invite acceptance details"); - assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us"); - - assert!( - BundleReceiverTask::should_accept_bundle(&room, &bundle_info).await, - "We should accept a bundle if we just joined the room and did so from this very Client object" - ); - } } diff --git a/crates/matrix-sdk/src/room/shared_room_history.rs b/crates/matrix-sdk/src/room/shared_room_history.rs index 509998aa4..0fe42ec62 100644 --- a/crates/matrix-sdk/src/room/shared_room_history.rs +++ b/crates/matrix-sdk/src/room/shared_room_history.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashSet, iter}; +use std::{collections::HashSet, iter, time::Duration}; use matrix_sdk_base::{ - crypto::{store::types::Changes, types::events::room_key_bundle::RoomKeyBundleContent}, + RoomState, + crypto::{ + store::types::{Changes, RoomKeyBundleInfo, RoomPendingKeyBundleDetails}, + types::events::room_key_bundle::RoomKeyBundleContent, + }, media::{MediaFormat, MediaRequestParameters}, }; use ruma::{OwnedUserId, UserId, events::room::MediaSource}; @@ -111,6 +115,48 @@ pub(super) async fn share_room_history(room: &Room, user_id: OwnedUserId) -> Res Ok(()) } +/// Determines whether a room key bundle should be accepted for a given room. +/// +/// This function checks if the client has recorded invite acceptance details +/// for the room and ensures that the bundle sender matches the inviter. +/// Additionally, it verifies that the room is in a joined state and that the +/// bundle is received within one day of the invite being accepted. +/// +/// # Arguments +/// +/// * `room` - The room for which the key bundle acceptance is being evaluated. +/// * `bundle_info` - Information about the room key bundle being evaluated. +/// +/// # Returns +/// +/// Returns `true` if the key bundle should be accepted, otherwise `false`. +pub(crate) async fn should_accept_key_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool { + // We accept historic room key bundles up to one day after we have accepted an + // invite. + const DAY: Duration = Duration::from_secs(24 * 60 * 60); + + // If we don't have any invite acceptance details, then this client wasn't the + // one that accepted the invite. + let Ok(Some(RoomPendingKeyBundleDetails { invite_accepted_at, inviter, .. })) = + room.client.base_client().get_pending_key_bundle_details_for_room(room.room_id()).await + else { + debug!("Not accepting key bundle as there are no recorded invite acceptance details"); + return false; + }; + + let state = room.state(); + let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok()); + let bundle_sender = &bundle_info.sender; + + match (state, elapsed_since_join) { + (RoomState::Joined, Some(elapsed_since_join)) => { + elapsed_since_join < DAY && bundle_sender == &inviter + } + (RoomState::Joined, None) => false, + (RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => false, + } +} + /// Having accepted an invite for the given room from the given user, attempt to /// find a information about a room key bundle and, if found, download the /// bundle and import the room keys, as per [MSC4268]. @@ -194,3 +240,100 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re Ok(()) } + +#[cfg(all(test, not(target_family = "wasm")))] +mod test { + use matrix_sdk_base::crypto::store::types::RoomKeyBundleInfo; + use matrix_sdk_test::{ + InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory, + }; + use ruma::{room_id, user_id}; + use vodozemac::Curve25519PublicKey; + + use crate::{room::shared_room_history, test_utils::mocks::MatrixMockServer}; + + /// Test that ensures that we only accept a bundle if a certain set of + /// conditions is met. + #[async_test] + async fn test_should_accept_bundle() { + let server = MatrixMockServer::new().await; + + let alice_user_id = user_id!("@alice:localhost"); + let bob_user_id = user_id!("@bob:localhost"); + let joined_room_id = room_id!("!joined:localhost"); + let invited_rom_id = room_id!("!invited:localhost"); + + let client = server + .client_builder() + .logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into()) + .build() + .await; + + let event_factory = EventFactory::new().room(invited_rom_id); + let bob_member_event = event_factory.member(bob_user_id); + let alice_member_event = event_factory.member(bob_user_id).invited(alice_user_id); + + server + .mock_sync() + .ok_and_run(&client, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room( + InvitedRoomBuilder::new(invited_rom_id) + .add_state_event(bob_member_event) + .add_state_event(alice_member_event), + ); + }) + .await; + + let room = + client.get_room(joined_room_id).expect("We should have access to our joined room now"); + + assert!( + client + .base_client() + .get_pending_key_bundle_details_for_room(room.room_id()) + .await + .unwrap() + .is_none(), + "We shouldn't have any invite acceptance details if we didn't join the room on this Client" + ); + + let bundle_info = RoomKeyBundleInfo { + sender: bob_user_id.to_owned(), + sender_key: Curve25519PublicKey::from_bytes([0u8; 32]), + room_id: joined_room_id.to_owned(), + }; + + assert!( + !shared_room_history::should_accept_key_bundle(&room, &bundle_info).await, + "We should not accept a bundle if we did not join the room from this Client" + ); + + let invited_room = + client.get_room(invited_rom_id).expect("We should have access to our invited room now"); + + assert!( + !shared_room_history::should_accept_key_bundle(&invited_room, &bundle_info).await, + "We should not accept a bundle if we didn't join the room." + ); + + server.mock_room_join(invited_rom_id).ok().mock_once().mount().await; + + let room = client + .join_room_by_id(invited_rom_id) + .await + .expect("We should be able to join the invited room"); + + let details = client + .base_client() + .get_pending_key_bundle_details_for_room(room.room_id()) + .await + .unwrap() + .expect("We should have stored the invite acceptance details"); + assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us"); + + assert!( + shared_room_history::should_accept_key_bundle(&room, &bundle_info).await, + "We should accept a bundle if we just joined the room and did so from this very Client object" + ); + } +} From 7fbc3e78e9cbb0b0724578f6bae0600b11c36aa6 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Wed, 25 Feb 2026 15:48:10 +0000 Subject: [PATCH 2/9] feat(sdk): Try import stored room key bundles on startup Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/tasks.rs | 36 ++++++++++++++++++- .../src/room/shared_room_history.rs | 4 +++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 1977c8f07..82f94845d 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -16,7 +16,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; -use matrix_sdk_base::crypto::store::types::RoomKeyBundleInfo; +use matrix_sdk_base::crypto::store::types::{RoomKeyBundleInfo, RoomPendingKeyBundleDetails}; #[cfg(feature = "experimental-encrypted-state-events")] use matrix_sdk_base::crypto::types::events::room::encrypted::{ EncryptedEvent, RoomEventEncryptionScheme, @@ -446,6 +446,10 @@ impl BundleReceiverTask { async fn listen_task(client: WeakClient, stream: impl Stream) { pin_mut!(stream); + // Before we listen for new bundles, we check if we have existing ones + // unimported. + Self::try_import_stored_bundles(&client).await; + // TODO: Listening to this stream is not enough for iOS due to the NSE killing // our OlmMachine and thus also this stream. We need to add an event handler // that will listen for the bundle event. To be able to add an event handler, @@ -466,6 +470,36 @@ impl BundleReceiverTask { } } + /// Retrieves a list of all rooms pending key bundles, then cross-references + /// this with bundles held in the crypto store. If all conditions outlined + /// in [`shared_room_history::maybe_accept_key_bundle`], then the bundle + /// will be imported. + async fn try_import_stored_bundles(client: &WeakClient) { + let Some(client) = client.get() else { + return; + }; + + let olm_machine = client.olm_machine().await; + let Some(olm_machine) = olm_machine.as_ref() else { + return; + }; + let Ok(room_details) = olm_machine.store().get_all_rooms_pending_key_bundles().await else { + return; + }; + + for RoomPendingKeyBundleDetails { room_id, inviter, .. } in &room_details { + let Some(room) = client.get_room(room_id) else { + continue; + }; + let Ok(Some(bundle)) = + olm_machine.store().get_received_room_key_bundle_data(room_id, inviter).await + else { + continue; + }; + Self::handle_bundle(&room, &(&bundle).into()).await; + } + } + /// We have received a key bundle for a given room: check if we recently /// accepted an invite from the sender of the bundle, and if so, join /// the room. diff --git a/crates/matrix-sdk/src/room/shared_room_history.rs b/crates/matrix-sdk/src/room/shared_room_history.rs index 0fe42ec62..3dcbdfa9e 100644 --- a/crates/matrix-sdk/src/room/shared_room_history.rs +++ b/crates/matrix-sdk/src/room/shared_room_history.rs @@ -238,6 +238,10 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re // olm_machine.store().clear_received_room_key_bundle_data(room.room_id(), // user_id).await?; + // If we have reached this point, the bundle was successfully imported, so + // we can clear its pending state. + olm_machine.store().clear_room_pending_key_bundle(room.room_id()).await?; + Ok(()) } From 2dd43fc9de321a93ac84daa1c0b737965078f996 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Thu, 5 Mar 2026 11:45:04 +0000 Subject: [PATCH 3/9] feat(sdk): Add proper error logging and comments Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/tasks.rs | 52 +++++++++++++++---- .../src/room/shared_room_history.rs | 5 +- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 82f94845d..372f7f2a3 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -474,28 +474,62 @@ impl BundleReceiverTask { /// this with bundles held in the crypto store. If all conditions outlined /// in [`shared_room_history::maybe_accept_key_bundle`], then the bundle /// will be imported. + #[tracing::instrument(skip_all)] async fn try_import_stored_bundles(client: &WeakClient) { + tracing::debug!("Checking for unimported stored room key bundles..."); + let Some(client) = client.get() else { + // The client was dropped before the worker future was first polled. return; }; let olm_machine = client.olm_machine().await; let Some(olm_machine) = olm_machine.as_ref() else { - return; - }; - let Ok(room_details) = olm_machine.store().get_all_rooms_pending_key_bundles().await else { + // The Olm machine was not initialized by the time this task is ready + // to perform its work. This is likely a bug, as this worker is only + // expected to be spawned once the client is fully ready and the + // Olm machine is available. + tracing::warn!("Skipping startup bundle checks because the Olm machine is unavailable"); return; }; + let room_details = match olm_machine.store().get_all_rooms_pending_key_bundles().await { + Ok(room_details) => room_details, + Err(e) => { + tracing::warn!("Error while fetching rooms pending key bundles: {e:?}"); + return; + } + }; + + tracing::debug!("Found {} rooms that are still pending key bundles", room_details.len()); + + // Iterate over the details that are valid for processing. For each valid + // details, check if we have the corresponding key bundle data in the + // store. If the data exists, attempt to re-import the bundle. for RoomPendingKeyBundleDetails { room_id, inviter, .. } in &room_details { let Some(room) = client.get_room(room_id) else { + // Skip processing if the room is not cached in the state store. + tracing::trace!(?room_id, "Room not available in state store, skipping..."); continue; }; - let Ok(Some(bundle)) = - olm_machine.store().get_received_room_key_bundle_data(room_id, inviter).await - else { - continue; - }; + let bundle = + match olm_machine.store().get_received_room_key_bundle_data(room_id, inviter).await + { + Ok(Some(bundle)) => bundle, + Ok(None) => { + // If the bundle data is not available, skip processing. The listener task + // will handle this case when the bundle arrives. + tracing::trace!(?room_id, "No bundle available, skipping..."); + continue; + } + Err(err) => { + tracing::warn!( + ?room_id, + "Failed to fetch received room key bundle data: {err:?}" + ); + continue; + } + }; Self::handle_bundle(&room, &(&bundle).into()).await; } } @@ -520,7 +554,7 @@ impl BundleReceiverTask { #[instrument(skip(room), fields(room_id = %room.room_id()))] async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) { if shared_room_history::should_accept_key_bundle(room, bundle_info).await { - info!("Accepting a late key bundle."); + info!(room_id = %room.room_id(), "Accepting a late key bundle."); if let Err(e) = shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await diff --git a/crates/matrix-sdk/src/room/shared_room_history.rs b/crates/matrix-sdk/src/room/shared_room_history.rs index 3dcbdfa9e..03edefdac 100644 --- a/crates/matrix-sdk/src/room/shared_room_history.rs +++ b/crates/matrix-sdk/src/room/shared_room_history.rs @@ -238,8 +238,9 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re // olm_machine.store().clear_received_room_key_bundle_data(room.room_id(), // user_id).await?; - // If we have reached this point, the bundle was successfully imported, so - // we can clear its pending state. + // If we have reached this point, the bundle was either successfully imported, + // or was malformed and failed to deserialise. In either case, we can clear + // the room pending state. olm_machine.store().clear_room_pending_key_bundle(room.room_id()).await?; Ok(()) From 29e3b7766dd82c8f20b51c9c6f46a744c470bd05 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Thu, 5 Mar 2026 12:09:51 +0000 Subject: [PATCH 4/9] feat(sdk): Separate startup bundle import to different task Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/tasks.rs | 16 +++--- .../src/room/shared_room_history.rs | 52 ++++++++++++++----- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index 372f7f2a3..b408cd59d 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -431,25 +431,23 @@ impl BackupDownloadTaskListenerState { } pub(crate) struct BundleReceiverTask { - _handle: JoinHandle<()>, + _startup_handle: JoinHandle<()>, + _listen_handle: JoinHandle<()>, } impl BundleReceiverTask { pub async fn new(client: &Client) -> Self { let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine"); let weak_client = WeakClient::from_client(client); - let handle = spawn(Self::listen_task(weak_client, stream)); - - Self { _handle: handle } + Self { + _listen_handle: spawn(Self::listen_task(weak_client.clone(), stream)), + _startup_handle: spawn(Self::startup_task(weak_client)), + } } async fn listen_task(client: WeakClient, stream: impl Stream) { pin_mut!(stream); - // Before we listen for new bundles, we check if we have existing ones - // unimported. - Self::try_import_stored_bundles(&client).await; - // TODO: Listening to this stream is not enough for iOS due to the NSE killing // our OlmMachine and thus also this stream. We need to add an event handler // that will listen for the bundle event. To be able to add an event handler, @@ -475,7 +473,7 @@ impl BundleReceiverTask { /// in [`shared_room_history::maybe_accept_key_bundle`], then the bundle /// will be imported. #[tracing::instrument(skip_all)] - async fn try_import_stored_bundles(client: &WeakClient) { + async fn startup_task(client: WeakClient) { tracing::debug!("Checking for unimported stored room key bundles..."); let Some(client) = client.get() else { diff --git a/crates/matrix-sdk/src/room/shared_room_history.rs b/crates/matrix-sdk/src/room/shared_room_history.rs index 03edefdac..eb8b768c4 100644 --- a/crates/matrix-sdk/src/room/shared_room_history.rs +++ b/crates/matrix-sdk/src/room/shared_room_history.rs @@ -131,32 +131,60 @@ pub(super) async fn share_room_history(room: &Room, user_id: OwnedUserId) -> Res /// /// Returns `true` if the key bundle should be accepted, otherwise `false`. pub(crate) async fn should_accept_key_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool { - // We accept historic room key bundles up to one day after we have accepted an - // invite. - const DAY: Duration = Duration::from_secs(24 * 60 * 60); - // If we don't have any invite acceptance details, then this client wasn't the // one that accepted the invite. - let Ok(Some(RoomPendingKeyBundleDetails { invite_accepted_at, inviter, .. })) = + let Ok(Some(details)) = room.client.base_client().get_pending_key_bundle_details_for_room(room.room_id()).await else { debug!("Not accepting key bundle as there are no recorded invite acceptance details"); return false; }; + if !should_process_room_pending_key_bundle_details(&details) { + return false; + } + let state = room.state(); - let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok()); let bundle_sender = &bundle_info.sender; - match (state, elapsed_since_join) { - (RoomState::Joined, Some(elapsed_since_join)) => { - elapsed_since_join < DAY && bundle_sender == &inviter - } - (RoomState::Joined, None) => false, - (RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => false, + match state { + RoomState::Joined => bundle_sender == &details.inviter, + RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned => false, } } +/// Determines whether the pending key bundle details for a room should be +/// processed. +/// +/// This function checks if the invite acceptance timestamp is within the +/// allowed time window (one day). If the elapsed time since the invite was +/// accepted exceeds this window, the pending key bundle details will not be +/// processed. +/// +/// # Arguments +/// +/// * `details` - The details of the pending key bundle, including the invite +/// acceptance timestamp. +/// +/// # Returns +/// +/// Returns `true` if the pending key bundle details should be processed, +/// otherwise `false`. +pub(crate) fn should_process_room_pending_key_bundle_details( + details: &RoomPendingKeyBundleDetails, +) -> bool { + // We accept historic room key bundles up to one day after we have accepted an + // invite. + const DAY: Duration = Duration::from_secs(24 * 60 * 60); + + details + .invite_accepted_at + .to_system_time() + .and_then(|t| t.elapsed().ok()) + .map(|elapsed_since_join| elapsed_since_join < DAY) + .unwrap_or(false) +} + /// Having accepted an invite for the given room from the given user, attempt to /// find a information about a room key bundle and, if found, download the /// bundle and import the room keys, as per [MSC4268]. From 29ca03bb81cb8d85cc70d70f31229b4b88f75570 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Thu, 5 Mar 2026 12:10:37 +0000 Subject: [PATCH 5/9] feat(sdk): Clear room pending key bundle for expired details Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/tasks.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index b408cd59d..ee63ee4d5 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -499,12 +499,22 @@ impl BundleReceiverTask { } }; - tracing::debug!("Found {} rooms that are still pending key bundles", room_details.len()); + // Partition the room details into two categories: those that should be + // processed and those that should be removed. + let (valid, invalid): (Vec<_>, Vec<_>) = room_details.iter().partition(|details| { + shared_room_history::should_process_room_pending_key_bundle_details(details) + }); + + tracing::debug!( + "Found {} valid and {} invalid rooms that are still pending key bundles", + valid.len(), + invalid.len(), + ); // Iterate over the details that are valid for processing. For each valid // details, check if we have the corresponding key bundle data in the // store. If the data exists, attempt to re-import the bundle. - for RoomPendingKeyBundleDetails { room_id, inviter, .. } in &room_details { + for RoomPendingKeyBundleDetails { room_id, inviter, .. } in valid { let Some(room) = client.get_room(room_id) else { // Skip processing if the room is not cached in the state store. tracing::trace!(?room_id, "Room not available in state store, skipping..."); @@ -530,6 +540,15 @@ impl BundleReceiverTask { }; Self::handle_bundle(&room, &(&bundle).into()).await; } + + // For each invalid details, clear the pending key bundle information from the + // respective room to avoid re-checking it in the future. + for RoomPendingKeyBundleDetails { room_id, .. } in &invalid { + tracing::trace!(?room_id, "Clearing pending flag for room"); + if let Err(e) = olm_machine.store().clear_room_pending_key_bundle(room_id).await { + tracing::warn!("Error clearing room pending key bundle: {e:?}"); + } + } } /// We have received a key bundle for a given room: check if we recently From 471e045ea36c730c6b5212971a1ab0bb5994d578 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Thu, 5 Mar 2026 10:52:09 +0000 Subject: [PATCH 6/9] feat(sdk): Remove pending key bundle on HTTP failure Signed-off-by: Skye Elliot --- .../src/room/shared_room_history.rs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/room/shared_room_history.rs b/crates/matrix-sdk/src/room/shared_room_history.rs index eb8b768c4..fd6b008c8 100644 --- a/crates/matrix-sdk/src/room/shared_room_history.rs +++ b/crates/matrix-sdk/src/room/shared_room_history.rs @@ -22,7 +22,7 @@ use matrix_sdk_base::{ }, media::{MediaFormat, MediaRequestParameters}, }; -use ruma::{OwnedUserId, UserId, events::room::MediaSource}; +use ruma::{OwnedUserId, UserId, api::client::error::ErrorKind, events::room::MediaSource}; use tracing::{debug, info, instrument, warn}; use crate::{Error, Result, Room}; @@ -233,7 +233,7 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re room.client.keys_query(&req_id, request.device_keys).await?; } - let bundle_content = client + let bundle_content = match client .media() .get_media_content( &MediaRequestParameters { @@ -242,7 +242,31 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re }, false, ) - .await?; + .await + { + Ok(bundle_content) => bundle_content, + Err(err) => { + // If we encountered an HTTP client error, we should check the status code to + // see if we have been sent a bogus link. + let Some(err) = err + .as_ruma_api_error() + .and_then(|e| e.as_client_api_error()) + .and_then(|e| e.error_kind()) + else { + // Some other error occurred, which we may be able to recover from at the next + // client startup. + return Ok(()); + }; + + if ErrorKind::NotFound == *err { + // Clear the pending flag since checking these details again at startup are + // guaranteed to fail. + olm_machine.store().clear_room_pending_key_bundle(room.room_id()).await?; + } + + return Ok(()); + } + }; match serde_json::from_slice(&bundle_content) { Ok(bundle) => { From c62da4a026cb61792aba5e00dba0bf7c5a60a14a Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Wed, 25 Feb 2026 17:53:24 +0000 Subject: [PATCH 7/9] tests(sdk): Add integration test for bundle import crash recovery Signed-off-by: Skye Elliot --- crates/matrix-sdk/src/encryption/mod.rs | 8 + crates/matrix-sdk/src/encryption/tasks.rs | 6 + .../integration/encryption/shared_history.rs | 253 ++++++++++++++++++ 3 files changed, 267 insertions(+) diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index f8600c902..f4c50e6aa 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -785,6 +785,14 @@ impl Client { pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option> { self.olm_machine().await } + + /// Aborts the client's bundle receiver task, for testing purposes only. + pub fn abort_bundle_receiver_task(&self) { + let tasks = self.inner.e2ee.tasks.lock(); + if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() { + task.abort() + } + } } /// A high-level API to manage the client's encryption. diff --git a/crates/matrix-sdk/src/encryption/tasks.rs b/crates/matrix-sdk/src/encryption/tasks.rs index ee63ee4d5..2eb774c63 100644 --- a/crates/matrix-sdk/src/encryption/tasks.rs +++ b/crates/matrix-sdk/src/encryption/tasks.rs @@ -582,6 +582,12 @@ impl BundleReceiverTask { info!("Refusing to accept a historic room key bundle."); } } + + #[cfg(any(feature = "testing", test))] + pub(crate) fn abort(&self) { + self._startup_handle.abort(); + self._listen_handle.abort(); + } } #[cfg(all(test, not(target_family = "wasm")))] diff --git a/crates/matrix-sdk/tests/integration/encryption/shared_history.rs b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs index abde7f362..2c714b125 100644 --- a/crates/matrix-sdk/tests/integration/encryption/shared_history.rs +++ b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs @@ -12,6 +12,7 @@ use ruma::{ RoomVersionId, device_id, event_id, events::room::message::RoomMessageEventContent, mxc_uri, room_id, user_id, }; +use tempfile::tempdir; #[async_test] async fn test_shared_history_out_of_order() { @@ -211,3 +212,255 @@ async fn test_shared_history_out_of_order() { "The decrypted event should match the message Alice has sent" ); } + +#[cfg(feature = "sqlite")] +#[async_test] +async fn test_shared_history_crash_before_import() { + let room_id = room_id!("!test:localhost"); + let mxid = mxc_uri!("mxc://localhost/12345"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + matrix_mock_server.mock_invite_user_by_id().ok().mock_once().mount().await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + // Use a common store path for Bob so we can persist invite acceptance details + // over the crash. + let bob_sqlite_path = tempdir().unwrap(); + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .sqlite_store(bob_sqlite_path.path(), None) + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + let event_factory = EventFactory::new().room(room_id).sender(alice_user_id); + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1)) + .add_state_event(event_factory.room_encryption()), + ); + }) + .await; + + let room = + alice.get_room(room_id).expect("Alice should have access to the room now that we synced"); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + + mock.mock_once().named("send").mount().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone()]) + .mock_once() + .mount() + .await; + + let event_id = room + .send(RoomMessageEventContent::text_plain("It's a secret to everybody")) + .await + .expect("We should be able to send an initial message") + .response + .event_id; + + matrix_mock_server + .mock_authenticated_media_config() + .ok_default() + .mock_once() + .named("media_config") + .mount() + .await; + + let (receiver, upload_mock) = matrix_mock_server.mock_upload().ok_with_capture(mxid); + upload_mock.mock_once().mount().await; + + let (_guard, bundle_info) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob"); + let bundle = receiver.await.expect("We should have received a bundle now."); + + let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id); + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_invited_room( + InvitedRoomBuilder::new(room_id) + .add_state_event(alice_member_event.cast()) + .add_state_event(bob_member_event), + ); + }) + .await; + + let bob_room = bob.get_room(room_id).expect("Bob should have access to the invited room"); + + matrix_mock_server.mock_room_join(room_id).ok().mock_once().named("join").mount().await; + bob_room.join().await.expect("Bob should be able to join the room"); + + // Bob should have persisted the room bundle state. + let details = bob + .get_pending_key_bundle_details_for_room(room_id) + .await + .expect("Bob should be able to get the pending key bundle details for the room") + .expect("We should have stored invite acceptance details"); + + assert_eq!( + details.inviter, + alice.user_id().unwrap(), + "We should have recorded that Alice has invited us" + ); + + let bundle_info = bundle_info.await; + matrix_mock_server + .mock_authed_media_download() + .expect_any_access_token() + .ok_bytes(bundle) + .mock_once() + .named("media_download") + .mount() + .await; + + let mut bundle_stream = bob + .encryption() + .historic_room_key_stream() + .await + .expect("We should be able to get the bundle stream"); + + // Abort the bundle receiver, so we can properly test the crash recovery + // mechanism. + bob.abort_bundle_receiver_task(); + + // Bob now receives the bundle info ... + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + bundle_info + .deserialize_as() + .expect("We should be able to deserialize the bundle info"), + ); + }) + .await; + + let bundle_notification = bundle_stream + .next() + .now_or_never() + .flatten() + .expect("We should have been notified about the received bundle"); + + // .. but crashes before they finish importing the bundle. + drop(bob); + + // Bob restarts their client. + let bob_restarted = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .sqlite_store(bob_sqlite_path.path(), None) + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + // Bob's restarted client should be able to access the persisted pending key + // bundle details. + let details = bob_restarted + .get_pending_key_bundle_details_for_room(room_id) + .await + .expect("Bob should be able to get the pending key bundle details for the room") + .expect("We should have stored invite acceptance details"); + + assert_eq!( + details.inviter, + alice.user_id().unwrap(), + "The persisted pending key bundle details should be identical" + ); + + assert_eq!(bundle_notification.sender, alice_user_id); + assert_eq!(bundle_notification.room_id, room_id); + + // Bob's restarted client should successfully import the room keys. + let mut room_key_stream = bob_restarted + .encryption() + .room_keys_received_stream() + .await + .expect("We should be able to listen to received room keys"); + + assert_next_matches_with_timeout!(room_key_stream, 1000, Ok(room_key_infos) => assert_eq!(room_key_infos.len(), 1)); + + let event = event_receiver.await.expect("We should have received Alice's event"); + + matrix_mock_server + .mock_room_event() + .room(room_id) + .match_event_id() + .ok(TimelineEvent::from_utd( + event, + UnableToDecryptInfo { session_id: None, reason: UnableToDecryptReason::Unknown }, + )) + .mock_once() + .mount() + .await; + + let event = bob_restarted + .get_room(room_id) + .expect("Bob should have access to the room after restart") + .event(&event_id, None) + .await + .expect("Bob should be able to fetch the event Alice has sent"); + + let encryption_info = event.encryption_info().expect("Event did not have encryption info"); + + // Check Bob stored information about the key forwarder. + let forwarder_info = encryption_info.forwarder.as_ref().unwrap(); + assert_eq!(forwarder_info.user_id, alice_user_id); + assert_eq!(forwarder_info.device_id, alice_device_id); + + assert_decrypted_message_eq!( + event, + "It's a secret to everybody", + "The decrypted event should match the message Alice has sent" + ); + + assert!( + bob_restarted + .get_pending_key_bundle_details_for_room(room_id) + .await + .expect("Bob should be able to get the pending key bundle details for the room") + .is_none(), + "Bob should have cleared the pending key bundle details for the room" + ); +} From c864fac82319aa7efa288e26bee877b89a05a545 Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Wed, 4 Mar 2026 14:19:59 +0000 Subject: [PATCH 8/9] test(sdk): De-duplicate shared history integration tests Signed-off-by: Skye Elliot --- .../integration/encryption/shared_history.rs | 204 +++++++----------- 1 file changed, 73 insertions(+), 131 deletions(-) diff --git a/crates/matrix-sdk/tests/integration/encryption/shared_history.rs b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs index 2c714b125..13d00533f 100644 --- a/crates/matrix-sdk/tests/integration/encryption/shared_history.rs +++ b/crates/matrix-sdk/tests/integration/encryption/shared_history.rs @@ -1,21 +1,53 @@ use futures_util::{FutureExt, StreamExt}; use matrix_sdk::{ - assert_decrypted_message_eq, assert_next_matches_with_timeout, + Client, assert_decrypted_message_eq, assert_next_matches_with_timeout, deserialized_responses::{TimelineEvent, UnableToDecryptInfo, UnableToDecryptReason}, encryption::EncryptionSettings, test_utils::mocks::MatrixMockServer, }; +use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedToDeviceEvent; use matrix_sdk_test::{ InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory, }; use ruma::{ - RoomVersionId, device_id, event_id, events::room::message::RoomMessageEventContent, mxc_uri, - room_id, user_id, + OwnedEventId, RoomVersionId, device_id, event_id, + events::{AnySyncTimelineEvent, room::message::RoomMessageEventContent}, + mxc_uri, room_id, + serde::Raw, + user_id, }; use tempfile::tempdir; -#[async_test] -async fn test_shared_history_out_of_order() { +/// Helper struct to collect test data together. +struct Test { + matrix_mock_server: MatrixMockServer, + alice: Client, + bob: Client, + bob_room: matrix_sdk::Room, + /// The encrypted event Alice sent before Bob joined. + event_id: OwnedEventId, + /// The raw bytes of the uploaded key bundle. + bundle: Vec, + /// The captured to-device event carrying bundle info. + bundle_info: Raw, + /// Receiver for the original event sent by Alice. + event_receiver: tokio::sync::oneshot::Receiver>, +} + +/// Sets up the shared-history scenario up to the point where Bob has joined the +/// room and the bundle info to-device event is ready to be delivered. +/// +/// Both tests below share this identical preamble: +/// +/// - Server + Alice + Bob clients created +/// - E2EE identities exchanged +/// - Alice creates an encrypted room and sends a message +/// - Alice invites Bob, triggering key bundle upload +/// - Bob syncs the invite and joins +/// - Bundle details are verified +async fn setup_shared_history( + bob_builder_fn: impl FnOnce(matrix_sdk::ClientBuilder) -> matrix_sdk::ClientBuilder, +) -> Test { let room_id = room_id!("!test:localhost"); let mxid = mxc_uri!("mxc://localhost/12345"); @@ -44,7 +76,7 @@ async fn test_shared_history_out_of_order() { let bob = matrix_mock_server .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) .on_builder(|builder| { - builder + bob_builder_fn(builder) .with_enable_share_history_on_invite(true) .with_encryption_settings(encryption_settings) }) @@ -106,12 +138,6 @@ async fn test_shared_history_out_of_order() { room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob"); let bundle = receiver.await.expect("We should have received a bundle now."); - let mut bundle_stream = bob - .encryption() - .historic_room_key_stream() - .await - .expect("We should be able to get the bundle stream"); - let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id); matrix_mock_server @@ -143,6 +169,33 @@ async fn test_shared_history_out_of_order() { ); let bundle_info = bundle_info.await; + + Test { matrix_mock_server, alice, bob, bob_room, event_id, bundle, bundle_info, event_receiver } +} + +#[async_test] +async fn test_shared_history_out_of_order() { + let room_id = room_id!("!test:localhost"); + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + + let Test { + matrix_mock_server, + bob, + bob_room, + event_id, + bundle, + bundle_info, + event_receiver, + .. + } = setup_shared_history(|builder| builder).await; + + let mut bundle_stream = bob + .encryption() + .historic_room_key_stream() + .await + .expect("We should be able to get the bundle stream"); + matrix_mock_server .mock_authed_media_download() .expect_any_access_token() @@ -217,132 +270,21 @@ async fn test_shared_history_out_of_order() { #[async_test] async fn test_shared_history_crash_before_import() { let room_id = room_id!("!test:localhost"); - let mxid = mxc_uri!("mxc://localhost/12345"); - let alice_user_id = user_id!("@alice:localhost"); let alice_device_id = device_id!("ALICEDEVICE"); let bob_user_id = user_id!("@bob:localhost"); let bob_device_id = device_id!("BOBDEVICE"); - let matrix_mock_server = MatrixMockServer::new().await; - matrix_mock_server.mock_crypto_endpoints_preset().await; - matrix_mock_server.mock_invite_user_by_id().ok().mock_once().mount().await; - - let encryption_settings = - EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; - - let alice = matrix_mock_server - .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) - .on_builder(|builder| { - builder - .with_enable_share_history_on_invite(true) - .with_encryption_settings(encryption_settings) - }) - .build() - .await; - // Use a common store path for Bob so we can persist invite acceptance details // over the crash. let bob_sqlite_path = tempdir().unwrap(); + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; - let bob = matrix_mock_server - .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) - .on_builder(|builder| { - builder - .sqlite_store(bob_sqlite_path.path(), None) - .with_enable_share_history_on_invite(true) - .with_encryption_settings(encryption_settings) - }) - .build() - .await; + let Test { + matrix_mock_server, alice, bob, event_id, bundle, bundle_info, event_receiver, .. + } = setup_shared_history(|builder| builder.sqlite_store(bob_sqlite_path.path(), None)).await; - matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; - - let event_factory = EventFactory::new().room(room_id).sender(alice_user_id); - let alice_member_event = event_factory.member(alice_user_id).into_raw(); - - matrix_mock_server - .mock_sync() - .ok_and_run(&alice, |builder| { - builder.add_joined_room( - JoinedRoomBuilder::new(room_id) - .add_state_event(event_factory.create(alice_user_id, RoomVersionId::V1)) - .add_state_event(event_factory.room_encryption()), - ); - }) - .await; - - let room = - alice.get_room(room_id).expect("Alice should have access to the room now that we synced"); - - let event_id = event_id!("$some_id"); - let (event_receiver, mock) = - matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); - - mock.mock_once().named("send").mount().await; - - matrix_mock_server - .mock_get_members() - .ok(vec![alice_member_event.clone()]) - .mock_once() - .mount() - .await; - - let event_id = room - .send(RoomMessageEventContent::text_plain("It's a secret to everybody")) - .await - .expect("We should be able to send an initial message") - .response - .event_id; - - matrix_mock_server - .mock_authenticated_media_config() - .ok_default() - .mock_once() - .named("media_config") - .mount() - .await; - - let (receiver, upload_mock) = matrix_mock_server.mock_upload().ok_with_capture(mxid); - upload_mock.mock_once().mount().await; - - let (_guard, bundle_info) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; - - room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob"); - let bundle = receiver.await.expect("We should have received a bundle now."); - - let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id); - - matrix_mock_server - .mock_sync() - .ok_and_run(&bob, |builder| { - builder.add_invited_room( - InvitedRoomBuilder::new(room_id) - .add_state_event(alice_member_event.cast()) - .add_state_event(bob_member_event), - ); - }) - .await; - - let bob_room = bob.get_room(room_id).expect("Bob should have access to the invited room"); - - matrix_mock_server.mock_room_join(room_id).ok().mock_once().named("join").mount().await; - bob_room.join().await.expect("Bob should be able to join the room"); - - // Bob should have persisted the room bundle state. - let details = bob - .get_pending_key_bundle_details_for_room(room_id) - .await - .expect("Bob should be able to get the pending key bundle details for the room") - .expect("We should have stored invite acceptance details"); - - assert_eq!( - details.inviter, - alice.user_id().unwrap(), - "We should have recorded that Alice has invited us" - ); - - let bundle_info = bundle_info.await; matrix_mock_server .mock_authed_media_download() .expect_any_access_token() @@ -380,6 +322,9 @@ async fn test_shared_history_crash_before_import() { .flatten() .expect("We should have been notified about the received bundle"); + assert_eq!(bundle_notification.sender, alice_user_id); + assert_eq!(bundle_notification.room_id, room_id); + // .. but crashes before they finish importing the bundle. drop(bob); @@ -409,9 +354,6 @@ async fn test_shared_history_crash_before_import() { "The persisted pending key bundle details should be identical" ); - assert_eq!(bundle_notification.sender, alice_user_id); - assert_eq!(bundle_notification.room_id, room_id); - // Bob's restarted client should successfully import the room keys. let mut room_key_stream = bob_restarted .encryption() From 2ca717b4af97be09ea1e7cfdb6017d9f6ceba83d Mon Sep 17 00:00:00 2001 From: Skye Elliot Date: Wed, 25 Feb 2026 16:37:45 +0000 Subject: [PATCH 9/9] docs: Update CHANGELOGs Signed-off-by: Skye Elliot --- crates/matrix-sdk/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 136c6674a..eec6e5ce4 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file. ### Features +- Attempt to import stored room key bundles for rooms awaiting bundles at + client startup. + ([#6215](https://github.com/matrix-org/matrix-rust-sdk/pull/6215)) - Add `OAuth::cached_server_metadata()` that caches the authorization server metadata for a while. ([#6217](https://github.com/matrix-org/matrix-rust-sdk/pull/6217))