From e89ac3d7df992e6f84ce20d757d61cd3df86b811 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 1 Sep 2025 16:47:50 +0200 Subject: [PATCH] tests: add some sliding sync tests for thread subscriptions and catchup --- crates/matrix-sdk/src/sliding_sync/mod.rs | 10 +- crates/matrix-sdk/src/test_utils/mocks/mod.rs | 63 ++++- crates/matrix-sdk/tests/integration/client.rs | 216 +++++++++++++++++- 3 files changed, 281 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 63e2f2b86..eae13a49e 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -659,15 +659,19 @@ impl SlidingSync { || !self.inner.lists.read().await.is_empty() } + /// Send a single sliding sync request, and returns the response summary. + /// + /// Public for testing purposes only. + #[doc(hidden)] #[instrument(skip_all, fields(pos, conn_id = self.inner.id))] - async fn sync_once(&self) -> Result { + pub async fn sync_once(&self) -> Result { let (request, request_config, position_guard) = self.generate_sync_request(&mut LazyTransactionId::new()).await?; - // Send the request, kaboom. + // Send the request. let summaries = self.send_sync_request(request, request_config, position_guard).await?; - // Notify a new sync was received + // Notify a new sync was received. self.inner.client.inner.sync_beat.notify(usize::MAX); Ok(summaries) diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index b72a28dcf..36ed4a2e6 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -33,6 +33,7 @@ use ruma::{ api::client::{ receipt::create_receipt::v3::ReceiptType, room::Visibility, + sync::sync_events::v5, threads::get_thread_subscriptions_changes::unstable::{ ThreadSubscription, ThreadUnsubscription, }, @@ -68,7 +69,7 @@ pub mod encryption; pub mod oauth; use super::client::MockClientBuilder; -use crate::{room::IncludeRelations, Client, OwnedServerName, Room}; +use crate::{room::IncludeRelations, Client, OwnedServerName, Room, SlidingSyncBuilder}; /// Structure used to store the crypto keys uploaded to the server. /// They will be served back to clients when requested. @@ -353,6 +354,13 @@ impl MatrixMockServer { ) } + /// Mocks the sliding sync endpoint. + pub fn mock_sliding_sync(&self) -> MockEndpoint<'_, SlidingSyncEndpoint> { + let mock = Mock::given(method("POST")) + .and(path("/_matrix/client/unstable/org.matrix.simplified_msc3575/sync")); + self.mock_endpoint(mock, SlidingSyncEndpoint) + } + /// Creates a prebuilt mock for joining a room. /// /// # Examples @@ -4172,6 +4180,8 @@ pub struct GetThreadSubscriptionsEndpoint { subscribed: BTreeMap>, /// New thread unsubscriptions per (room id, thread root event id). unsubscribed: BTreeMap>, + /// Optional delay to respond to the query. + delay: Option, } impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> { @@ -4197,6 +4207,12 @@ impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> { self } + /// Respond with a given delay to the query. + pub fn with_delay(mut self, delay: Duration) -> Self { + self.endpoint.delay = Some(delay); + self + } + /// Match the `from` query parameter to a given value. pub fn match_from(self, from: &str) -> Self { Self { mock: self.mock.and(query_param("from", from)), ..self } @@ -4214,7 +4230,14 @@ impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> { "unsubscribed": self.endpoint.unsubscribed, "end": end, }); - self.respond_with(ResponseTemplate::new(200).set_body_json(response_body)) + + let mut template = ResponseTemplate::new(200).set_body_json(response_body); + + if let Some(delay) = self.endpoint.delay { + template = template.set_delay(delay); + } + + self.respond_with(template) } } @@ -4280,3 +4303,39 @@ impl<'a> MockEndpoint<'a, GetHierarchyEndpoint> { }))) } } + +/// A prebuilt mock for running simplified sliding sync. +pub struct SlidingSyncEndpoint; + +impl<'a> MockEndpoint<'a, SlidingSyncEndpoint> { + /// Mocks the sliding sync endpoint with the given response. + pub fn ok(self, response: v5::Response) -> MatrixMock<'a> { + // A bit silly that we need to destructure all the fields ourselves, but + // Response isn't serializable :'( + self.respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "txn_id": response.txn_id, + "pos": response.pos, + "lists": response.lists, + "rooms": response.rooms, + "extensions": response.extensions, + }))) + } + + /// Temporarily mocks the sync with the given endpoint and runs a client + /// sync with it. + /// + /// After calling this function, the sync endpoint isn't mocked anymore. + pub async fn ok_and_run SlidingSyncBuilder>( + self, + client: &Client, + on_builder: F, + response: v5::Response, + ) { + let _scope = self.ok(response).mount_as_scoped().await; + + let sliding_sync = + on_builder(client.sliding_sync("test_id").unwrap()).build().await.unwrap(); + + let _summary = sliding_sync.sync_once().await.unwrap(); + } +} diff --git a/crates/matrix-sdk/tests/integration/client.rs b/crates/matrix-sdk/tests/integration/client.rs index 6bb858574..44cb0954c 100644 --- a/crates/matrix-sdk/tests/integration/client.rs +++ b/crates/matrix-sdk/tests/integration/client.rs @@ -6,12 +6,13 @@ use futures_util::FutureExt; use matrix_sdk::{ authentication::oauth::{error::OAuthTokenRevocationError, OAuthError}, config::{RequestConfig, StoreConfig, SyncSettings, SyncToken}, - store::RoomLoadSettings, + sleep::sleep, + store::{RoomLoadSettings, ThreadSubscriptionStatus}, sync::{RoomUpdate, State}, test_utils::{ client::mock_matrix_session, mocks::MatrixMockServer, no_retry_test_client_with_server, }, - Client, Error, MemoryStore, StateChanges, StateStore, + Client, Error, MemoryStore, SlidingSyncList, StateChanges, StateStore, ThreadingSupport, }; use matrix_sdk_base::{sync::RoomUpdates, RoomState}; use matrix_sdk_common::executor::spawn; @@ -36,6 +37,7 @@ use ruma::{ get_public_rooms, get_public_rooms_filtered::{self, v3::Request as PublicRoomsFilterRequest}, }, + sync::sync_events::v5, threads::get_thread_subscriptions_changes::unstable::{ ThreadSubscription, ThreadUnsubscription, }, @@ -56,7 +58,7 @@ use ruma::{ room::JoinRule, room_id, serde::Raw, - uint, user_id, OwnedUserId, + uint, user_id, EventId, OwnedUserId, RoomId, }; use serde_json::{json, Value as JsonValue}; use stream_assert::{assert_next_matches, assert_pending}; @@ -1587,3 +1589,211 @@ async fn test_fetch_thread_subscriptions() { let u = &response.unsubscribed[&room3][&thread3]; assert_eq!(u.bump_stamp, uint!(13)); } + +/// Create a sliding sync thread_subscription response with no `prev_batch` +/// token. +fn thread_subscription_response( + room1: &RoomId, + thread1: &EventId, + room2: &RoomId, + thread2: &EventId, +) -> v5::response::ThreadSubscriptions { + assign!(v5::response::ThreadSubscriptions::default(), { + subscribed: { + let mut map = BTreeMap::new(); + map.insert(room1.to_owned(), { + let mut threads = BTreeMap::new(); + threads.insert(thread1.to_owned(), ThreadSubscription::new(true, uint!(42))); + threads + }); + map + }, + unsubscribed: { + let mut map = BTreeMap::new(); + map.insert(room2.to_owned(), { + let mut threads = BTreeMap::new(); + threads.insert(thread2.to_owned(), ThreadUnsubscription::new(uint!(7))); + threads + }); + map + }, + prev_batch: None, + }) +} + +#[async_test] +async fn test_sync_thread_subscriptions() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let room1 = owned_room_id!("!room1:example.com"); + let room2 = owned_room_id!("!room2:example.com"); + + let thread1 = owned_event_id!("$thread1:example.com"); + let thread2 = owned_event_id!("$thread2:example.com"); + + // At first, there are no thread subscriptions at all. + let stored1 = client + .state_store() + .load_thread_subscription(&room1, &thread1) + .await + .expect("loading room1/thread1 works fine"); + assert_matches!(stored1, None); + + let stored2 = client + .state_store() + .load_thread_subscription(&room2, &thread2) + .await + .expect("loading room2/thread2 works fine"); + assert_matches!(stored2, None); + + // When I sliding-sync thread subscriptions, + server + .mock_sliding_sync() + .ok_and_run( + &client, + |config_builder| { + config_builder.with_thread_subscriptions_extension( + assign!(v5::request::ThreadSubscriptions::default(), { + enabled: Some(true), + limit: Some(uint!(10)), + }), + ) + }, + assign!(v5::Response::new("pos".to_owned()), { + extensions: assign!(v5::response::Extensions::default(), { + thread_subscriptions: thread_subscription_response( + &room1, &thread1, &room2, &thread2, + ), + }), + }), + ) + .await; + + // Then they're stored in the local database. + let stored1 = client + .state_store() + .load_thread_subscription(&room1, &thread1) + .await + .expect("loading room1/thread1 works fine") + .expect("found room1/thread1 subscription"); + + assert_eq!(stored1.status, ThreadSubscriptionStatus::Subscribed { automatic: true }); + assert_eq!(stored1.bump_stamp, Some(42)); + + let stored2 = client + .state_store() + .load_thread_subscription(&room2, &thread2) + .await + .expect("loading room2/thread2 works fine") + .expect("found room2/thread2 unsubscription"); + + assert_eq!(stored2.status, ThreadSubscriptionStatus::Unsubscribed); + assert_eq!(stored2.bump_stamp, Some(7)); +} + +#[async_test] +async fn test_sync_thread_subscriptions_with_catchup() { + let server = MatrixMockServer::new().await; + let client = server + .client_builder() + .on_builder(|builder| { + builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) + }) + .build() + .await; + + let room_id1 = owned_room_id!("!room1:example.com"); + let room_id2 = owned_room_id!("!room2:example.com"); + + let thread1 = owned_event_id!("$thread1:example.com"); + let thread2 = owned_event_id!("$thread2:example.com"); + let thread3 = owned_event_id!("$thread3:example.com"); + + // The provided catchup token will be used to fetch more thread + // subscriptions via the msc4308 companion endpoint. + server + .mock_get_thread_subscriptions() + .match_from("catchup_token") + .add_subscription( + room_id1.clone(), + thread3.clone(), + ThreadSubscription::new(false, uint!(1337)), + ) + .with_delay(Duration::from_millis(300)) // Simulate some network delay. + // No more subscriptions after the first catchup request. + .ok(None) + .mock_once() + .mount() + .await; + + // When I sliding-sync thread subscriptions, and the response includes this + // catch-up token, + let mut thread_subscriptions = + thread_subscription_response(&room_id1, &thread1, &room_id2, &thread2); + thread_subscriptions.prev_batch = Some("catchup_token".to_owned()); + + server + .mock_sliding_sync() + .ok_and_run( + &client, + |config_builder| { + config_builder + .with_thread_subscriptions_extension( + assign!(v5::request::ThreadSubscriptions::default(), { + enabled: Some(true), + limit: Some(uint!(10)), + }), + ) + .add_list(SlidingSyncList::builder("rooms")) + }, + assign!(v5::Response::new("pos".to_owned()), { + rooms: { + let mut rooms = BTreeMap::new(); + rooms.insert(room_id1.clone(), v5::response::Room::default()); + rooms.insert(room_id2.clone(), v5::response::Room::default()); + rooms + }, + extensions: assign!(v5::response::Extensions::default(), { + thread_subscriptions, + }), + }), + ) + .await; + + // If I try to get the subscription status for thread 1, it's still hitting + // network, because it doesn't know yet about the result of the catch-up + // request. (Ideally, the choice of whether some information is outdated or + // not would be per room/thread pair, but for simplicity it's global right + // now.) + server + .mock_room_get_thread_subscription() + .match_room_id(room_id1.clone()) + .match_thread_id(thread1.clone()) + .ok(true) + .mock_once() + .mount() + .await; + + let room1 = client.get_room(&room_id1).unwrap(); + let sub1 = room1.load_or_fetch_thread_subscription(&thread1).await.unwrap(); + assert_eq!(sub1, Some(matrix_sdk::room::ThreadSubscription { automatic: true })); + + // All the thread subscriptions are eventually known in the database. + sleep(Duration::from_millis(400)).await; + + let stored3 = client + .state_store() + .load_thread_subscription(&room_id1, &thread3) + .await + .expect("loading room1/thread3 works fine") + .expect("found room1/thread3 subscription"); + assert_eq!(stored3.status, ThreadSubscriptionStatus::Subscribed { automatic: false }); + assert_eq!(stored3.bump_stamp, Some(1337)); + + // So the client will use the database only to load_or_fetch thread + // subscriptions. (Which is confirmed by the absence of mocking the + // room_get_thread_subscription endpoint for thread3.) + let sub3 = room1.load_or_fetch_thread_subscription(&thread3).await.unwrap(); + assert_eq!(sub3, Some(matrix_sdk::room::ThreadSubscription { automatic: false })); +}