tests: add some sliding sync tests for thread subscriptions and catchup

This commit is contained in:
Benjamin Bouvier
2025-09-01 16:47:50 +02:00
parent a3704c3563
commit e89ac3d7df
3 changed files with 281 additions and 8 deletions

View File

@@ -659,15 +659,19 @@ impl SlidingSync {
|| !self.inner.lists.read().await.is_empty()
}
/// Send a single sliding sync request, and returns the response summary.
///
/// Public for testing purposes only.
#[doc(hidden)]
#[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
async fn sync_once(&self) -> Result<UpdateSummary> {
pub async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, position_guard) =
self.generate_sync_request(&mut LazyTransactionId::new()).await?;
// Send the request, kaboom.
// Send the request.
let summaries = self.send_sync_request(request, request_config, position_guard).await?;
// Notify a new sync was received
// Notify a new sync was received.
self.inner.client.inner.sync_beat.notify(usize::MAX);
Ok(summaries)

View File

@@ -33,6 +33,7 @@ use ruma::{
api::client::{
receipt::create_receipt::v3::ReceiptType,
room::Visibility,
sync::sync_events::v5,
threads::get_thread_subscriptions_changes::unstable::{
ThreadSubscription, ThreadUnsubscription,
},
@@ -68,7 +69,7 @@ pub mod encryption;
pub mod oauth;
use super::client::MockClientBuilder;
use crate::{room::IncludeRelations, Client, OwnedServerName, Room};
use crate::{room::IncludeRelations, Client, OwnedServerName, Room, SlidingSyncBuilder};
/// Structure used to store the crypto keys uploaded to the server.
/// They will be served back to clients when requested.
@@ -353,6 +354,13 @@ impl MatrixMockServer {
)
}
/// Mocks the sliding sync endpoint.
pub fn mock_sliding_sync(&self) -> MockEndpoint<'_, SlidingSyncEndpoint> {
let mock = Mock::given(method("POST"))
.and(path("/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"));
self.mock_endpoint(mock, SlidingSyncEndpoint)
}
/// Creates a prebuilt mock for joining a room.
///
/// # Examples
@@ -4172,6 +4180,8 @@ pub struct GetThreadSubscriptionsEndpoint {
subscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
/// New thread unsubscriptions per (room id, thread root event id).
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
/// Optional delay to respond to the query.
delay: Option<Duration>,
}
impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> {
@@ -4197,6 +4207,12 @@ impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> {
self
}
/// Respond with a given delay to the query.
pub fn with_delay(mut self, delay: Duration) -> Self {
self.endpoint.delay = Some(delay);
self
}
/// Match the `from` query parameter to a given value.
pub fn match_from(self, from: &str) -> Self {
Self { mock: self.mock.and(query_param("from", from)), ..self }
@@ -4214,7 +4230,14 @@ impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> {
"unsubscribed": self.endpoint.unsubscribed,
"end": end,
});
self.respond_with(ResponseTemplate::new(200).set_body_json(response_body))
let mut template = ResponseTemplate::new(200).set_body_json(response_body);
if let Some(delay) = self.endpoint.delay {
template = template.set_delay(delay);
}
self.respond_with(template)
}
}
@@ -4280,3 +4303,39 @@ impl<'a> MockEndpoint<'a, GetHierarchyEndpoint> {
})))
}
}
/// A prebuilt mock for running simplified sliding sync.
pub struct SlidingSyncEndpoint;
impl<'a> MockEndpoint<'a, SlidingSyncEndpoint> {
/// Mocks the sliding sync endpoint with the given response.
pub fn ok(self, response: v5::Response) -> MatrixMock<'a> {
// A bit silly that we need to destructure all the fields ourselves, but
// Response isn't serializable :'(
self.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"txn_id": response.txn_id,
"pos": response.pos,
"lists": response.lists,
"rooms": response.rooms,
"extensions": response.extensions,
})))
}
/// Temporarily mocks the sync with the given endpoint and runs a client
/// sync with it.
///
/// After calling this function, the sync endpoint isn't mocked anymore.
pub async fn ok_and_run<F: FnOnce(SlidingSyncBuilder) -> SlidingSyncBuilder>(
self,
client: &Client,
on_builder: F,
response: v5::Response,
) {
let _scope = self.ok(response).mount_as_scoped().await;
let sliding_sync =
on_builder(client.sliding_sync("test_id").unwrap()).build().await.unwrap();
let _summary = sliding_sync.sync_once().await.unwrap();
}
}

