From 910887fbc69e8299fe29606b80cb0eb94667dfcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 23 Jan 2024 10:07:03 +0100 Subject: [PATCH] sliding_sync: Add test for delayed decryption roominfo updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Timo Kösters --- Cargo.lock | 2 + .../tests/integration/room_list_service.rs | 12 +- crates/matrix-sdk/src/client/builder.rs | 17 +- crates/matrix-sdk/src/client/mod.rs | 2 +- crates/matrix-sdk/src/http_client/mod.rs | 14 +- crates/matrix-sdk/src/http_client/native.rs | 11 +- .../matrix-sdk-integration-testing/Cargo.toml | 2 + .../src/helpers.rs | 17 +- .../src/tests/sliding_sync/room.rs | 161 +++++++++++++++++- 9 files changed, 217 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03b31269b..29a6cc56c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,11 +3243,13 @@ dependencies = [ "eyeball-im", "futures-core", "futures-util", + "http", "matrix-sdk", "matrix-sdk-test", "matrix-sdk-ui", "once_cell", "rand 0.8.5", + "serde_json", "stream_assert", "tempfile", "tokio", diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index de4fb0daa..0d04bb338 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -1946,7 +1946,7 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> { assert_pending!(dynamic_entries_stream); // Now, let's define a filter. - dynamic_entries.set_filter(new_filter_fuzzy_match_room_name(&client, "mat ba")); + dynamic_entries.set_filter(Box::new(new_filter_fuzzy_match_room_name(&client, "mat ba"))); // Assert the dynamic entries. assert_entries_batch! { @@ -2014,9 +2014,10 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> { end; }; - // Variation 1: Send manual update after reading stream, !r0 should be at new pos 1 + // Variation 1: Send manual update after reading stream, !r0 should be at new + // pos 1 let room = client.get_room(room_id!("!r0:bar.org")).unwrap(); - room.update_summary(room.clone_info()).await; + room.set_room_info(room.clone_info()); assert_entries_batch! { [dynamic_entries_stream] @@ -2069,9 +2070,10 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> { }, }; - // Variation 2: Send manual update before reading stream, !r0 should still be at previous pos 1 + // Variation 2: Send manual update before reading stream, !r0 should still be at + // previous pos 1 let room = client.get_room(room_id!("!r0:bar.org")).unwrap(); - room.update_summary(room.clone_info()).await; + room.set_room_info(room.clone_info()); assert_entries_batch! { [dynamic_entries_stream] diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index 7e64aea72..e6e96e869 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -15,6 +15,7 @@ use std::{fmt, sync::Arc}; +use bytes::Bytes; use matrix_sdk_base::{store::StoreConfig, BaseClient}; use ruma::{ api::{client::discovery::discover_homeserver, error::FromHttpResponseError, MatrixVersion}, @@ -90,6 +91,7 @@ pub struct ClientBuilder { base_client: Option, #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings, + response_preprocessor: Option, &mut http::Response)>, } impl ClientBuilder { @@ -107,6 +109,7 @@ impl ClientBuilder { base_client: None, #[cfg(feature = "e2e-encryption")] encryption_settings: Default::default(), + response_preprocessor: None, } } @@ -326,6 +329,14 @@ impl ClientBuilder { self } + pub fn response_preprocessor( + mut self, + response_preprocessor: fn(&http::Request, &mut http::Response), + ) -> Self { + self.response_preprocessor = Some(response_preprocessor); + self + } + /// Create a [`Client`] with the options set on this builder. /// /// # Errors @@ -361,7 +372,11 @@ impl ClientBuilder { BaseClient::with_store_config(build_store_config(self.store_config).await?) }; - let http_client = HttpClient::new(inner_http_client.clone(), self.request_config); + let http_client = HttpClient::new( + inner_http_client.clone(), + self.request_config, + self.response_preprocessor, + ); #[cfg(feature = "experimental-oidc")] let mut authentication_server_info = None; diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 9255fe6b7..06df11396 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -362,7 +362,7 @@ impl Client { ClientBuilder::new() } - pub(crate) fn base_client(&self) -> &BaseClient { + pub fn base_client(&self) -> &BaseClient { &self.inner.base_client } diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 72390e427..0c6bee66c 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -49,11 +49,21 @@ pub(crate) struct HttpClient { pub(crate) inner: reqwest::Client, pub(crate) request_config: RequestConfig, next_request_id: Arc, + response_preprocessor: Option, &mut http::Response)>, } impl HttpClient { - pub(crate) fn new(inner: reqwest::Client, request_config: RequestConfig) -> Self { - HttpClient { inner, request_config, next_request_id: AtomicU64::new(0).into() } + pub(crate) fn new( + inner: reqwest::Client, + request_config: RequestConfig, + response_preprocessor: Option, &mut http::Response)>, + ) -> Self { + HttpClient { + inner, + request_config, + next_request_id: AtomicU64::new(0).into(), + response_preprocessor, + } } fn get_request_id(&self) -> String { diff --git a/crates/matrix-sdk/src/http_client/native.rs b/crates/matrix-sdk/src/http_client/native.rs index 54955104f..f3892a663 100644 --- a/crates/matrix-sdk/src/http_client/native.rs +++ b/crates/matrix-sdk/src/http_client/native.rs @@ -92,9 +92,10 @@ impl HttpClient { } }; - let response = send_request(&self.inner, &request, config.timeout, send_progress) - .await - .map_err(error_type)?; + let mut response = + send_request(&self.inner, &request, config.timeout, send_progress) + .await + .map_err(error_type)?; let status_code = response.status(); let response_size = ByteSize(response.body().len().try_into().unwrap_or(u64::MAX)); @@ -115,6 +116,10 @@ impl HttpClient { } } + if let Some(preprocessor) = self.response_preprocessor { + preprocessor(&request, &mut response); + } + R::IncomingResponse::try_from_http_response(response) .map_err(|e| error_type(HttpError::from(e))) } diff --git a/testing/matrix-sdk-integration-testing/Cargo.toml b/testing/matrix-sdk-integration-testing/Cargo.toml index 7d6e05d63..72c35d606 100644 --- a/testing/matrix-sdk-integration-testing/Cargo.toml +++ b/testing/matrix-sdk-integration-testing/Cargo.toml @@ -13,12 +13,14 @@ assign = "1" eyeball-im = { workspace = true } futures-core = { workspace = true } futures-util = { workspace = true } +http = "0.2.11" matrix-sdk = { workspace = true, default-features = true, features = ["testing", "qrcode"] } matrix-sdk-ui = { workspace = true, default-features = true } matrix-sdk-test = { workspace = true } once_cell = { workspace = true } rand = { workspace = true } stream_assert = "0.1.1" +serde_json = "1.0.108" tempfile = "3.3.0" tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } tracing = { workspace = true } diff --git a/testing/matrix-sdk-integration-testing/src/helpers.rs b/testing/matrix-sdk-integration-testing/src/helpers.rs index d00611e1b..1596a245b 100644 --- a/testing/matrix-sdk-integration-testing/src/helpers.rs +++ b/testing/matrix-sdk-integration-testing/src/helpers.rs @@ -9,6 +9,7 @@ use std::{ use anyhow::Result; use assign::assign; use matrix_sdk::{ + bytes::Bytes, config::{RequestConfig, SyncSettings}, encryption::EncryptionSettings, ruma::api::client::{account::register::v3::Request as RegistrationRequest, uiaa}, @@ -25,6 +26,7 @@ pub struct TestClientBuilder { username: String, use_sqlite: bool, encryption_settings: EncryptionSettings, + response_preprocessor: Option, &mut http::Response)>, } impl TestClientBuilder { @@ -33,6 +35,7 @@ impl TestClientBuilder { username: username.into(), use_sqlite: false, encryption_settings: Default::default(), + response_preprocessor: None, } } @@ -52,6 +55,14 @@ impl TestClientBuilder { self } + pub fn response_preprocessor( + mut self, + r: fn(&http::Request, &mut http::Response), + ) -> Self { + self.response_preprocessor = Some(r); + self + } + pub async fn build(self) -> Result { let mut users = USERS.lock().await; if let Some((client, _)) = users.get(&self.username) { @@ -65,13 +76,17 @@ impl TestClientBuilder { let tmp_dir = tempdir()?; - let client_builder = Client::builder() + let mut client_builder = Client::builder() .user_agent("matrix-sdk-integration-tests") .homeserver_url(homeserver_url) .sliding_sync_proxy(sliding_sync_proxy_url) .with_encryption_settings(self.encryption_settings) .request_config(RequestConfig::short_retry()); + if let Some(response_preprocessor) = self.response_preprocessor { + client_builder = client_builder.response_preprocessor(response_preprocessor); + } + let client = if self.use_sqlite { client_builder.sqlite_store(tmp_dir.path(), None).build().await? } else { diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 567f13697..4b167093f 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -1,16 +1,26 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex as StdMutex, + }, + time::Duration, +}; use anyhow::Result; use futures_util::{pin_mut, StreamExt as _}; use matrix_sdk::{ + bytes::Bytes, config::SyncSettings, ruma::{ - api::client::{ - receipt::create_receipt::v3::ReceiptType, - room::create_room::v3::Request as CreateRoomRequest, - sync::sync_events::v4::{ - AccountDataConfig, E2EEConfig, ReceiptsConfig, ToDeviceConfig, + api::{ + client::{ + receipt::create_receipt::v3::ReceiptType, + room::create_room::v3::{Request as CreateRoomRequest, RoomPreset}, + sync::sync_events::v4::{ + AccountDataConfig, E2EEConfig, ReceiptsConfig, ToDeviceConfig, + }, }, + IncomingResponse, }, assign, events::{ @@ -21,9 +31,15 @@ use matrix_sdk::{ }, Client, RoomListEntry, RoomMemberships, RoomState, SlidingSyncList, SlidingSyncMode, }; -use matrix_sdk_ui::RoomListService; +use matrix_sdk_ui::{ + room_list_service::filters::new_filter_all, sync_service::SyncService, RoomListService, +}; use stream_assert::assert_pending; -use tokio::{spawn, sync::Mutex, time::sleep}; +use tokio::{ + spawn, + sync::Mutex, + time::{sleep, timeout}, +}; use tracing::{error, warn}; use crate::helpers::TestClientBuilder; @@ -530,3 +546,132 @@ async fn test_room_notification_count() -> Result<()> { Ok(()) } + +struct DelayedDecryption { + active: bool, + queued: Vec, +} +static DELAYED_DECRYPTION: StdMutex = + StdMutex::new(DelayedDecryption { active: true, queued: Vec::new() }); + +fn delayed_decryption(request: &http::Request, response: &mut http::Response) { + dbg!(request.uri()); + let Ok(mut json) = serde_json::from_slice::(response.body()) else { + return; + }; + let Some(extensions) = json.get_mut("extensions").and_then(|e| e.as_object_mut()) else { + return; + }; + let Some(to_device) = extensions.remove("to_device") else { + return; + }; + let d = DELAYED_DECRYPTION.lock().unwrap(); + if d.active { + *response.body_mut() = serde_json::to_vec(&json).unwrap().into(); + return; + } else { + dbg!(to_device); + // d.queued.push(to_device); + } +} + +#[tokio::test] +async fn test_delayed_decryption_latest_event() -> Result<()> { + let alice = TestClientBuilder::new("alice".to_owned()) + .randomize_username() + .use_sqlite() + .response_preprocessor(delayed_decryption) + .build() + .await?; + let (tx, rx) = tokio::sync::broadcast::channel(10); + alice.base_client().set_roominfo_update_sender(tx); + let bob = + TestClientBuilder::new("bob".to_owned()).randomize_username().use_sqlite().build().await?; + + let alice_sync_service = SyncService::builder(alice.clone()).build().await.unwrap(); + alice_sync_service.start().await; + // Set up sliding sync for alice. + let sliding_alice = alice + .sliding_sync("main")? + .with_all_extensions() + .poll_timeout(Duration::from_secs(2)) + .network_timeout(Duration::from_secs(2)) + .add_list( + SlidingSyncList::builder("all") + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)), + ) + .build() + .await?; + + // Set up sliding sync for bob. + let sliding_bob = bob + .sliding_sync("main")? + .with_all_extensions() + .poll_timeout(Duration::from_secs(2)) + .network_timeout(Duration::from_secs(2)) + .add_list( + SlidingSyncList::builder("all") + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)), + ) + .build() + .await?; + + let s = sliding_alice.clone(); + tokio::task::spawn(async move { + let stream = s.sync(); + pin_mut!(stream); + while let Some(up) = stream.next().await { + warn!("alice received update: {up:?}"); + } + }); + + let s = sliding_bob.clone(); + tokio::task::spawn(async move { + let stream = s.sync(); + pin_mut!(stream); + while let Some(up) = stream.next().await { + warn!("bob received update: {up:?}"); + } + }); + + // alice creates a room and invites bob and celine. + let alice_room = alice + .create_room(assign!(CreateRoomRequest::new(), { + invite: vec![bob.user_id().unwrap().to_owned()], + is_direct: true, + preset: Some(RoomPreset::TrustedPrivateChat), + })) + .await?; + alice_room.enable_encryption().await.unwrap(); + + sleep(Duration::from_secs(1)).await; + let alice_room = alice.get_room(alice_room.room_id()).unwrap(); + let bob_room = bob.get_room(alice_room.room_id()).unwrap(); + bob_room.join().await.unwrap(); + bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; + + sleep(Duration::from_secs(1)).await; + assert_eq!(alice_room.state(), RoomState::Joined); + assert!(alice_room.is_encrypted().await.unwrap()); + assert_eq!(bob_room.state(), RoomState::Joined); + + let (stream, entries) = alice_sync_service + .room_list_service() + .all_rooms() + .await + .unwrap() + .entries_with_dynamic_adapters(10, &rx); + entries.set_filter(Box::new(new_filter_all(vec![]))); + pin_mut!(stream); + dbg!(timeout(Duration::from_millis(100), stream.next()).await.unwrap()); + assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + assert!(matches!(alice_room.latest_event(), None)); + + DELAYED_DECRYPTION.lock().unwrap().active = false; + sleep(Duration::from_secs(1)).await; + dbg!(&alice_room.latest_event().unwrap().event().event); + dbg!(timeout(Duration::from_millis(100), stream.next()).await.unwrap()); + assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + + Ok(()) +}