mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-04-20 23:27:44 -04:00
Merge pull request #6215 from matrix-org/kaylendog/history-sharing/restart-import
feat: Try import stored key bundles on client start
This commit is contained in:
@@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.
|
||||
|
||||
### Features
|
||||
|
||||
- Attempt to import stored room key bundles for rooms awaiting bundles at
|
||||
client startup.
|
||||
([#6215](https://github.com/matrix-org/matrix-rust-sdk/pull/6215))
|
||||
- Add `OAuth::cached_server_metadata()` that caches the authorization server
|
||||
metadata for a while.
|
||||
([#6217](https://github.com/matrix-org/matrix-rust-sdk/pull/6217))
|
||||
|
||||
@@ -785,6 +785,14 @@ impl Client {
|
||||
pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
|
||||
self.olm_machine().await
|
||||
}
|
||||
|
||||
/// Aborts the client's bundle receiver task, for testing purposes only.
|
||||
pub fn abort_bundle_receiver_task(&self) {
|
||||
let tasks = self.inner.e2ee.tasks.lock();
|
||||
if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() {
|
||||
task.abort()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A high-level API to manage the client's encryption.
|
||||
|
||||
@@ -16,14 +16,11 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
|
||||
|
||||
use futures_core::Stream;
|
||||
use futures_util::{StreamExt, pin_mut};
|
||||
use matrix_sdk_base::crypto::store::types::{RoomKeyBundleInfo, RoomPendingKeyBundleDetails};
|
||||
#[cfg(feature = "experimental-encrypted-state-events")]
|
||||
use matrix_sdk_base::crypto::types::events::room::encrypted::{
|
||||
EncryptedEvent, RoomEventEncryptionScheme,
|
||||
};
|
||||
use matrix_sdk_base::{
|
||||
RoomState,
|
||||
crypto::store::types::{RoomKeyBundleInfo, RoomPendingKeyBundleDetails},
|
||||
};
|
||||
use matrix_sdk_common::failures_cache::FailuresCache;
|
||||
#[cfg(not(feature = "experimental-encrypted-state-events"))]
|
||||
use ruma::events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent};
|
||||
@@ -434,16 +431,18 @@ impl BackupDownloadTaskListenerState {
|
||||
}
|
||||
|
||||
pub(crate) struct BundleReceiverTask {
|
||||
_handle: JoinHandle<()>,
|
||||
_startup_handle: JoinHandle<()>,
|
||||
_listen_handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl BundleReceiverTask {
|
||||
pub async fn new(client: &Client) -> Self {
|
||||
let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine");
|
||||
let weak_client = WeakClient::from_client(client);
|
||||
let handle = spawn(Self::listen_task(weak_client, stream));
|
||||
|
||||
Self { _handle: handle }
|
||||
Self {
|
||||
_listen_handle: spawn(Self::listen_task(weak_client.clone(), stream)),
|
||||
_startup_handle: spawn(Self::startup_task(weak_client)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_task(client: WeakClient, stream: impl Stream<Item = RoomKeyBundleInfo>) {
|
||||
@@ -469,6 +468,89 @@ impl BundleReceiverTask {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves a list of all rooms pending key bundles, then cross-references
|
||||
/// this with bundles held in the crypto store. If all conditions outlined
|
||||
/// in [`shared_room_history::maybe_accept_key_bundle`], then the bundle
|
||||
/// will be imported.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn startup_task(client: WeakClient) {
|
||||
tracing::debug!("Checking for unimported stored room key bundles...");
|
||||
|
||||
let Some(client) = client.get() else {
|
||||
// The client was dropped before the worker future was first polled.
|
||||
return;
|
||||
};
|
||||
|
||||
let olm_machine = client.olm_machine().await;
|
||||
let Some(olm_machine) = olm_machine.as_ref() else {
|
||||
// The Olm machine was not initialized by the time this task is ready
|
||||
// to perform its work. This is likely a bug, as this worker is only
|
||||
// expected to be spawned once the client is fully ready and the
|
||||
// Olm machine is available.
|
||||
tracing::warn!("Skipping startup bundle checks because the Olm machine is unavailable");
|
||||
return;
|
||||
};
|
||||
|
||||
let room_details = match olm_machine.store().get_all_rooms_pending_key_bundles().await {
|
||||
Ok(room_details) => room_details,
|
||||
Err(e) => {
|
||||
tracing::warn!("Error while fetching rooms pending key bundles: {e:?}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Partition the room details into two categories: those that should be
|
||||
// processed and those that should be removed.
|
||||
let (valid, invalid): (Vec<_>, Vec<_>) = room_details.iter().partition(|details| {
|
||||
shared_room_history::should_process_room_pending_key_bundle_details(details)
|
||||
});
|
||||
|
||||
tracing::debug!(
|
||||
"Found {} valid and {} invalid rooms that are still pending key bundles",
|
||||
valid.len(),
|
||||
invalid.len(),
|
||||
);
|
||||
|
||||
// Iterate over the details that are valid for processing. For each valid
|
||||
// details, check if we have the corresponding key bundle data in the
|
||||
// store. If the data exists, attempt to re-import the bundle.
|
||||
for RoomPendingKeyBundleDetails { room_id, inviter, .. } in valid {
|
||||
let Some(room) = client.get_room(room_id) else {
|
||||
// Skip processing if the room is not cached in the state store.
|
||||
tracing::trace!(?room_id, "Room not available in state store, skipping...");
|
||||
continue;
|
||||
};
|
||||
let bundle =
|
||||
match olm_machine.store().get_received_room_key_bundle_data(room_id, inviter).await
|
||||
{
|
||||
Ok(Some(bundle)) => bundle,
|
||||
Ok(None) => {
|
||||
// If the bundle data is not available, skip processing. The listener task
|
||||
// will handle this case when the bundle arrives.
|
||||
tracing::trace!(?room_id, "No bundle available, skipping...");
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
?room_id,
|
||||
"Failed to fetch received room key bundle data: {err:?}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
Self::handle_bundle(&room, &(&bundle).into()).await;
|
||||
}
|
||||
|
||||
// For each invalid details, clear the pending key bundle information from the
|
||||
// respective room to avoid re-checking it in the future.
|
||||
for RoomPendingKeyBundleDetails { room_id, .. } in &invalid {
|
||||
tracing::trace!(?room_id, "Clearing pending flag for room");
|
||||
if let Err(e) = olm_machine.store().clear_room_pending_key_bundle(room_id).await {
|
||||
tracing::warn!("Error clearing room pending key bundle: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We have received a key bundle for a given room: check if we recently
|
||||
/// accepted an invite from the sender of the bundle, and if so, join
|
||||
/// the room.
|
||||
@@ -488,8 +570,8 @@ impl BundleReceiverTask {
|
||||
/// thread will process it.
|
||||
#[instrument(skip(room), fields(room_id = %room.room_id()))]
|
||||
async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) {
|
||||
if Self::should_accept_bundle(room, bundle_info).await {
|
||||
info!("Accepting a late key bundle.");
|
||||
if shared_room_history::should_accept_key_bundle(room, bundle_info).await {
|
||||
info!(room_id = %room.room_id(), "Accepting a late key bundle.");
|
||||
|
||||
if let Err(e) =
|
||||
shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await
|
||||
@@ -501,50 +583,24 @@ impl BundleReceiverTask {
|
||||
}
|
||||
}
|
||||
|
||||
async fn should_accept_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool {
|
||||
// We accept historic room key bundles up to one day after we have accepted an
|
||||
// invite.
|
||||
const DAY: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
|
||||
// If we don't have any invite acceptance details, then this client wasn't the
|
||||
// one that accepted the invite.
|
||||
let Ok(Some(RoomPendingKeyBundleDetails { invite_accepted_at, inviter, .. })) =
|
||||
room.client.base_client().get_pending_key_bundle_details_for_room(room.room_id()).await
|
||||
else {
|
||||
debug!("Not accepting key bundle as there are no recorded invite acceptance details");
|
||||
return false;
|
||||
};
|
||||
|
||||
let state = room.state();
|
||||
let elapsed_since_join = invite_accepted_at.to_system_time().and_then(|t| t.elapsed().ok());
|
||||
let bundle_sender = &bundle_info.sender;
|
||||
|
||||
match (state, elapsed_since_join) {
|
||||
(RoomState::Joined, Some(elapsed_since_join)) => {
|
||||
elapsed_since_join < DAY && bundle_sender == &inviter
|
||||
}
|
||||
(RoomState::Joined, None) => false,
|
||||
(RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned, _) => {
|
||||
false
|
||||
}
|
||||
}
|
||||
#[cfg(any(feature = "testing", test))]
|
||||
pub(crate) fn abort(&self) {
|
||||
self._startup_handle.abort();
|
||||
self._listen_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_family = "wasm")))]
|
||||
mod test {
|
||||
use matrix_sdk_test::{
|
||||
InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
|
||||
};
|
||||
use matrix_sdk_test::async_test;
|
||||
#[cfg(not(feature = "experimental-encrypted-state-events"))]
|
||||
use ruma::events::room::encrypted::OriginalSyncRoomEncryptedEvent;
|
||||
use ruma::{event_id, room_id, user_id};
|
||||
use ruma::{event_id, room_id};
|
||||
use serde_json::json;
|
||||
use vodozemac::Curve25519PublicKey;
|
||||
use wiremock::MockServer;
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};
|
||||
use crate::test_utils::logged_in_client;
|
||||
|
||||
// Test that, if backups are not enabled, we don't incorrectly mark a room key
|
||||
// as downloaded.
|
||||
@@ -606,89 +662,4 @@ mod test {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Test that ensures that we only accept a bundle if a certain set of
|
||||
/// conditions is met.
|
||||
#[async_test]
|
||||
async fn test_should_accept_bundle() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
|
||||
let alice_user_id = user_id!("@alice:localhost");
|
||||
let bob_user_id = user_id!("@bob:localhost");
|
||||
let joined_room_id = room_id!("!joined:localhost");
|
||||
let invited_rom_id = room_id!("!invited:localhost");
|
||||
|
||||
let client = server
|
||||
.client_builder()
|
||||
.logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into())
|
||||
.build()
|
||||
.await;
|
||||
|
||||
let event_factory = EventFactory::new().room(invited_rom_id);
|
||||
let bob_member_event = event_factory.member(bob_user_id);
|
||||
let alice_member_event = event_factory.member(bob_user_id).invited(alice_user_id);
|
||||
|
||||
server
|
||||
.mock_sync()
|
||||
.ok_and_run(&client, |builder| {
|
||||
builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room(
|
||||
InvitedRoomBuilder::new(invited_rom_id)
|
||||
.add_state_event(bob_member_event)
|
||||
.add_state_event(alice_member_event),
|
||||
);
|
||||
})
|
||||
.await;
|
||||
|
||||
let room =
|
||||
client.get_room(joined_room_id).expect("We should have access to our joined room now");
|
||||
|
||||
assert!(
|
||||
client
|
||||
.base_client()
|
||||
.get_pending_key_bundle_details_for_room(room.room_id())
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none(),
|
||||
"We shouldn't have any invite acceptance details if we didn't join the room on this Client"
|
||||
);
|
||||
|
||||
let bundle_info = RoomKeyBundleInfo {
|
||||
sender: bob_user_id.to_owned(),
|
||||
sender_key: Curve25519PublicKey::from_bytes([0u8; 32]),
|
||||
room_id: joined_room_id.to_owned(),
|
||||
};
|
||||
|
||||
assert!(
|
||||
!BundleReceiverTask::should_accept_bundle(&room, &bundle_info).await,
|
||||
"We should not accept a bundle if we did not join the room from this Client"
|
||||
);
|
||||
|
||||
let invited_room =
|
||||
client.get_room(invited_rom_id).expect("We should have access to our invited room now");
|
||||
|
||||
assert!(
|
||||
!BundleReceiverTask::should_accept_bundle(&invited_room, &bundle_info).await,
|
||||
"We should not accept a bundle if we didn't join the room."
|
||||
);
|
||||
|
||||
server.mock_room_join(invited_rom_id).ok().mock_once().mount().await;
|
||||
|
||||
let room = client
|
||||
.join_room_by_id(invited_rom_id)
|
||||
.await
|
||||
.expect("We should be able to join the invited room");
|
||||
|
||||
let details = client
|
||||
.base_client()
|
||||
.get_pending_key_bundle_details_for_room(room.room_id())
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("We should have stored the invite acceptance details");
|
||||
assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us");
|
||||
|
||||
assert!(
|
||||
BundleReceiverTask::should_accept_bundle(&room, &bundle_info).await,
|
||||
"We should accept a bundle if we just joined the room and did so from this very Client object"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,13 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{collections::HashSet, iter};
|
||||
use std::{collections::HashSet, iter, time::Duration};
|
||||
|
||||
use matrix_sdk_base::{
|
||||
crypto::{store::types::Changes, types::events::room_key_bundle::RoomKeyBundleContent},
|
||||
RoomState,
|
||||
crypto::{
|
||||
store::types::{Changes, RoomKeyBundleInfo, RoomPendingKeyBundleDetails},
|
||||
types::events::room_key_bundle::RoomKeyBundleContent,
|
||||
},
|
||||
media::{MediaFormat, MediaRequestParameters},
|
||||
};
|
||||
use ruma::{OwnedUserId, UserId, events::room::MediaSource};
|
||||
use ruma::{OwnedUserId, UserId, api::client::error::ErrorKind, events::room::MediaSource};
|
||||
use tracing::{debug, info, instrument, warn};
|
||||
|
||||
use crate::{Error, Result, Room};
|
||||
@@ -111,6 +115,76 @@ pub(super) async fn share_room_history(room: &Room, user_id: OwnedUserId) -> Res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Determines whether a room key bundle should be accepted for a given room.
|
||||
///
|
||||
/// This function checks if the client has recorded invite acceptance details
|
||||
/// for the room and ensures that the bundle sender matches the inviter.
|
||||
/// Additionally, it verifies that the room is in a joined state and that the
|
||||
/// bundle is received within one day of the invite being accepted.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `room` - The room for which the key bundle acceptance is being evaluated.
|
||||
/// * `bundle_info` - Information about the room key bundle being evaluated.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `true` if the key bundle should be accepted, otherwise `false`.
|
||||
pub(crate) async fn should_accept_key_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) -> bool {
|
||||
// If we don't have any invite acceptance details, then this client wasn't the
|
||||
// one that accepted the invite.
|
||||
let Ok(Some(details)) =
|
||||
room.client.base_client().get_pending_key_bundle_details_for_room(room.room_id()).await
|
||||
else {
|
||||
debug!("Not accepting key bundle as there are no recorded invite acceptance details");
|
||||
return false;
|
||||
};
|
||||
|
||||
if !should_process_room_pending_key_bundle_details(&details) {
|
||||
return false;
|
||||
}
|
||||
|
||||
let state = room.state();
|
||||
let bundle_sender = &bundle_info.sender;
|
||||
|
||||
match state {
|
||||
RoomState::Joined => bundle_sender == &details.inviter,
|
||||
RoomState::Left | RoomState::Invited | RoomState::Knocked | RoomState::Banned => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines whether the pending key bundle details for a room should be
|
||||
/// processed.
|
||||
///
|
||||
/// This function checks if the invite acceptance timestamp is within the
|
||||
/// allowed time window (one day). If the elapsed time since the invite was
|
||||
/// accepted exceeds this window, the pending key bundle details will not be
|
||||
/// processed.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `details` - The details of the pending key bundle, including the invite
|
||||
/// acceptance timestamp.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `true` if the pending key bundle details should be processed,
|
||||
/// otherwise `false`.
|
||||
pub(crate) fn should_process_room_pending_key_bundle_details(
|
||||
details: &RoomPendingKeyBundleDetails,
|
||||
) -> bool {
|
||||
// We accept historic room key bundles up to one day after we have accepted an
|
||||
// invite.
|
||||
const DAY: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
|
||||
details
|
||||
.invite_accepted_at
|
||||
.to_system_time()
|
||||
.and_then(|t| t.elapsed().ok())
|
||||
.map(|elapsed_since_join| elapsed_since_join < DAY)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Having accepted an invite for the given room from the given user, attempt to
|
||||
/// find a information about a room key bundle and, if found, download the
|
||||
/// bundle and import the room keys, as per [MSC4268].
|
||||
@@ -159,7 +233,7 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re
|
||||
room.client.keys_query(&req_id, request.device_keys).await?;
|
||||
}
|
||||
|
||||
let bundle_content = client
|
||||
let bundle_content = match client
|
||||
.media()
|
||||
.get_media_content(
|
||||
&MediaRequestParameters {
|
||||
@@ -168,7 +242,31 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re
|
||||
},
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(bundle_content) => bundle_content,
|
||||
Err(err) => {
|
||||
// If we encountered an HTTP client error, we should check the status code to
|
||||
// see if we have been sent a bogus link.
|
||||
let Some(err) = err
|
||||
.as_ruma_api_error()
|
||||
.and_then(|e| e.as_client_api_error())
|
||||
.and_then(|e| e.error_kind())
|
||||
else {
|
||||
// Some other error occurred, which we may be able to recover from at the next
|
||||
// client startup.
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if ErrorKind::NotFound == *err {
|
||||
// Clear the pending flag since checking these details again at startup are
|
||||
// guaranteed to fail.
|
||||
olm_machine.store().clear_room_pending_key_bundle(room.room_id()).await?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_slice(&bundle_content) {
|
||||
Ok(bundle) => {
|
||||
@@ -192,5 +290,107 @@ pub(crate) async fn maybe_accept_key_bundle(room: &Room, inviter: &UserId) -> Re
|
||||
// olm_machine.store().clear_received_room_key_bundle_data(room.room_id(),
|
||||
// user_id).await?;
|
||||
|
||||
// If we have reached this point, the bundle was either successfully imported,
|
||||
// or was malformed and failed to deserialise. In either case, we can clear
|
||||
// the room pending state.
|
||||
olm_machine.store().clear_room_pending_key_bundle(room.room_id()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_family = "wasm")))]
|
||||
mod test {
|
||||
use matrix_sdk_base::crypto::store::types::RoomKeyBundleInfo;
|
||||
use matrix_sdk_test::{
|
||||
InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
|
||||
};
|
||||
use ruma::{room_id, user_id};
|
||||
use vodozemac::Curve25519PublicKey;
|
||||
|
||||
use crate::{room::shared_room_history, test_utils::mocks::MatrixMockServer};
|
||||
|
||||
/// Test that ensures that we only accept a bundle if a certain set of
|
||||
/// conditions is met.
|
||||
#[async_test]
|
||||
async fn test_should_accept_bundle() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
|
||||
let alice_user_id = user_id!("@alice:localhost");
|
||||
let bob_user_id = user_id!("@bob:localhost");
|
||||
let joined_room_id = room_id!("!joined:localhost");
|
||||
let invited_rom_id = room_id!("!invited:localhost");
|
||||
|
||||
let client = server
|
||||
.client_builder()
|
||||
.logged_in_with_token("ABCD".to_owned(), alice_user_id.into(), "DEVICEID".into())
|
||||
.build()
|
||||
.await;
|
||||
|
||||
let event_factory = EventFactory::new().room(invited_rom_id);
|
||||
let bob_member_event = event_factory.member(bob_user_id);
|
||||
let alice_member_event = event_factory.member(bob_user_id).invited(alice_user_id);
|
||||
|
||||
server
|
||||
.mock_sync()
|
||||
.ok_and_run(&client, |builder| {
|
||||
builder.add_joined_room(JoinedRoomBuilder::new(joined_room_id)).add_invited_room(
|
||||
InvitedRoomBuilder::new(invited_rom_id)
|
||||
.add_state_event(bob_member_event)
|
||||
.add_state_event(alice_member_event),
|
||||
);
|
||||
})
|
||||
.await;
|
||||
|
||||
let room =
|
||||
client.get_room(joined_room_id).expect("We should have access to our joined room now");
|
||||
|
||||
assert!(
|
||||
client
|
||||
.base_client()
|
||||
.get_pending_key_bundle_details_for_room(room.room_id())
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none(),
|
||||
"We shouldn't have any invite acceptance details if we didn't join the room on this Client"
|
||||
);
|
||||
|
||||
let bundle_info = RoomKeyBundleInfo {
|
||||
sender: bob_user_id.to_owned(),
|
||||
sender_key: Curve25519PublicKey::from_bytes([0u8; 32]),
|
||||
room_id: joined_room_id.to_owned(),
|
||||
};
|
||||
|
||||
assert!(
|
||||
!shared_room_history::should_accept_key_bundle(&room, &bundle_info).await,
|
||||
"We should not accept a bundle if we did not join the room from this Client"
|
||||
);
|
||||
|
||||
let invited_room =
|
||||
client.get_room(invited_rom_id).expect("We should have access to our invited room now");
|
||||
|
||||
assert!(
|
||||
!shared_room_history::should_accept_key_bundle(&invited_room, &bundle_info).await,
|
||||
"We should not accept a bundle if we didn't join the room."
|
||||
);
|
||||
|
||||
server.mock_room_join(invited_rom_id).ok().mock_once().mount().await;
|
||||
|
||||
let room = client
|
||||
.join_room_by_id(invited_rom_id)
|
||||
.await
|
||||
.expect("We should be able to join the invited room");
|
||||
|
||||
let details = client
|
||||
.base_client()
|
||||
.get_pending_key_bundle_details_for_room(room.room_id())
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("We should have stored the invite acceptance details");
|
||||
assert_eq!(details.inviter, bob_user_id, "We should have recorded that Bob has invited us");
|
||||
|
||||
assert!(
|
||||
shared_room_history::should_accept_key_bundle(&room, &bundle_info).await,
|
||||
"We should accept a bundle if we just joined the room and did so from this very Client object"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,53 @@
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use matrix_sdk::{
|
||||
assert_decrypted_message_eq, assert_next_matches_with_timeout,
|
||||
Client, assert_decrypted_message_eq, assert_next_matches_with_timeout,
|
||||
deserialized_responses::{TimelineEvent, UnableToDecryptInfo, UnableToDecryptReason},
|
||||
encryption::EncryptionSettings,
|
||||
test_utils::mocks::MatrixMockServer,
|
||||
};
|
||||
use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedToDeviceEvent;
|
||||
use matrix_sdk_test::{
|
||||
InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
|
||||
};
|
||||
use ruma::{
|
||||
RoomVersionId, device_id, event_id, events::room::message::RoomMessageEventContent, mxc_uri,
|
||||
room_id, user_id,
|
||||
OwnedEventId, RoomVersionId, device_id, event_id,
|
||||
events::{AnySyncTimelineEvent, room::message::RoomMessageEventContent},
|
||||
mxc_uri, room_id,
|
||||
serde::Raw,
|
||||
user_id,
|
||||
};
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[async_test]
|
||||
async fn test_shared_history_out_of_order() {
|
||||
/// Helper struct to collect test data together.
|
||||
struct Test {
|
||||
matrix_mock_server: MatrixMockServer,
|
||||
alice: Client,
|
||||
bob: Client,
|
||||
bob_room: matrix_sdk::Room,
|
||||
/// The encrypted event Alice sent before Bob joined.
|
||||
event_id: OwnedEventId,
|
||||
/// The raw bytes of the uploaded key bundle.
|
||||
bundle: Vec<u8>,
|
||||
/// The captured to-device event carrying bundle info.
|
||||
bundle_info: Raw<EncryptedToDeviceEvent>,
|
||||
/// Receiver for the original event sent by Alice.
|
||||
event_receiver: tokio::sync::oneshot::Receiver<Raw<AnySyncTimelineEvent>>,
|
||||
}
|
||||
|
||||
/// Sets up the shared-history scenario up to the point where Bob has joined the
|
||||
/// room and the bundle info to-device event is ready to be delivered.
|
||||
///
|
||||
/// Both tests below share this identical preamble:
|
||||
///
|
||||
/// - Server + Alice + Bob clients created
|
||||
/// - E2EE identities exchanged
|
||||
/// - Alice creates an encrypted room and sends a message
|
||||
/// - Alice invites Bob, triggering key bundle upload
|
||||
/// - Bob syncs the invite and joins
|
||||
/// - Bundle details are verified
|
||||
async fn setup_shared_history(
|
||||
bob_builder_fn: impl FnOnce(matrix_sdk::ClientBuilder) -> matrix_sdk::ClientBuilder,
|
||||
) -> Test {
|
||||
let room_id = room_id!("!test:localhost");
|
||||
let mxid = mxc_uri!("mxc://localhost/12345");
|
||||
|
||||
@@ -43,7 +76,7 @@ async fn test_shared_history_out_of_order() {
|
||||
let bob = matrix_mock_server
|
||||
.client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
|
||||
.on_builder(|builder| {
|
||||
builder
|
||||
bob_builder_fn(builder)
|
||||
.with_enable_share_history_on_invite(true)
|
||||
.with_encryption_settings(encryption_settings)
|
||||
})
|
||||
@@ -105,12 +138,6 @@ async fn test_shared_history_out_of_order() {
|
||||
room.invite_user_by_id(bob_user_id).await.expect("We should be able to invite Bob");
|
||||
let bundle = receiver.await.expect("We should have received a bundle now.");
|
||||
|
||||
let mut bundle_stream = bob
|
||||
.encryption()
|
||||
.historic_room_key_stream()
|
||||
.await
|
||||
.expect("We should be able to get the bundle stream");
|
||||
|
||||
let bob_member_event = event_factory.member(alice_user_id).invited(bob_user_id);
|
||||
|
||||
matrix_mock_server
|
||||
@@ -142,6 +169,33 @@ async fn test_shared_history_out_of_order() {
|
||||
);
|
||||
|
||||
let bundle_info = bundle_info.await;
|
||||
|
||||
Test { matrix_mock_server, alice, bob, bob_room, event_id, bundle, bundle_info, event_receiver }
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_shared_history_out_of_order() {
|
||||
let room_id = room_id!("!test:localhost");
|
||||
let alice_user_id = user_id!("@alice:localhost");
|
||||
let alice_device_id = device_id!("ALICEDEVICE");
|
||||
|
||||
let Test {
|
||||
matrix_mock_server,
|
||||
bob,
|
||||
bob_room,
|
||||
event_id,
|
||||
bundle,
|
||||
bundle_info,
|
||||
event_receiver,
|
||||
..
|
||||
} = setup_shared_history(|builder| builder).await;
|
||||
|
||||
let mut bundle_stream = bob
|
||||
.encryption()
|
||||
.historic_room_key_stream()
|
||||
.await
|
||||
.expect("We should be able to get the bundle stream");
|
||||
|
||||
matrix_mock_server
|
||||
.mock_authed_media_download()
|
||||
.expect_any_access_token()
|
||||
@@ -211,3 +265,144 @@ async fn test_shared_history_out_of_order() {
|
||||
"The decrypted event should match the message Alice has sent"
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
#[async_test]
|
||||
async fn test_shared_history_crash_before_import() {
|
||||
let room_id = room_id!("!test:localhost");
|
||||
let alice_user_id = user_id!("@alice:localhost");
|
||||
let alice_device_id = device_id!("ALICEDEVICE");
|
||||
let bob_user_id = user_id!("@bob:localhost");
|
||||
let bob_device_id = device_id!("BOBDEVICE");
|
||||
|
||||
// Use a common store path for Bob so we can persist invite acceptance details
|
||||
// over the crash.
|
||||
let bob_sqlite_path = tempdir().unwrap();
|
||||
let encryption_settings =
|
||||
EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() };
|
||||
|
||||
let Test {
|
||||
matrix_mock_server, alice, bob, event_id, bundle, bundle_info, event_receiver, ..
|
||||
} = setup_shared_history(|builder| builder.sqlite_store(bob_sqlite_path.path(), None)).await;
|
||||
|
||||
matrix_mock_server
|
||||
.mock_authed_media_download()
|
||||
.expect_any_access_token()
|
||||
.ok_bytes(bundle)
|
||||
.mock_once()
|
||||
.named("media_download")
|
||||
.mount()
|
||||
.await;
|
||||
|
||||
let mut bundle_stream = bob
|
||||
.encryption()
|
||||
.historic_room_key_stream()
|
||||
.await
|
||||
.expect("We should be able to get the bundle stream");
|
||||
|
||||
// Abort the bundle receiver, so we can properly test the crash recovery
|
||||
// mechanism.
|
||||
bob.abort_bundle_receiver_task();
|
||||
|
||||
// Bob now receives the bundle info ...
|
||||
matrix_mock_server
|
||||
.mock_sync()
|
||||
.ok_and_run(&bob, |builder| {
|
||||
builder.add_to_device_event(
|
||||
bundle_info
|
||||
.deserialize_as()
|
||||
.expect("We should be able to deserialize the bundle info"),
|
||||
);
|
||||
})
|
||||
.await;
|
||||
|
||||
let bundle_notification = bundle_stream
|
||||
.next()
|
||||
.now_or_never()
|
||||
.flatten()
|
||||
.expect("We should have been notified about the received bundle");
|
||||
|
||||
assert_eq!(bundle_notification.sender, alice_user_id);
|
||||
assert_eq!(bundle_notification.room_id, room_id);
|
||||
|
||||
// .. but crashes before they finish importing the bundle.
|
||||
drop(bob);
|
||||
|
||||
// Bob restarts their client.
|
||||
let bob_restarted = matrix_mock_server
|
||||
.client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id)
|
||||
.on_builder(|builder| {
|
||||
builder
|
||||
.sqlite_store(bob_sqlite_path.path(), None)
|
||||
.with_enable_share_history_on_invite(true)
|
||||
.with_encryption_settings(encryption_settings)
|
||||
})
|
||||
.build()
|
||||
.await;
|
||||
|
||||
// Bob's restarted client should be able to access the persisted pending key
|
||||
// bundle details.
|
||||
let details = bob_restarted
|
||||
.get_pending_key_bundle_details_for_room(room_id)
|
||||
.await
|
||||
.expect("Bob should be able to get the pending key bundle details for the room")
|
||||
.expect("We should have stored invite acceptance details");
|
||||
|
||||
assert_eq!(
|
||||
details.inviter,
|
||||
alice.user_id().unwrap(),
|
||||
"The persisted pending key bundle details should be identical"
|
||||
);
|
||||
|
||||
// Bob's restarted client should successfully import the room keys.
|
||||
let mut room_key_stream = bob_restarted
|
||||
.encryption()
|
||||
.room_keys_received_stream()
|
||||
.await
|
||||
.expect("We should be able to listen to received room keys");
|
||||
|
||||
assert_next_matches_with_timeout!(room_key_stream, 1000, Ok(room_key_infos) => assert_eq!(room_key_infos.len(), 1));
|
||||
|
||||
let event = event_receiver.await.expect("We should have received Alice's event");
|
||||
|
||||
matrix_mock_server
|
||||
.mock_room_event()
|
||||
.room(room_id)
|
||||
.match_event_id()
|
||||
.ok(TimelineEvent::from_utd(
|
||||
event,
|
||||
UnableToDecryptInfo { session_id: None, reason: UnableToDecryptReason::Unknown },
|
||||
))
|
||||
.mock_once()
|
||||
.mount()
|
||||
.await;
|
||||
|
||||
let event = bob_restarted
|
||||
.get_room(room_id)
|
||||
.expect("Bob should have access to the room after restart")
|
||||
.event(&event_id, None)
|
||||
.await
|
||||
.expect("Bob should be able to fetch the event Alice has sent");
|
||||
|
||||
let encryption_info = event.encryption_info().expect("Event did not have encryption info");
|
||||
|
||||
// Check Bob stored information about the key forwarder.
|
||||
let forwarder_info = encryption_info.forwarder.as_ref().unwrap();
|
||||
assert_eq!(forwarder_info.user_id, alice_user_id);
|
||||
assert_eq!(forwarder_info.device_id, alice_device_id);
|
||||
|
||||
assert_decrypted_message_eq!(
|
||||
event,
|
||||
"It's a secret to everybody",
|
||||
"The decrypted event should match the message Alice has sent"
|
||||
);
|
||||
|
||||
assert!(
|
||||
bob_restarted
|
||||
.get_pending_key_bundle_details_for_room(room_id)
|
||||
.await
|
||||
.expect("Bob should be able to get the pending key bundle details for the room")
|
||||
.is_none(),
|
||||
"Bob should have cleared the pending key bundle details for the room"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user