Merge pull request #5322 from matrix-org/poljar/shared-history/out-of-order

feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order
This commit is contained in:
Damir Jelić
2025-07-17 16:52:13 +02:00
committed by GitHub
13 changed files with 641 additions and 48 deletions

View File

@@ -6,9 +6,13 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - ReleaseDate
### Breaking changes:
### Features
- `OAuth::login` now allows requesting additional scopes for the authorization code grant.
- Add support to accept historic room key bundles that arrive out of order, i.e.
the bundle arrives after the invite has already been accepted.
([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322))
- [**breaking**] `OAuth::login` now allows requesting additional scopes for the authorization code grant.
([#5395](https://github.com/matrix-org/matrix-rust-sdk/pull/5395))
## [0.13.0] - 2025-07-10

View File

@@ -802,7 +802,7 @@ impl MatrixAuth {
_ => None,
};
self.client.encryption().spawn_initialization_task(auth_data);
self.client.encryption().spawn_initialization_task(auth_data).await;
}
Ok(())

View File

@@ -817,7 +817,7 @@ impl OAuth {
}
#[cfg(feature = "e2e-encryption")]
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;
Ok(())
}
@@ -1031,7 +1031,7 @@ impl OAuth {
self.enable_cross_process_lock().await.map_err(OAuthError::from)?;
#[cfg(feature = "e2e-encryption")]
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;
}
Ok(())

View File

