sliding_sync: Add test for delayed decryption roominfo updates

Signed-off-by: Timo Kösters <timo@koesters.xyz>
This commit is contained in:
Timo Kösters
2024-01-23 10:07:03 +01:00
committed by Benjamin Bouvier
parent 304bd910f0
commit 910887fbc6
9 changed files with 217 additions and 21 deletions

2
Cargo.lock generated
View File

@@ -3243,11 +3243,13 @@ dependencies = [
"eyeball-im",
"futures-core",
"futures-util",
"http",
"matrix-sdk",
"matrix-sdk-test",
"matrix-sdk-ui",
"once_cell",
"rand 0.8.5",
"serde_json",
"stream_assert",
"tempfile",
"tokio",

View File

@@ -1946,7 +1946,7 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> {
assert_pending!(dynamic_entries_stream);
// Now, let's define a filter.
dynamic_entries.set_filter(new_filter_fuzzy_match_room_name(&client, "mat ba"));
dynamic_entries.set_filter(Box::new(new_filter_fuzzy_match_room_name(&client, "mat ba")));
// Assert the dynamic entries.
assert_entries_batch! {
@@ -2014,9 +2014,10 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> {
end;
};
// Variation 1: Send manual update after reading stream, !r0 should be at new pos 1
// Variation 1: Send manual update after reading stream, !r0 should be at new
// pos 1
let room = client.get_room(room_id!("!r0:bar.org")).unwrap();
room.update_summary(room.clone_info()).await;
room.set_room_info(room.clone_info());
assert_entries_batch! {
[dynamic_entries_stream]
@@ -2069,9 +2070,10 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> {
},
};
// Variation 2: Send manual update before reading stream, !r0 should still be at previous pos 1
// Variation 2: Send manual update before reading stream, !r0 should still be at
// previous pos 1
let room = client.get_room(room_id!("!r0:bar.org")).unwrap();
room.update_summary(room.clone_info()).await;
room.set_room_info(room.clone_info());
assert_entries_batch! {
[dynamic_entries_stream]

View File

@@ -15,6 +15,7 @@
use std::{fmt, sync::Arc};
use bytes::Bytes;
use matrix_sdk_base::{store::StoreConfig, BaseClient};
use ruma::{
api::{client::discovery::discover_homeserver, error::FromHttpResponseError, MatrixVersion},
@@ -90,6 +91,7 @@ pub struct ClientBuilder {
base_client: Option<BaseClient>,
#[cfg(feature = "e2e-encryption")]
encryption_settings: EncryptionSettings,
response_preprocessor: Option<fn(&http::Request<Bytes>, &mut http::Response<Bytes>)>,
}
impl ClientBuilder {
@@ -107,6 +109,7 @@ impl ClientBuilder {
base_client: None,
#[cfg(feature = "e2e-encryption")]
encryption_settings: Default::default(),
response_preprocessor: None,
}
}
@@ -326,6 +329,14 @@ impl ClientBuilder {
self
}
pub fn response_preprocessor(
mut self,
response_preprocessor: fn(&http::Request<Bytes>, &mut http::Response<Bytes>),
) -> Self {
self.response_preprocessor = Some(response_preprocessor);
self
}
/// Create a [`Client`] with the options set on this builder.
///
/// # Errors
@@ -361,7 +372,11 @@ impl ClientBuilder {
BaseClient::with_store_config(build_store_config(self.store_config).await?)
};
let http_client = HttpClient::new(inner_http_client.clone(), self.request_config);
let http_client = HttpClient::new(
inner_http_client.clone(),
self.request_config,
self.response_preprocessor,
);
#[cfg(feature = "experimental-oidc")]
let mut authentication_server_info = None;

View File

@@ -362,7 +362,7 @@ impl Client {
ClientBuilder::new()
}
pub(crate) fn base_client(&self) -> &BaseClient {
pub fn base_client(&self) -> &BaseClient {
&self.inner.base_client
}

View File

@@ -49,11 +49,21 @@ pub(crate) struct HttpClient {
pub(crate) inner: reqwest::Client,
pub(crate) request_config: RequestConfig,
next_request_id: Arc<AtomicU64>,
response_preprocessor: Option<fn(&http::Request<Bytes>, &mut http::Response<Bytes>)>,
}
impl HttpClient {
pub(crate) fn new(inner: reqwest::Client, request_config: RequestConfig) -> Self {
HttpClient { inner, request_config, next_request_id: AtomicU64::new(0).into() }
pub(crate) fn new(
inner: reqwest::Client,
request_config: RequestConfig,
response_preprocessor: Option<fn(&http::Request<Bytes>, &mut http::Response<Bytes>)>,
) -> Self {
HttpClient {
inner,
request_config,
next_request_id: AtomicU64::new(0).into(),
response_preprocessor,
}
}
fn get_request_id(&self) -> String {

View File

@@ -92,9 +92,10 @@ impl HttpClient {
}
};
let response = send_request(&self.inner, &request, config.timeout, send_progress)
.await
.map_err(error_type)?;
let mut response =
send_request(&self.inner, &request, config.timeout, send_progress)
.await
.map_err(error_type)?;
let status_code = response.status();
let response_size = ByteSize(response.body().len().try_into().unwrap_or(u64::MAX));
@@ -115,6 +116,10 @@ impl HttpClient {
}
}
if let Some(preprocessor) = self.response_preprocessor {
preprocessor(&request, &mut response);
}
R::IncomingResponse::try_from_http_response(response)
.map_err(|e| error_type(HttpError::from(e)))
}

View File

@@ -13,12 +13,14 @@ assign = "1"
eyeball-im = { workspace = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
http = "0.2.11"
matrix-sdk = { workspace = true, default-features = true, features = ["testing", "qrcode"] }
matrix-sdk-ui = { workspace = true, default-features = true }
matrix-sdk-test = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
stream_assert = "0.1.1"
serde_json = "1.0.108"
tempfile = "3.3.0"
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
tracing = { workspace = true }

View File

@@ -9,6 +9,7 @@ use std::{
use anyhow::Result;
use assign::assign;
use matrix_sdk::{
bytes::Bytes,
config::{RequestConfig, SyncSettings},
encryption::EncryptionSettings,
ruma::api::client::{account::register::v3::Request as RegistrationRequest, uiaa},
@@ -25,6 +26,7 @@ pub struct TestClientBuilder {
username: String,
use_sqlite: bool,
encryption_settings: EncryptionSettings,
response_preprocessor: Option<fn(&http::Request<Bytes>, &mut http::Response<Bytes>)>,
}
impl TestClientBuilder {
@@ -33,6 +35,7 @@ impl TestClientBuilder {
username: username.into(),
use_sqlite: false,
encryption_settings: Default::default(),
response_preprocessor: None,
}
}
@@ -52,6 +55,14 @@ impl TestClientBuilder {
self
}
pub fn response_preprocessor(
mut self,
r: fn(&http::Request<Bytes>, &mut http::Response<Bytes>),
) -> Self {
self.response_preprocessor = Some(r);
self
}
pub async fn build(self) -> Result<Client> {
let mut users = USERS.lock().await;
if let Some((client, _)) = users.get(&self.username) {
@@ -65,13 +76,17 @@ impl TestClientBuilder {
let tmp_dir = tempdir()?;
let client_builder = Client::builder()
let mut client_builder = Client::builder()
.user_agent("matrix-sdk-integration-tests")
.homeserver_url(homeserver_url)
.sliding_sync_proxy(sliding_sync_proxy_url)
.with_encryption_settings(self.encryption_settings)
.request_config(RequestConfig::short_retry());
if let Some(response_preprocessor) = self.response_preprocessor {
client_builder = client_builder.response_preprocessor(response_preprocessor);
}
let client = if self.use_sqlite {
client_builder.sqlite_store(tmp_dir.path(), None).build().await?
} else {

View File

@@ -1,16 +1,26 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex as StdMutex,
},
time::Duration,
};
use anyhow::Result;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::{
bytes::Bytes,
config::SyncSettings,
ruma::{
api::client::{
receipt::create_receipt::v3::ReceiptType,
room::create_room::v3::Request as CreateRoomRequest,
sync::sync_events::v4::{
AccountDataConfig, E2EEConfig, ReceiptsConfig, ToDeviceConfig,
api::{
client::{
receipt::create_receipt::v3::ReceiptType,
room::create_room::v3::{Request as CreateRoomRequest, RoomPreset},
sync::sync_events::v4::{
AccountDataConfig, E2EEConfig, ReceiptsConfig, ToDeviceConfig,
},
},
IncomingResponse,
},
assign,
events::{
@@ -21,9 +31,15 @@ use matrix_sdk::{
},
Client, RoomListEntry, RoomMemberships, RoomState, SlidingSyncList, SlidingSyncMode,
};
use matrix_sdk_ui::RoomListService;
use matrix_sdk_ui::{
room_list_service::filters::new_filter_all, sync_service::SyncService, RoomListService,
};
use stream_assert::assert_pending;
use tokio::{spawn, sync::Mutex, time::sleep};
use tokio::{
spawn,
sync::Mutex,
time::{sleep, timeout},
};
use tracing::{error, warn};
use crate::helpers::TestClientBuilder;
@@ -530,3 +546,132 @@ async fn test_room_notification_count() -> Result<()> {
Ok(())
}
struct DelayedDecryption {
active: bool,
queued: Vec<serde_json::Value>,
}
static DELAYED_DECRYPTION: StdMutex<DelayedDecryption> =
StdMutex::new(DelayedDecryption { active: true, queued: Vec::new() });
fn delayed_decryption(request: &http::Request<Bytes>, response: &mut http::Response<Bytes>) {
dbg!(request.uri());
let Ok(mut json) = serde_json::from_slice::<serde_json::Value>(response.body()) else {
return;
};
let Some(extensions) = json.get_mut("extensions").and_then(|e| e.as_object_mut()) else {
return;
};
let Some(to_device) = extensions.remove("to_device") else {
return;
};
let d = DELAYED_DECRYPTION.lock().unwrap();
if d.active {
*response.body_mut() = serde_json::to_vec(&json).unwrap().into();
return;
} else {
dbg!(to_device);
// d.queued.push(to_device);
}
}
#[tokio::test]
async fn test_delayed_decryption_latest_event() -> Result<()> {
let alice = TestClientBuilder::new("alice".to_owned())
.randomize_username()
.use_sqlite()
.response_preprocessor(delayed_decryption)
.build()
.await?;
let (tx, rx) = tokio::sync::broadcast::channel(10);
alice.base_client().set_roominfo_update_sender(tx);
let bob =
TestClientBuilder::new("bob".to_owned()).randomize_username().use_sqlite().build().await?;
let alice_sync_service = SyncService::builder(alice.clone()).build().await.unwrap();
alice_sync_service.start().await;
// Set up sliding sync for alice.
let sliding_alice = alice
.sliding_sync("main")?
.with_all_extensions()
.poll_timeout(Duration::from_secs(2))
.network_timeout(Duration::from_secs(2))
.add_list(
SlidingSyncList::builder("all")
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)),
)
.build()
.await?;
// Set up sliding sync for bob.
let sliding_bob = bob
.sliding_sync("main")?
.with_all_extensions()
.poll_timeout(Duration::from_secs(2))
.network_timeout(Duration::from_secs(2))
.add_list(
SlidingSyncList::builder("all")
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)),
)
.build()
.await?;
let s = sliding_alice.clone();
tokio::task::spawn(async move {
let stream = s.sync();
pin_mut!(stream);
while let Some(up) = stream.next().await {
warn!("alice received update: {up:?}");
}
});
let s = sliding_bob.clone();
tokio::task::spawn(async move {
let stream = s.sync();
pin_mut!(stream);
while let Some(up) = stream.next().await {
warn!("bob received update: {up:?}");
}
});
// alice creates a room and invites bob and celine.
let alice_room = alice
.create_room(assign!(CreateRoomRequest::new(), {
invite: vec![bob.user_id().unwrap().to_owned()],
is_direct: true,
preset: Some(RoomPreset::TrustedPrivateChat),
}))
.await?;
alice_room.enable_encryption().await.unwrap();
sleep(Duration::from_secs(1)).await;
let alice_room = alice.get_room(alice_room.room_id()).unwrap();
let bob_room = bob.get_room(alice_room.room_id()).unwrap();
bob_room.join().await.unwrap();
bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?;
sleep(Duration::from_secs(1)).await;
assert_eq!(alice_room.state(), RoomState::Joined);
assert!(alice_room.is_encrypted().await.unwrap());
assert_eq!(bob_room.state(), RoomState::Joined);
let (stream, entries) = alice_sync_service
.room_list_service()
.all_rooms()
.await
.unwrap()
.entries_with_dynamic_adapters(10, &rx);
entries.set_filter(Box::new(new_filter_all(vec![])));
pin_mut!(stream);
dbg!(timeout(Duration::from_millis(100), stream.next()).await.unwrap());
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());
assert!(matches!(alice_room.latest_event(), None));
DELAYED_DECRYPTION.lock().unwrap().active = false;
sleep(Duration::from_secs(1)).await;
dbg!(&alice_room.latest_event().unwrap().event().event);
dbg!(timeout(Duration::from_millis(100), stream.next()).await.unwrap());
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());
Ok(())
}