View File

@@ -6,12 +6,13 @@ use futures_util::FutureExt;
use matrix_sdk::{
authentication::oauth::{error::OAuthTokenRevocationError, OAuthError},
config::{RequestConfig, StoreConfig, SyncSettings, SyncToken},
store::RoomLoadSettings,
sleep::sleep,
store::{RoomLoadSettings, ThreadSubscriptionStatus},
sync::{RoomUpdate, State},
test_utils::{
client::mock_matrix_session, mocks::MatrixMockServer, no_retry_test_client_with_server,
},
Client, Error, MemoryStore, StateChanges, StateStore,
Client, Error, MemoryStore, SlidingSyncList, StateChanges, StateStore, ThreadingSupport,
};
use matrix_sdk_base::{sync::RoomUpdates, RoomState};
use matrix_sdk_common::executor::spawn;
@@ -36,6 +37,7 @@ use ruma::{
get_public_rooms,
get_public_rooms_filtered::{self, v3::Request as PublicRoomsFilterRequest},
},
sync::sync_events::v5,
threads::get_thread_subscriptions_changes::unstable::{
ThreadSubscription, ThreadUnsubscription,
},
@@ -56,7 +58,7 @@ use ruma::{
room::JoinRule,
room_id,
serde::Raw,
uint, user_id, OwnedUserId,
uint, user_id, EventId, OwnedUserId, RoomId,
};
use serde_json::{json, Value as JsonValue};
use stream_assert::{assert_next_matches, assert_pending};
@@ -1587,3 +1589,211 @@ async fn test_fetch_thread_subscriptions() {
let u = &response.unsubscribed[&room3][&thread3];
assert_eq!(u.bump_stamp, uint!(13));
}
/// Create a sliding sync thread_subscription response with no `prev_batch`
/// token.
fn thread_subscription_response(
room1: &RoomId,
thread1: &EventId,
room2: &RoomId,
thread2: &EventId,
) -> v5::response::ThreadSubscriptions {
assign!(v5::response::ThreadSubscriptions::default(), {
subscribed: {
let mut map = BTreeMap::new();
map.insert(room1.to_owned(), {
let mut threads = BTreeMap::new();
threads.insert(thread1.to_owned(), ThreadSubscription::new(true, uint!(42)));
threads
});
map
},
unsubscribed: {
let mut map = BTreeMap::new();
map.insert(room2.to_owned(), {
let mut threads = BTreeMap::new();
threads.insert(thread2.to_owned(), ThreadUnsubscription::new(uint!(7)));
threads
});
map
},
prev_batch: None,
})
}
#[async_test]
async fn test_sync_thread_subscriptions() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room1 = owned_room_id!("!room1:example.com");
let room2 = owned_room_id!("!room2:example.com");
let thread1 = owned_event_id!("$thread1:example.com");
let thread2 = owned_event_id!("$thread2:example.com");
// At first, there are no thread subscriptions at all.
let stored1 = client
.state_store()
.load_thread_subscription(&room1, &thread1)
.await
.expect("loading room1/thread1 works fine");
assert_matches!(stored1, None);
let stored2 = client
.state_store()
.load_thread_subscription(&room2, &thread2)
.await
.expect("loading room2/thread2 works fine");
assert_matches!(stored2, None);
// When I sliding-sync thread subscriptions,
server
.mock_sliding_sync()
.ok_and_run(
&client,
|config_builder| {
config_builder.with_thread_subscriptions_extension(
assign!(v5::request::ThreadSubscriptions::default(), {
enabled: Some(true),
limit: Some(uint!(10)),
}),
)
},
assign!(v5::Response::new("pos".to_owned()), {
extensions: assign!(v5::response::Extensions::default(), {
thread_subscriptions: thread_subscription_response(
&room1, &thread1, &room2, &thread2,
),
}),
}),
)
.await;
// Then they're stored in the local database.
let stored1 = client
.state_store()
.load_thread_subscription(&room1, &thread1)
.await
.expect("loading room1/thread1 works fine")
.expect("found room1/thread1 subscription");
assert_eq!(stored1.status, ThreadSubscriptionStatus::Subscribed { automatic: true });
assert_eq!(stored1.bump_stamp, Some(42));
let stored2 = client
.state_store()
.load_thread_subscription(&room2, &thread2)
.await
.expect("loading room2/thread2 works fine")
.expect("found room2/thread2 unsubscription");
assert_eq!(stored2.status, ThreadSubscriptionStatus::Unsubscribed);
assert_eq!(stored2.bump_stamp, Some(7));
}
#[async_test]
async fn test_sync_thread_subscriptions_with_catchup() {
let server = MatrixMockServer::new().await;
let client = server
.client_builder()
.on_builder(|builder| {
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
})
.build()
.await;
let room_id1 = owned_room_id!("!room1:example.com");
let room_id2 = owned_room_id!("!room2:example.com");
let thread1 = owned_event_id!("$thread1:example.com");
let thread2 = owned_event_id!("$thread2:example.com");
let thread3 = owned_event_id!("$thread3:example.com");
// The provided catchup token will be used to fetch more thread
// subscriptions via the msc4308 companion endpoint.
server
.mock_get_thread_subscriptions()
.match_from("catchup_token")
.add_subscription(
room_id1.clone(),
thread3.clone(),
ThreadSubscription::new(false, uint!(1337)),
)
.with_delay(Duration::from_millis(300)) // Simulate some network delay.
// No more subscriptions after the first catchup request.
.ok(None)
.mock_once()
.mount()
.await;
// When I sliding-sync thread subscriptions, and the response includes this
// catch-up token,
let mut thread_subscriptions =
thread_subscription_response(&room_id1, &thread1, &room_id2, &thread2);
thread_subscriptions.prev_batch = Some("catchup_token".to_owned());
server
.mock_sliding_sync()
.ok_and_run(
&client,
|config_builder| {
config_builder
.with_thread_subscriptions_extension(
assign!(v5::request::ThreadSubscriptions::default(), {
enabled: Some(true),
limit: Some(uint!(10)),
}),
)
.add_list(SlidingSyncList::builder("rooms"))
},
assign!(v5::Response::new("pos".to_owned()), {
rooms: {
let mut rooms = BTreeMap::new();
rooms.insert(room_id1.clone(), v5::response::Room::default());
rooms.insert(room_id2.clone(), v5::response::Room::default());
rooms
},
extensions: assign!(v5::response::Extensions::default(), {
thread_subscriptions,
}),
}),
)
.await;
// If I try to get the subscription status for thread 1, it's still hitting
// network, because it doesn't know yet about the result of the catch-up
// request. (Ideally, the choice of whether some information is outdated or
// not would be per room/thread pair, but for simplicity it's global right
// now.)
server
.mock_room_get_thread_subscription()
.match_room_id(room_id1.clone())
.match_thread_id(thread1.clone())
.ok(true)
.mock_once()
.mount()
.await;
let room1 = client.get_room(&room_id1).unwrap();
let sub1 = room1.load_or_fetch_thread_subscription(&thread1).await.unwrap();
assert_eq!(sub1, Some(matrix_sdk::room::ThreadSubscription { automatic: true }));
// All the thread subscriptions are eventually known in the database.
sleep(Duration::from_millis(400)).await;
let stored3 = client
.state_store()
.load_thread_subscription(&room_id1, &thread3)
.await
.expect("loading room1/thread3 works fine")
.expect("found room1/thread3 subscription");
assert_eq!(stored3.status, ThreadSubscriptionStatus::Subscribed { automatic: false });
assert_eq!(stored3.bump_stamp, Some(1337));
// So the client will use the database only to load_or_fetch thread
// subscriptions. (Which is confirmed by the absence of mocking the
// room_get_thread_subscription endpoint for thread3.)
let sub3 = room1.load_or_fetch_thread_subscription(&thread3).await.unwrap();
assert_eq!(sub3, Some(matrix_sdk::room::ThreadSubscription { automatic: false }));
}