@@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> {
// ourselves see us as verified and the recovery/backup states will
// be known. If we did receive all the secrets in the secrets
// bundle, then backups will be enabled after this step as well.
self.client.encryption().spawn_initialization_task(None);
self.client.encryption().spawn_initialization_task(None).await;
self.client.encryption().wait_for_e2ee_initialization_tasks().await;
trace!("successfully logged in and enabled E2EE.");

View File

@@ -414,7 +414,7 @@ impl ClientInner {
let client = Arc::new(client);
#[cfg(feature = "e2e-encryption")]
client.e2ee.initialize_room_key_tasks(&client);
client.e2ee.initialize_tasks(&client);
let _ = client
.event_cache

View File

@@ -62,6 +62,7 @@ use ruma::{
#[cfg(feature = "experimental-send-custom-to-device")]
use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
use serde::Deserialize;
use tasks::BundleReceiverTask;
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{debug, error, instrument, trace, warn};
@@ -134,7 +135,7 @@ impl EncryptionData {
}
}
pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
let weak_client = WeakClient::from_inner(client);
let mut tasks = self.tasks.lock();
@@ -1685,10 +1686,20 @@ impl Encryption {
/// there is a proposal (MSC3967) to remove this requirement, which would
/// allow for the initial upload of cross-signing keys without
/// authentication, rendering this parameter obsolete.
pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
// It's fine to be async here as we're only getting the lock protecting the
// `OlmMachine`. Since the lock shouldn't be that contested right after logging
// in we won't delay the login or restoration of the Client.
let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
Some(BundleReceiverTask::new(&self.client).await)
} else {
None
};
let mut tasks = self.client.inner.e2ee.tasks.lock();
let this = self.clone();
tasks.setup_e2ee = Some(spawn(async move {
// Update the current state first, so we don't have to wait for the result of
// network requests
@@ -1707,6 +1718,8 @@ impl Encryption {
error!("Couldn't setup and resume recovery {e:?}");
}
}));
tasks.receive_historic_room_key_bundles = bundle_receiver_task;
}
/// Waits for end-to-end encryption initialization tasks to finish, if any

View File

@@ -14,23 +14,26 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk_base::{
crypto::store::types::RoomKeyBundleInfo, InviteAcceptanceDetails, RoomState,
};
use matrix_sdk_common::failures_cache::FailuresCache;
use ruma::{
events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent},
serde::Raw,
OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
mpsc::{self, UnboundedReceiver},
Mutex,
};
use tracing::{debug, trace, warn};
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, instrument, trace, warn};
use crate::{
client::WeakClient,
encryption::backups::UploadState,
executor::{spawn, JoinHandle},
Client,
room::shared_room_history,
Client, Room,
};
/// A cache of room keys we already downloaded.
@@ -41,6 +44,7 @@ pub(crate) struct ClientTasks {
pub(crate) upload_room_keys: Option<BackupUploadingTask>,
pub(crate) download_room_keys: Option<BackupDownloadTask>,
pub(crate) update_recovery_state_after_backup: Option<JoinHandle<()>>,
pub(crate) receive_historic_room_key_bundles: Option<BundleReceiverTask>,
pub(crate) setup_e2ee: Option<JoinHandle<()>>,
}
@@ -72,7 +76,7 @@ impl BackupUploadingTask {
let _ = self.sender.send(());
}
pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) {
pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) {
while receiver.recv().await.is_some() {
if let Some(client) = client.get() {
let upload_progress = &client.inner.e2ee.backup_state.upload_progress;
@@ -176,7 +180,10 @@ impl BackupDownloadTask {
/// # Arguments
///
/// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s.
async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<RoomKeyDownloadRequest>) {
async fn listen(
client: WeakClient,
mut receiver: mpsc::UnboundedReceiver<RoomKeyDownloadRequest>,
) {
let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client)));
while let Some(room_key_download_request) = receiver.recv().await {
@@ -385,15 +392,97 @@ impl BackupDownloadTaskListenerState {
}
}
pub(crate) struct BundleReceiverTask {
_handle: JoinHandle<()>,
}
impl BundleReceiverTask {
pub async fn new(client: &Client) -> Self {
let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine");
let weak_client = WeakClient::from_client(client);
let handle = spawn(Self::listen_task(weak_client, stream));
Self { _handle: handle }
}
async fn listen_task(client: WeakClient, stream: impl Stream<Item = RoomKeyBundleInfo>) {
pin_mut!(stream);
// TODO: Listening to this stream is not enough for iOS due to the NSE killing
// our OlmMachine and thus also this stream. We need to add an event handler
// that will listen for the bundle event. To be able to add an event handler,
// we'll have to implement the bundle event in Ruma.
while let Some(bundle_info) = stream.next().await {
let Some(client) = client.get() else {
// The client was dropped while we were waiting on the stream. Let's end the
// loop, since this means that the application has shut down.
break;
};
let Some(room) = client.get_room(&bundle_info.room_id) else {
warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room");
continue;
};
Self::handle_bundle(&room, &bundle_info).await;
}
}
#[instrument(skip(room), fields(room_id = %room.room_id()))]
async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) {
if Self::should_accept_bundle(room, bundle_info) {
info!("Accepting a late key bundle.");
if let Err(e) =
shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await
{
warn!("Couldn't accept a late room key bundle {e:?}");
}
} else {
info!("Refusing to accept a historic room key bundle.");
}
}
fn should_accept_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool {
// We accept historic room key bundles up to one day after we have accepted an
// invite.
const DAY: Duration = Duration::from_secs(24 * 60 * 60);
// If we don't have any invite acceptance details, then this client wasn't the
// one that accepted the invite.
let Some(InviteAcceptanceDetails { invite_accepted_at, inviter }) =
room.invite_acceptance_details()
else {
return false;
};
let state = room.state();
let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok());
let bundle_sender = &bundle_info.sender;
match (state, elapsed_since_join) {
(RoomState::Joined, Some(elapsed_since_join)) => {
elapsed_since_join < DAY && bundle_sender == &inviter
}
(RoomState::Joined, None) => false,
(RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => {
false
}
}
}
}
#[cfg(all(test, not(target_family = "wasm")))]
mod test {
use matrix_sdk_test::async_test;
use ruma::{event_id, room_id};
use matrix_sdk_test::{
async_test, event_factory::EventFactory, InvitedRoomBuilder, JoinedRoomBuilder,
};
use ruma::{event_id, room_id, user_id};
use serde_json::json;
use wiremock::MockServer;
use super::*;
use crate::test_utils::logged_in_client;
use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};
// Test that, if backups are not enabled, we don't incorrectly mark a room key
// as downloaded.
@@ -451,4 +540,81 @@ mod test {
)
}
}
/// Test that ensures that we only accept a bundle if a certain set of
/// conditions is met.
#[async_test]
async fn test_should_accept_bundle() {
let server = MatrixMockServer::new().await;
let alice_user_id = user_id!("@alice:localhost");
let bob_user_id = user_id!("@bob:localhost");
let joined_room_id = room_id!("!joined:localhost");
let invited_rom_id = room_id!("!invited:localhost");
let client = server
.client_builder()
.logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into())
.build()
.await;
let event_factory = EventFactory::new().room(invited_rom_id);
let bob_member_event = event_factory.member(bob_user_id).into_raw_timeline();
let alice_member_event =
event_factory.member(bob_user_id).invited(alice_user_id).into_raw_timeline();
server
.mock_sync()
.ok_and_run(&client, |builder| {
builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room(
InvitedRoomBuilder::new(invited_rom_id)
.add_state_event(bob_member_event.cast())
.add_state_event(alice_member_event.cast()),
);
})
.await;
let room =
client.get_room(joined_room_id).expect("We should have access to our joined room now");
assert!(
room.invite_acceptance_details().is_none(),
"We shouldn't have any invite acceptance details if we didn't join the room on this Client"
);
let bundle_info = RoomKeyBundleInfo {
sender: bob_user_id.to_owned(),
room_id: joined_room_id.to_owned(),
};
assert!(
!BundleReceiverTask::should_accept_bundle(&room, &bundle_info),
"We should not acceept a bundle if we did not join the room from this Client"
);
let invited_room =
client.get_room(invited_rom_id).expect("We should have access to our invited room now");
assert!(
!BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info),
"We should not accept a bundle if we didn't join the room."
);
server.mock_room_join(invited_rom_id).ok().mock_once().mount().await;
let room = client
.join_room_by_id(invited_rom_id)
.await
.expect("We should be able to join the invited room");
let details = room
.invite_acceptance_details()
.expect("We should have stored the invite acceptance details");
assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us");
assert!(
BundleReceiverTask::should_accept_bundle(&room, &bundle_info),
"We should accept a bundle if we just joined the room and did so from this very Client object"
);
}
}

