test(event cache): add tests for automatic thread subscriptions

This commit is contained in:
Benjamin Bouvier
2025-08-14 12:21:11 +02:00
parent c25be8b070
commit 64eecd0aee
2 changed files with 361 additions and 2 deletions

View File

@@ -3950,6 +3950,13 @@ impl<'a> MockEndpoint<'a, PutThreadSubscriptionEndpoint> {
self.endpoint.matchers = self.endpoint.matchers.match_thread_id(thread_root);
self
}
/// Match the request body's `automatic` field against a specific event id.
pub fn match_automatic_event_id(mut self, up_to_event_id: &EventId) -> Self {
self.mock = self.mock.and(body_json(json!({
"automatic": up_to_event_id
})));
self
}
}
/// A prebuilt mock for `DELETE

View File

@@ -1,19 +1,30 @@
use std::time::Duration;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use imbl::Vector;
use matrix_sdk::{
assert_let_timeout,
deserialized_responses::{ThreadSummaryStatus, TimelineEvent},
event_cache::{RoomEventCacheUpdate, ThreadEventCacheUpdate},
event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCacheUpdate},
sleep::sleep,
test_utils::{
assert_event_matches_msg,
mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
},
Client, ThreadingSupport,
};
use matrix_sdk_test::{
async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE,
};
use ruma::{event_id, room_id, user_id};
use ruma::{
event_id,
events::{AnySyncTimelineEvent, Mentions},
push::{ConditionalPushRule, Ruleset},
room_id,
serde::Raw,
user_id, OwnedEventId, OwnedRoomId,
};
use serde_json::json;
use tokio::sync::broadcast;
@@ -433,3 +444,344 @@ async fn test_deduplication() {
// The events were already known, so the stream is still empty.
assert!(thread_stream.is_empty());
}
struct ThreadSubscriptionTestSetup {
server: MatrixMockServer,
client: Client,
factory: EventFactory,
room_id: OwnedRoomId,
subscriber: RoomEventCacheSubscriber,
/// 3 events: 1 non-mention, 1 mention, and another non-mention.
events: Vec<Raw<AnySyncTimelineEvent>>,
mention_event_id: OwnedEventId,
thread_root: OwnedEventId,
}
/// Create a new setup for a thread subscription test, with enough data so that
/// a push context can be created.
///
/// The setup uses custom push rules, to trigger notifications only on mentions.
///
/// The setup includes 3 events (1 non-mention, 1 mention, and another
/// non-mention) in the same thread, for easy testing of automated
/// subscriptions.
async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup {
let server = MatrixMockServer::new().await;
let thread_root = event_id!("$thread_root");
// Assuming a client that's interested in thread subscriptions,
let client = server
.client_builder()
.on_builder(|builder| {
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
})
.build()
.await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
assert!(initial_events.is_empty());
assert!(subscriber.is_empty());
// Provide a dummy sync with the room's member profile of the current user, so
// the push context can be created.
let own_user_id = client.user_id().unwrap();
let f = EventFactory::new().room(room_id).sender(*ALICE);
let member = f.member(own_user_id).sender(own_user_id);
// Override push rules so that only an intentional mention causes a
// notification.
let mut push_rules = Ruleset::default();
push_rules.override_.insert(ConditionalPushRule::is_user_mention(own_user_id));
server
.mock_sync()
.ok_and_run(&client, |sync_builder| {
sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(member));
sync_builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
"type": "m.push_rules",
"content": {
"global": push_rules
}
})));
})
.await;
// Wait for the initial sync processing to complete; it will trigger a member
// update, at the very least.
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateMembers { .. }) = subscriber.recv());
let first_reply_event_id = event_id!("$first_reply");
let first_reply = f
.text_msg("hey there")
.in_thread(thread_root, thread_root)
.event_id(first_reply_event_id)
.into_raw();
let second_reply_event_id = event_id!("$second_reply");
let second_reply = f
.text_msg("hoy test user!")
.mentions(Mentions::with_user_ids([own_user_id.to_owned()]))
.in_thread(thread_root, first_reply_event_id)
.event_id(second_reply_event_id)
.into_raw();
let third_reply_event_id = event_id!("$third_reply");
let third_reply = f
.text_msg("ciao!")
.in_thread(thread_root, second_reply_event_id)
.event_id(third_reply_event_id)
.into_raw();
ThreadSubscriptionTestSetup {
server,
client,
factory: f,
subscriber,
events: vec![first_reply, second_reply, third_reply],
mention_event_id: second_reply_event_id.to_owned(),
thread_root: thread_root.to_owned(),
room_id: room_id.to_owned(),
}
}
#[async_test]
async fn test_auto_subscribe_thread_via_sync() {
let mut s = thread_subscription_test_setup().await;
// (The endpoint will be called for the current thread, and with an automatic
// subscription up to the given event ID.)
s.server
.mock_put_thread_subscription()
.match_automatic_event_id(&s.mention_event_id)
.match_thread_id(s.thread_root.to_owned())
.ok()
.mock_once()
.mount()
.await;
let mut thread_subscriber_updates =
s.client.event_cache().subscribe_thread_subscriber_updates();
// When I receive 3 events (1 non mention, 1 mention, then 1 non mention again),
// from sync, I'll get subscribed to the thread because of the second event.
s.server
.sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events))
.await;
// Let the event cache process the update.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv()
);
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
// The actual check is the `mock_once` call above!
}
#[async_test]
async fn test_dont_auto_subscribe_on_already_subscribed_thread() {
let mut s = thread_subscription_test_setup().await;
// Given a thread I'm already subscribed to,
s.server
.mock_get_thread_subscription()
.match_thread_id(s.thread_root.to_owned())
.ok(false)
.mock_once()
.mount()
.await;
// The PUT endpoint (to subscribe to the thread) shouldn't be called…
s.server.mock_put_thread_subscription().ok().expect(0).mount().await;
// …when I receive a new in-thread mention for this thread.
s.server
.sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events))
.await;
// Let the event cache process the update.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv()
);
// Let a bit of time for the background thread subscriber task to process the
// update.
sleep(Duration::from_millis(200)).await;
// The actual check is the `expect` call above!
}
#[async_test]
async fn test_auto_subscribe_on_thread_paginate() {
// In this scenario, we're back-paginating a thread and making sure that the
// back-paginated events do cause a subscription.
let s = thread_subscription_test_setup().await;
let event_cache = s.client.event_cache();
event_cache.subscribe().unwrap();
let mut thread_subscriber_updates =
s.client.event_cache().subscribe_thread_subscriber_updates();
let thread_root_id = event_id!("$thread_root");
let thread_resp_id = event_id!("$thread_resp");
// Receive an in-thread event.
let room = s
.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
s.factory
.text_msg("that's a good point")
.in_thread(thread_root_id, thread_root_id)
.event_id(thread_resp_id),
),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await;
// Sanity check: the sync event is added to the thread.
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
assert_eq!(thread_events.len(), 1);
assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id));
assert!(thread_subscriber_updates.is_empty());
// It's possible to paginate the thread, and this will push the thread root
// because there's no prev-batch token.
let reversed_events = s.events.into_iter().rev().map(Raw::cast_unchecked).collect();
s.server
.mock_room_relations()
.match_target_event(thread_root_id.to_owned())
.ok(RoomRelationsResponseTemplate::default().events(reversed_events))
.mock_once()
.mount()
.await;
s.server
.mock_room_event()
.match_event_id()
.ok(s.factory.text_msg("Thread root").event_id(thread_root_id).into())
.mock_once()
.mount()
.await;
// (The endpoint will be called for the current thread, and with an automatic
// subscription up to the given event ID.)
s.server
.mock_put_thread_subscription()
.match_automatic_event_id(&s.mention_event_id)
.match_thread_id(s.thread_root.to_owned())
.ok()
.mock_once()
.mount()
.await;
let hit_start =
room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap();
assert!(hit_start);
// Let the event cache process the update.
assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv());
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
assert!(thread_subscriber_updates.is_empty());
}
#[async_test]
async fn test_auto_subscribe_on_thread_paginate_root_event() {
// In this scenario, the root of a thread is the event that would cause the
// subscription.
let s = thread_subscription_test_setup().await;
let event_cache = s.client.event_cache();
event_cache.subscribe().unwrap();
let mut thread_subscriber_updates =
s.client.event_cache().subscribe_thread_subscriber_updates();
let thread_root_id = event_id!("$thread_root");
let thread_resp_id = event_id!("$thread_resp");
// Receive an in-thread event.
let room = s
.server
.sync_room(
&s.client,
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
s.factory
.text_msg("that's a good point")
.in_thread(thread_root_id, thread_root_id)
.event_id(thread_resp_id),
),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (thread_events, mut thread_stream) =
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await;
// Sanity check: the sync event is added to the thread.
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
assert_eq!(thread_events.len(), 1);
assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id));
assert!(thread_subscriber_updates.is_empty());
// It's possible to paginate the thread, and this will push the thread root
// because there's no prev-batch token.
s.server
.mock_room_relations()
.match_target_event(thread_root_id.to_owned())
.ok(RoomRelationsResponseTemplate::default())
.mock_once()
.mount()
.await;
s.server
.mock_room_event()
.match_event_id()
.ok(s
.factory
.text_msg("da r00t")
.event_id(thread_root_id)
.mentions(Mentions::with_user_ids(s.client.user_id().map(ToOwned::to_owned)))
.into())
.mock_once()
.mount()
.await;
// (The endpoint will be called for the current thread, and with an automatic
// subscription up to the given event ID.)
s.server
.mock_put_thread_subscription()
.match_automatic_event_id(thread_root_id)
.match_thread_id(thread_root_id.to_owned())
.ok()
.mock_once()
.mount()
.await;
let hit_start =
room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap();
assert!(hit_start);
// Let the event cache process the update.
assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv());
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
}