refactor(event cache): only process threaded linked chunks if thread support has been globally enabled

This commit is contained in:
Benjamin Bouvier
2025-08-26 13:17:56 +02:00
parent d66733052a
commit f4ce4356ab
4 changed files with 59 additions and 26 deletions

View File

@@ -18,7 +18,7 @@ use assert_matches2::{assert_let, assert_matches};
use eyeball_im::VectorDiff;
use futures_util::StreamExt as _;
use matrix_sdk::{
assert_let_timeout,
Client, ThreadingSupport, assert_let_timeout,
test_utils::mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
};
use matrix_sdk_test::{
@@ -48,10 +48,20 @@ use ruma::{
use stream_assert::assert_pending;
use tokio::task::yield_now;
async fn client_with_threading_support(server: &MatrixMockServer) -> Client {
server
.client_builder()
.on_builder(|builder| {
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: false })
})
.build()
.await
}
#[async_test]
async fn test_new_empty_thread() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
@@ -75,7 +85,7 @@ async fn test_new_empty_thread() {
#[async_test]
async fn test_thread_backpagination() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
@@ -236,7 +246,7 @@ async fn test_extract_bundled_thread_summary() {
// `ThreadSummary` in the associated timeline content.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
@@ -291,7 +301,7 @@ async fn test_new_thread_reply_causes_thread_summary_update() {
// summary to be updated.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
@@ -446,7 +456,7 @@ async fn test_thread_filtering_for_sync() {
// - a thread timeline will show the threaded events
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
@@ -603,7 +613,7 @@ async fn test_thread_timeline_gets_related_events_from_sync() {
// gets updated.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
@@ -696,7 +706,7 @@ async fn test_thread_timeline_gets_local_echoes() {
// gets updated. If the event is a reaction, it gets updated too.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
@@ -823,7 +833,7 @@ async fn test_thread_timeline_can_send_edit() {
// relationship).
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
@@ -907,7 +917,7 @@ async fn test_send_sticker_thread() {
// set the threaded relationship does kick in).
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
@@ -985,7 +995,7 @@ async fn test_send_poll_thread() {
// set the threaded relationship does kick in).
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
@@ -1069,7 +1079,7 @@ async fn test_sending_read_receipt_with_no_events_doesnt_unset_read_flag() {
// unread flag on the room.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
@@ -1109,7 +1119,7 @@ async fn test_read_receipts() {
// Threaded read receipts are correctly handled in a thread timeline.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root = owned_event_id!("$root");
@@ -1248,7 +1258,7 @@ async fn test_initial_read_receipts_are_correctly_populated() {
// populated in the timeline.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
@@ -1312,7 +1322,7 @@ async fn test_send_read_receipts() {
// read receipt on an event that had one is a no-op.
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let user_id = client.user_id().unwrap();

View File

@@ -47,7 +47,7 @@ use matrix_sdk_base::{
serde_helpers::extract_thread_root_from_content,
store_locks::LockStoreError,
sync::RoomUpdates,
timer,
timer, ThreadingSupport,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use room::RoomEventCacheState;
@@ -998,9 +998,15 @@ impl EventCacheInner {
.ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
let room_version_rules = room.clone_info().room_version_rules_or_default();
let enabled_thread_support = matches!(
client.base_client().threading_support,
ThreadingSupport::Enabled { .. }
);
let room_state = RoomEventCacheState::new(
room_id.to_owned(),
room_version_rules,
enabled_thread_support,
self.linked_chunk_update_sender.clone(),
self.store.clone(),
pagination_status.clone(),

View File

@@ -658,6 +658,9 @@ mod private {
/// The rules for the version of this room.
room_version_rules: RoomVersionRules,
/// Whether thread support has been enabled for the event cache.
enabled_thread_support: bool,
/// Reference to the underlying backing store.
store: EventCacheStoreLock,
@@ -700,6 +703,7 @@ mod private {
pub async fn new(
room_id: OwnedRoomId,
room_version_rules: RoomVersionRules,
enabled_thread_support: bool,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
store: EventCacheStoreLock,
pagination_status: SharedObservable<RoomPaginationStatus>,
@@ -767,6 +771,7 @@ mod private {
Ok(Self {
room: room_id,
room_version_rules,
enabled_thread_support,
store,
room_linked_chunk,
threads,
@@ -1459,12 +1464,14 @@ mod private {
for event in events {
self.maybe_apply_new_redaction(&event).await?;
if let Some(thread_root) = extract_thread_root(event.raw()) {
new_events_by_thread.entry(thread_root).or_default().push(event.clone());
} else if let Some(event_id) = event.event_id() {
// If we spot the root of a thread, add it to its linked chunk, in sync mode.
if self.threads.contains_key(&event_id) {
new_events_by_thread.entry(event_id).or_default().push(event.clone());
if self.enabled_thread_support {
if let Some(thread_root) = extract_thread_root(event.raw()) {
new_events_by_thread.entry(thread_root).or_default().push(event.clone());
} else if let Some(event_id) = event.event_id() {
// If we spot the root of a thread, add it to its linked chunk.
if self.threads.contains_key(&event_id) {
new_events_by_thread.entry(event_id).or_default().push(event.clone());
}
}
}

View File

@@ -48,10 +48,20 @@ async fn wait_for_initial_events(
}
}
async fn client_with_threading_support(server: &MatrixMockServer) -> Client {
server
.client_builder()
.on_builder(|builder| {
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: false })
})
.build()
.await
}
#[async_test]
async fn test_thread_can_paginate_even_if_seen_sync_event() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!galette:saucisse.bzh");
@@ -115,7 +125,7 @@ async fn test_thread_can_paginate_even_if_seen_sync_event() {
#[async_test]
async fn test_ignored_user_empties_threads() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
@@ -207,7 +217,7 @@ async fn test_ignored_user_empties_threads() {
#[async_test]
async fn test_gappy_sync_empties_all_threads() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();
@@ -355,7 +365,7 @@ async fn test_gappy_sync_empties_all_threads() {
#[async_test]
async fn test_deduplication() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let client = client_with_threading_support(&server).await;
// Immediately subscribe the event cache to sync updates.
client.event_cache().subscribe().unwrap();