View File

@@ -63,6 +63,23 @@ impl MockClientBuilder {
self
}
/// Enable the share history on invite feature for the Client.
#[cfg(feature = "e2e-encryption")]
pub fn enable_share_history_on_invite(mut self) -> Self {
self.builder = self.builder.with_enable_share_history_on_invite(true);
self
}
/// Use the given encryption settings with the test client.
#[cfg(feature = "e2e-encryption")]
pub fn with_encryption_settings(
mut self,
settings: crate::encryption::EncryptionSettings,
) -> Self {
self.builder = self.builder.with_encryption_settings(settings);
self
}
/// Set the cached server versions in the client.
pub fn server_versions(mut self, versions: Vec<MatrixVersion>) -> Self {
self.server_versions = ServerVersions::Custom(versions);

View File

@@ -110,6 +110,29 @@ impl MatrixMockServer {
known_otks.entry(user_id).or_default().entry(device_id).or_default().clear();
}
/// Ensure that the given clients are aware of each others public
/// identities.
pub async fn exchange_e2ee_identities(&self, alice: &Client, bob: &Client) {
let alice_user_id = alice.user_id().expect("Alice should have a user ID configured");
let bob_user_id = bob.user_id().expect("Bob should have a user ID configured");
// Have Alice track Bob, so she queries his keys later.
alice.update_tracked_users_for_testing([bob_user_id]).await;
// let bob be aware of Alice keys in order to be able to decrypt custom
// to-device (the device keys check are deferred for `m.room.key` so this is not
// needed for sending room messages for example).
bob.update_tracked_users_for_testing([alice_user_id]).await;
// Have Alice and Bob upload their signed device keys.
self.mock_sync().ok_and_run(alice, |_x| {}).await;
self.mock_sync().ok_and_run(bob, |_x| {}).await;
// Run a sync so we do send outgoing requests, including the /keys/query for
// getting bob's identity.
self.mock_sync().ok_and_run(alice, |_x| {}).await;
}
/// Utility to properly setup two clients. These two clients will know about
/// each others (alice will have downloaded bob device keys).
pub async fn set_up_alice_and_bob_for_encryption(&self) -> (Client, Client) {
@@ -126,21 +149,7 @@ impl MatrixMockServer {
let bob =
self.client_builder_for_crypto_end_to_end(&bob_user_id, &bob_device_id).build().await;
// Have Alice track Bob, so she queries his keys later.
alice.update_tracked_users_for_testing([bob_user_id.as_ref()]).await;
// let bob be aware of Alice keys in order to be able to decrypt custom
// to-device (the device keys check are deferred for `m.room.key` so this is not
// needed for sending room messages for example).
bob.update_tracked_users_for_testing([alice_user_id.as_ref()]).await;
// Have Alice and Bob upload their signed device keys.
self.mock_sync().ok_and_run(&alice, |_x| {}).await;
self.mock_sync().ok_and_run(&bob, |_x| {}).await;
// Run a sync so we do send outgoing requests, including the /keys/query for
// getting bob's identity.
self.mock_sync().ok_and_run(&alice, |_x| {}).await;
self.exchange_e2ee_identities(&alice, &bob).await;
(alice, bob)
}

View File

@@ -35,17 +35,19 @@ use ruma::{
directory::PublicRoomsChunk,
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::{
receipt::ReceiptThread, room::member::RoomMemberEvent, AnyStateEvent, AnyTimelineEvent,
GlobalAccountDataEventType, MessageLikeEventType, RoomAccountDataEventType, StateEventType,
receipt::ReceiptThread, room::member::RoomMemberEvent, AnyStateEvent, AnySyncTimelineEvent,
AnyTimelineEvent, GlobalAccountDataEventType, MessageLikeEventType,
RoomAccountDataEventType, StateEventType,
},
media::Method,
serde::Raw,
time::Duration,
DeviceId, EventId, MxcUri, OwnedDeviceId, OwnedEventId, OwnedOneTimeKeyId, OwnedRoomId,
OwnedUserId, RoomId, ServerName, UserId,
DeviceId, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedDeviceId, OwnedEventId,
OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId,
};
use serde::Deserialize;
use serde_json::{json, Value};
use serde_json::{from_value, json, Value};
use tokio::sync::oneshot::{self, Receiver};
use wiremock::{
matchers::{body_json, body_partial_json, header, method, path, path_regex, query_param},
Mock, MockBuilder, MockGuard, MockServer, Request, Respond, ResponseTemplate, Times,
@@ -342,6 +344,36 @@ impl MatrixMockServer {
.expect_default_access_token()
}
/// Creates a prebuilt mock for joining a room.
///
/// # Examples
///
/// ```
/// # tokio_test::block_on(async {
/// use matrix_sdk::{ruma::{room_id, event_id}, test_utils::mocks::MatrixMockServer};
/// use serde_json::json;
///
/// let mock_server = MatrixMockServer::new().await;
/// let client = mock_server.client_builder().build().await;
/// let room_id = room_id!("!test:localhost");
///
/// mock_server.mock_room_join(room_id).ok().mount();
///
/// let room = client.join_room_by_id(room_id).await?;
///
/// assert_eq!(
/// room_id,
/// room.room_id(),
/// "The room ID we mocked should match the one we received when we joined the room"
/// );
/// # anyhow::Ok(()) });
/// ```
pub fn mock_room_join(&self, room_id: &RoomId) -> MockEndpoint<'_, JoinRoomEndpoint> {
let mock = Mock::given(method("POST"))
.and(path_regex(format!("^/_matrix/client/v3/rooms/{room_id}/join")));
self.mock_endpoint(mock, JoinRoomEndpoint { room_id: room_id.to_owned() })
}
/// Creates a prebuilt mock for sending an event in a room.
///
/// Note: works with *any* room.
@@ -382,7 +414,7 @@ impl MatrixMockServer {
pub fn mock_room_send(&self) -> MockEndpoint<'_, RoomSendEndpoint> {
let mock = Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/v3/rooms/.*/send/.*".to_owned()));
self.mock_endpoint(mock, RoomSendEndpoint).expect_default_access_token()
self.mock_endpoint(mock, RoomSendEndpoint)
}
/// Creates a prebuilt mock for sending a state event in a room.
@@ -561,7 +593,7 @@ impl MatrixMockServer {
/// Create a prebuilt mock for uploading media.
pub fn mock_upload(&self) -> MockEndpoint<'_, UploadEndpoint> {
let mock = Mock::given(method("POST")).and(path("/_matrix/media/v3/upload"));
self.mock_endpoint(mock, UploadEndpoint).expect_default_access_token()
self.mock_endpoint(mock, UploadEndpoint)
}
/// Create a prebuilt mock for resolving room aliases.
@@ -1197,7 +1229,7 @@ impl MatrixMockServer {
&self,
) -> MockEndpoint<'_, AuthenticatedMediaConfigEndpoint> {
let mock = Mock::given(method("GET")).and(path("/_matrix/client/v1/media/config"));
self.mock_endpoint(mock, AuthenticatedMediaConfigEndpoint).expect_default_access_token()
self.mock_endpoint(mock, AuthenticatedMediaConfigEndpoint)
}
/// Create a prebuilt mock for the endpoint used to log into a session.
@@ -1832,6 +1864,102 @@ impl<'a> MockEndpoint<'a, RoomSendEndpoint> {
pub fn ok(self, returned_event_id: impl Into<OwnedEventId>) -> MatrixMock<'a> {
self.ok_with_event_id(returned_event_id.into())
}
/// Returns a send endpoint that emulates success, i.e. the event has been
/// sent with the given event id.
///
/// The sent event is captured and can be accessed using the returned
/// [`Receiver`]. The [`Receiver`] is valid only for a send call. The given
/// `event_sender` are added to the event JSON.
///
/// # Examples
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use matrix_sdk::{
/// ruma::{
/// event_id, events::room::message::RoomMessageEventContent, room_id,
/// },
/// test_utils::mocks::MatrixMockServer,
/// };
/// use matrix_sdk_test::JoinedRoomBuilder;
///
/// let room_id = room_id!("!room_id:localhost");
/// let event_id = event_id!("$some_id");
///
/// let server = MatrixMockServer::new().await;
/// let client = server.client_builder().build().await;
///
/// let user_id = client.user_id().expect("We should have a user ID by now");
///
/// let (receiver, mock) =
/// server.mock_room_send().ok_with_capture(event_id, user_id);
///
/// server
/// .mock_sync()
/// .ok_and_run(&client, |builder| {
/// builder.add_joined_room(JoinedRoomBuilder::new(room_id));
/// })
/// .await;
///
/// // Mock any additional endpoints that might be needed to send the message.
///
/// let room = client
/// .get_room(room_id)
/// .expect("We should have access to our room now");
///
/// let event_id = room
/// .send(RoomMessageEventContent::text_plain("It's a secret to everybody"))
/// .await
/// .expect("We should be able to send an initial message")
/// .event_id;
///
/// let event = receiver.await?;
/// # anyhow::Ok(()) });
/// ```
pub fn ok_with_capture(
self,
returned_event_id: impl Into<OwnedEventId>,
event_sender: impl Into<OwnedUserId>,
) -> (Receiver<Raw<AnySyncTimelineEvent>>, MatrixMock<'a>) {
let event_id = returned_event_id.into();
let event_sender = event_sender.into();
let (sender, receiver) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(sender)));
let ret = self.respond_with(move |request: &Request| {
if let Some(sender) = sender.lock().unwrap().take() {
let uri = &request.url;
let path_segments = uri.path_segments();
let maybe_event_type = path_segments.and_then(|mut s| s.nth_back(1));
let event_type = maybe_event_type
.as_ref()
.map(|&e| e.to_owned())
.unwrap_or("m.room.message".to_owned());
let body: Value =
request.body_json().expect("The received body should be valid JSON");
let event = json!({
"event_id": event_id.clone(),
"sender": event_sender,
"type": event_type,
"origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
"content": body,
});
let event: Raw<AnySyncTimelineEvent> = from_value(event)
.expect("We should be able to create a raw event from the content");
sender.send(event).expect("We should be able to send the event to the receiver");
}
ResponseTemplate::new(200).set_body_json(json!({ "event_id": event_id.clone() }))
});
(receiver, ret)
}
}
/// A prebuilt mock for sending a state event in a room.
@@ -2444,8 +2572,58 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> {
Self { mock: self.mock.and(header("content-type", content_type)), ..self }
}
/// Returns a redact endpoint that emulates success, i.e. the redaction
/// event has been sent with the given event id.
/// Returns a upload endpoint that emulates success, i.e. the media has been
/// uploaded to the media server and can be accessed using the given
/// event has been sent with the given [`MxcUri`].
///
/// The uploaded content is captured and can be accessed using the returned
/// [`Receiver`]. The [`Receiver`] is valid only for a single media
/// upload.
///
/// # Examples
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use matrix_sdk::{
/// ruma::{event_id, mxc_uri, room_id},
/// test_utils::mocks::MatrixMockServer,
/// };
///
/// let mxid = mxc_uri!("mxc://localhost/12345");
///
/// let server = MatrixMockServer::new().await;
/// let (receiver, upload_mock) = server.mock_upload().ok_with_capture(mxid);
/// let client = server.client_builder().build().await;
///
/// client.media().upload(&mime::TEXT_PLAIN, vec![1, 2, 3, 4, 5], None).await?;
///
/// let uploaded = receiver.await?;
///
/// assert_eq!(uploaded, vec![1, 2, 3, 4, 5]);
/// # anyhow::Ok(()) });
/// ```
pub fn ok_with_capture(self, mxc_id: &MxcUri) -> (Receiver<Vec<u8>>, MatrixMock<'a>) {
let (sender, receiver) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(sender)));
let response_body = json!({"content_uri": mxc_id});
let ret = self.respond_with(move |request: &Request| {
let maybe_sender = sender.lock().unwrap().take();
if let Some(sender) = maybe_sender {
let body = request.body.clone();
let _ = sender.send(body);
}
ResponseTemplate::new(200).set_body_json(response_body.clone())
});
(receiver, ret)
}
/// Returns a upload endpoint that emulates success, i.e. the media has been
/// uploaded to the media server and can be accessed using the given
/// event has been sent with the given [`MxcUri`].
pub fn ok(self, mxc_id: &MxcUri) -> MatrixMock<'a> {
self.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"content_uri": mxc_id
@@ -3269,7 +3447,7 @@ impl<'a> MockEndpoint<'a, LoginEndpoint> {
///
/// # Examples
///
/// ```rust
/// ```
/// use matrix_sdk::test_utils::mocks::{
/// LoginResponseTemplate200, MatrixMockServer,
/// };
@@ -3483,6 +3661,13 @@ impl<'a> MockEndpoint<'a, MediaDownloadEndpoint> {
self.respond_with(ResponseTemplate::new(200).set_body_string("Hello, World!"))
}
/// Returns a successful response with the given bytes.
pub fn ok_bytes(self, bytes: Vec<u8>) -> MatrixMock<'a> {
self.respond_with(
ResponseTemplate::new(200).set_body_raw(bytes, "application/octet-stream"),
)
}
/// Returns a successful response with a fake image content.
pub fn ok_image(self) -> MatrixMock<'a> {
self.respond_with(
@@ -3531,3 +3716,19 @@ impl<'a> MockEndpoint<'a, AuthedMediaThumbnailEndpoint> {
)
}
}
/// A prebuilt mock for `GET /client/v3/rooms/{room_id}/join` requests.
pub struct JoinRoomEndpoint {
room_id: OwnedRoomId,
}
impl<'a> MockEndpoint<'a, JoinRoomEndpoint> {
/// Returns a successful response using the provided [`RoomId`].
pub fn ok(self) -> MatrixMock<'a> {
let room_id = self.endpoint.room_id.to_owned();
self.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"room_id": room_id,
})))
}
}

