diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index 1a8d9a9f7..f7465fdfc 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -3950,6 +3950,13 @@ impl<'a> MockEndpoint<'a, PutThreadSubscriptionEndpoint> { self.endpoint.matchers = self.endpoint.matchers.match_thread_id(thread_root); self } + /// Match the request body's `automatic` field against a specific event id. + pub fn match_automatic_event_id(mut self, up_to_event_id: &EventId) -> Self { + self.mock = self.mock.and(body_json(json!({ + "automatic": up_to_event_id + }))); + self + } } /// A prebuilt mock for `DELETE diff --git a/crates/matrix-sdk/tests/integration/event_cache/threads.rs b/crates/matrix-sdk/tests/integration/event_cache/threads.rs index fd268baff..ccab2f3af 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/threads.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/threads.rs @@ -1,19 +1,30 @@ +use std::time::Duration; + use assert_matches2::assert_let; use eyeball_im::VectorDiff; use imbl::Vector; use matrix_sdk::{ assert_let_timeout, deserialized_responses::{ThreadSummaryStatus, TimelineEvent}, - event_cache::{RoomEventCacheUpdate, ThreadEventCacheUpdate}, + event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCacheUpdate}, + sleep::sleep, test_utils::{ assert_event_matches_msg, mocks::{MatrixMockServer, RoomRelationsResponseTemplate}, }, + Client, ThreadingSupport, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE, }; -use ruma::{event_id, room_id, user_id}; +use ruma::{ + event_id, + events::{AnySyncTimelineEvent, Mentions}, + push::{ConditionalPushRule, Ruleset}, + room_id, + serde::Raw, + user_id, OwnedEventId, OwnedRoomId, +}; use serde_json::json; use tokio::sync::broadcast; @@ -433,3 +444,344 @@ async fn test_deduplication() { // The events were already known, so the stream is still empty. assert!(thread_stream.is_empty()); } + +struct ThreadSubscriptionTestSetup { + server: MatrixMockServer, + client: Client, + factory: EventFactory, + room_id: OwnedRoomId, + subscriber: RoomEventCacheSubscriber, + /// 3 events: 1 non-mention, 1 mention, and another non-mention. + events: Vec>, + mention_event_id: OwnedEventId, + thread_root: OwnedEventId, +} + +/// Create a new setup for a thread subscription test, with enough data so that +/// a push context can be created. +/// +/// The setup uses custom push rules, to trigger notifications only on mentions. +/// +/// The setup includes 3 events (1 non-mention, 1 mention, and another +/// non-mention) in the same thread, for easy testing of automated +/// subscriptions. +async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup { + let server = MatrixMockServer::new().await; + + let thread_root = event_id!("$thread_root"); + + // Assuming a client that's interested in thread subscriptions, + let client = server + .client_builder() + .on_builder(|builder| { + builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) + }) + .build() + .await; + + // Immediately subscribe the event cache to sync updates. + client.event_cache().subscribe().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + let room = server.sync_joined_room(&client, room_id).await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + assert!(initial_events.is_empty()); + assert!(subscriber.is_empty()); + + // Provide a dummy sync with the room's member profile of the current user, so + // the push context can be created. + let own_user_id = client.user_id().unwrap(); + let f = EventFactory::new().room(room_id).sender(*ALICE); + let member = f.member(own_user_id).sender(own_user_id); + + // Override push rules so that only an intentional mention causes a + // notification. + let mut push_rules = Ruleset::default(); + push_rules.override_.insert(ConditionalPushRule::is_user_mention(own_user_id)); + + server + .mock_sync() + .ok_and_run(&client, |sync_builder| { + sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(member)); + sync_builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({ + "type": "m.push_rules", + "content": { + "global": push_rules + } + }))); + }) + .await; + + // Wait for the initial sync processing to complete; it will trigger a member + // update, at the very least. + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateMembers { .. }) = subscriber.recv()); + + let first_reply_event_id = event_id!("$first_reply"); + let first_reply = f + .text_msg("hey there") + .in_thread(thread_root, thread_root) + .event_id(first_reply_event_id) + .into_raw(); + + let second_reply_event_id = event_id!("$second_reply"); + let second_reply = f + .text_msg("hoy test user!") + .mentions(Mentions::with_user_ids([own_user_id.to_owned()])) + .in_thread(thread_root, first_reply_event_id) + .event_id(second_reply_event_id) + .into_raw(); + + let third_reply_event_id = event_id!("$third_reply"); + let third_reply = f + .text_msg("ciao!") + .in_thread(thread_root, second_reply_event_id) + .event_id(third_reply_event_id) + .into_raw(); + + ThreadSubscriptionTestSetup { + server, + client, + factory: f, + subscriber, + events: vec![first_reply, second_reply, third_reply], + mention_event_id: second_reply_event_id.to_owned(), + thread_root: thread_root.to_owned(), + room_id: room_id.to_owned(), + } +} + +#[async_test] +async fn test_auto_subscribe_thread_via_sync() { + let mut s = thread_subscription_test_setup().await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) + s.server + .mock_put_thread_subscription() + .match_automatic_event_id(&s.mention_event_id) + .match_thread_id(s.thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + // When I receive 3 events (1 non mention, 1 mention, then 1 non mention again), + // from sync, I'll get subscribed to the thread because of the second event. + s.server + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) + .await; + + // Let the event cache process the update. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() + ); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + + // The actual check is the `mock_once` call above! +} + +#[async_test] +async fn test_dont_auto_subscribe_on_already_subscribed_thread() { + let mut s = thread_subscription_test_setup().await; + + // Given a thread I'm already subscribed to, + s.server + .mock_get_thread_subscription() + .match_thread_id(s.thread_root.to_owned()) + .ok(false) + .mock_once() + .mount() + .await; + + // The PUT endpoint (to subscribe to the thread) shouldn't be called… + s.server.mock_put_thread_subscription().ok().expect(0).mount().await; + + // …when I receive a new in-thread mention for this thread. + s.server + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) + .await; + + // Let the event cache process the update. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() + ); + + // Let a bit of time for the background thread subscriber task to process the + // update. + sleep(Duration::from_millis(200)).await; + + // The actual check is the `expect` call above! +} + +#[async_test] +async fn test_auto_subscribe_on_thread_paginate() { + // In this scenario, we're back-paginating a thread and making sure that the + // back-paginated events do cause a subscription. + + let s = thread_subscription_test_setup().await; + + let event_cache = s.client.event_cache(); + event_cache.subscribe().unwrap(); + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + let thread_root_id = event_id!("$thread_root"); + let thread_resp_id = event_id!("$thread_resp"); + + // Receive an in-thread event. + let room = s + .server + .sync_room( + &s.client, + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( + s.factory + .text_msg("that's a good point") + .in_thread(thread_root_id, thread_root_id) + .event_id(thread_resp_id), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + + // Sanity check: the sync event is added to the thread. + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; + assert_eq!(thread_events.len(), 1); + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); + + assert!(thread_subscriber_updates.is_empty()); + + // It's possible to paginate the thread, and this will push the thread root + // because there's no prev-batch token. + let reversed_events = s.events.into_iter().rev().map(Raw::cast_unchecked).collect(); + s.server + .mock_room_relations() + .match_target_event(thread_root_id.to_owned()) + .ok(RoomRelationsResponseTemplate::default().events(reversed_events)) + .mock_once() + .mount() + .await; + + s.server + .mock_room_event() + .match_event_id() + .ok(s.factory.text_msg("Thread root").event_id(thread_root_id).into()) + .mock_once() + .mount() + .await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) + s.server + .mock_put_thread_subscription() + .match_automatic_event_id(&s.mention_event_id) + .match_thread_id(s.thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let hit_start = + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); + assert!(hit_start); + + // Let the event cache process the update. + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + assert!(thread_subscriber_updates.is_empty()); +} + +#[async_test] +async fn test_auto_subscribe_on_thread_paginate_root_event() { + // In this scenario, the root of a thread is the event that would cause the + // subscription. + + let s = thread_subscription_test_setup().await; + + let event_cache = s.client.event_cache(); + event_cache.subscribe().unwrap(); + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + let thread_root_id = event_id!("$thread_root"); + let thread_resp_id = event_id!("$thread_resp"); + + // Receive an in-thread event. + let room = s + .server + .sync_room( + &s.client, + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( + s.factory + .text_msg("that's a good point") + .in_thread(thread_root_id, thread_root_id) + .event_id(thread_resp_id), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + + // Sanity check: the sync event is added to the thread. + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; + assert_eq!(thread_events.len(), 1); + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); + + assert!(thread_subscriber_updates.is_empty()); + + // It's possible to paginate the thread, and this will push the thread root + // because there's no prev-batch token. + s.server + .mock_room_relations() + .match_target_event(thread_root_id.to_owned()) + .ok(RoomRelationsResponseTemplate::default()) + .mock_once() + .mount() + .await; + + s.server + .mock_room_event() + .match_event_id() + .ok(s + .factory + .text_msg("da r00t") + .event_id(thread_root_id) + .mentions(Mentions::with_user_ids(s.client.user_id().map(ToOwned::to_owned))) + .into()) + .mock_once() + .mount() + .await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) + s.server + .mock_put_thread_subscription() + .match_automatic_event_id(thread_root_id) + .match_thread_id(thread_root_id.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let hit_start = + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); + assert!(hit_start); + + // Let the event cache process the update. + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); +}