View File

@@ -2,6 +2,7 @@ mod backups;
mod cross_signing;
mod recovery;
mod secret_storage;
mod shared_history;
mod to_device;
mod verification;

View File

@@ -0,0 +1,183 @@
use futures_util::{FutureExt, StreamExt};
use matrix_sdk::{
assert_decrypted_message_eq, assert_next_matches_with_timeout,
deserialized_responses::{TimelineEvent, UnableToDecryptInfo, UnableToDecryptReason},
encryption::EncryptionSettings,
test_utils::mocks::MatrixMockServer,
};
use matrix_sdk_test::{
async_test, event_factory::EventFactory, InvitedRoomBuilder, JoinedRoomBuilder, StateTestEvent,
};
use ruma::{
device_id, event_id, events::room::message::RoomMessageEventContent, mxc_uri, room_id, user_id,
};
#[async_test]
async fn test_shared_history_out_of_order() {
let room_id = room_id!("!test:localhost");
let mxid = mxc_uri!("mxc://localhost/12345");
let alice_user_id = user_id!("@alice:localhost");
let alice_device_id = device_id!("ALICEDEVICE");
let bob_user_id = user_id!("@bob:localhost");
let bob_device_id = device_id!("BOBDEVICE");
let matrix_mock_server = MatrixMockServer::new().await;
matrix_mock_server.mock_crypto_endpoints_preset().await;
matrix_mock_server.mock_invite_user_by_id().ok().mock_once().mount().await;
let encryption_settings =
EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };
let alice = matrix_mock_server
.client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id)
.enable_share_history_on_invite()
.with_encryption_settings(encryption_settings)
.build()
.await;
let bob = matrix_mock_server
.client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
.enable_share_history_on_invite()
.with_encryption_settings(encryption_settings)
.build()
.await;
matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await;
let event_factory = EventFactory::new().room(room_id);
let alice_member_event = event_factory.member(alice_user_id).into_raw_timeline();
matrix_mock_server
.mock_sync()
.ok_and_run(&alice, |builder| {
builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_state_event(StateTestEvent::Create)
.add_state_event(StateTestEvent::Encryption),
);
})
.await;
let room =
alice.get_room(room_id).expect("Alice should have access to the room now that we synced");
let event_id = event_id!("$some_id");
let (event_receiver, mock) =
matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id);
mock.mock_once().mount().await;
matrix_mock_server
.mock_get_members()
.ok(vec![alice_member_event.clone().cast()])
.mock_once()
.mount()
.await;
let event_id = room
.send(RoomMessageEventContent::text_plain("It's a secret to everybody"))
.await
.expect("We should be able to send an initial message")
.event_id;
matrix_mock_server.mock_authenticated_media_config().ok_default().mock_once().mount().await;
let (receiver, upload_mock) = matrix_mock_server.mock_upload().ok_with_capture(mxid);
upload_mock.mock_once().mount().await;
let (_guard, bundle_info) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await;
room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob");
let bundle = receiver.await.expect("We should have received a bundle now.");
let mut bundle_stream = bob
.encryption()
.historic_room_key_stream()
.await
.expect("We should be able to get the bundle stream");
let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id);
matrix_mock_server
.mock_sync()
.ok_and_run(&bob, |builder| {
builder.add_invited_room(
InvitedRoomBuilder::new(room_id)
.add_state_event(alice_member_event.cast())
.add_state_event(bob_member_event.into_raw_timeline().cast()),
);
})
.await;
let bob_room = bob.get_room(room_id).expect("Bob should have access to the invited room");
matrix_mock_server.mock_room_join(room_id).ok().mock_once().mount().await;
bob_room.join().await.expect("Bob should be able to join the room");
let details = bob_room
.invite_acceptance_details()
.expect("We should have stored invite acceptance details");
assert_eq!(
details.inviter,
alice.user_id().unwrap(),
"We should have recorded that Alice has invited us"
);
let bundle_info = bundle_info.await;
matrix_mock_server.mock_media_download().ok_bytes(bundle).mock_once().mount().await;
let mut room_key_stream = bob
.encryption()
.room_keys_received_stream()
.await
.expect("We should be able to listen to received room keys");
matrix_mock_server
.mock_sync()
.ok_and_run(&bob, |builder| {
builder.add_to_device_event(
bundle_info
.deserialize_as()
.expect("We should be able to deserialize the bundle info"),
);
})
.await;
let bundle_notification = bundle_stream
.next()
.now_or_never()
.flatten()
.expect("We should have been notiifed about the received bundle");
assert_eq!(bundle_notification.sender, alice_user_id);
assert_eq!(bundle_notification.room_id, room_id);
assert_next_matches_with_timeout!(room_key_stream, 1000, Ok(room_key_infos) => assert_eq!(room_key_infos.len(), 1));
let event = event_receiver.await.expect("We should have received Alice's event");
matrix_mock_server
.mock_room_event()
.room(room_id)
.match_event_id()
.ok(TimelineEvent::from_utd(
event,
UnableToDecryptInfo { session_id: None, reason: UnableToDecryptReason::Unknown },
))
.mock_once()
.mount()
.await;
let event = bob_room
.event(&event_id, None)
.await
.expect("Bob should be able to fetch the event Alice has sent");
assert_decrypted_message_eq!(
event,
"It's a secret to everybody",
"The decrypted event should match the message Alice has sent"
);
}

View File

@@ -3,7 +3,6 @@ use ruma::{
events::AnyStrippedStateEvent, serde::Raw,
};
use super::StrippedStateTestEvent;
use crate::DEFAULT_TEST_ROOM_ID;
pub struct InvitedRoomBuilder {
@@ -26,7 +25,7 @@ impl InvitedRoomBuilder {
}
/// Add an event to the state.
pub fn add_state_event(mut self, event: StrippedStateTestEvent) -> Self {
pub fn add_state_event(mut self, event: impl Into<Raw<AnyStrippedStateEvent>>) -> Self {
self.inner.invite_state.events.push(event.into());
self
}