From f153d85dd044dbd69e2da8394bdd7baf8751ff05 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 Apr 2026 18:29:43 +0200 Subject: [PATCH] refactor(sdk): Move thread summary computation from `RoomEventCache` to `ThreadEventCache`. This patch updates the `ThreadSummary` flow. `Caches` ask each thread to compute its `ThreadSummary` and ask the room to update/store it. The edit events targeting in-thread events are now stored in the thread cache! This is new as it was only stored in the room cache previously. This unlocks the possibility to correctly compute the latest event for a thread with the `matrix_sdk::latest_events` API in the future. --- .../src/event_cache/caches/aggregator.rs | 12 +- .../matrix-sdk/src/event_cache/caches/mod.rs | 18 ++- .../src/event_cache/caches/room/mod.rs | 28 +++- .../src/event_cache/caches/room/state.rs | 150 ++---------------- .../src/event_cache/caches/thread/mod.rs | 66 ++------ .../src/event_cache/caches/thread/state.rs | 96 ++++++++++- 6 files changed, 173 insertions(+), 197 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs index 1266c151d..c7f67b003 100644 --- a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs +++ b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs @@ -59,14 +59,18 @@ pub async fn aggregate_timeline_for_threads( .push(event.clone()); } - // This event is an edit that may apply to a thread.. + // This event is an edit that may apply to a thread. if let Some(edit_target) = extract_edit_target(event.raw()) { // This event is known and part of a thread. - if let Some((_location, edit_target_event)) = + if let Some((_location, edited_event)) = room_event_cache.find_event(&edit_target).await? - && let Some(thread_root) = extract_thread_root(edit_target_event.raw()) + && let Some(thread_root) = extract_thread_root(edited_event.raw()) { - new_events_by_thread.entry(thread_root).or_insert_with(default_timeline); + new_events_by_thread + .entry(thread_root) + .or_insert_with(default_timeline) + .events + .push(event.clone()); } } } diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 318fdf966..962529281 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -210,7 +210,14 @@ impl Caches { let mut updates = updates.clone(); updates.timeline = timeline; - self.thread(thread_id).await?.handle_joined_room_update(updates).await?; + let thread = self.thread(thread_id).await?; + thread.handle_joined_room_update(updates).await?; + + if let Some(thread_summary) = + thread.state().read().await?.compute_thread_summary().await? + { + room.update_thread_summary(thread.thread_id(), thread_summary).await?; + } } } @@ -246,7 +253,14 @@ impl Caches { let mut updates = updates.clone(); updates.timeline = timeline; - self.thread(thread_id).await?.handle_left_room_update(updates).await?; + let thread = self.thread(thread_id).await?; + thread.handle_left_room_update(updates).await?; + + if let Some(thread_summary) = + thread.state().read().await?.compute_thread_summary().await? + { + room.update_thread_summary(thread.thread_id(), thread_summary).await?; + } } } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 5c94fe20c..8cfb2092d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -25,7 +25,7 @@ use std::{ use eyeball::SharedObservable; use matrix_sdk_base::{ - deserialized_responses::AmbiguityChange, + deserialized_responses::{AmbiguityChange, ThreadSummary}, event_cache::Event, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; @@ -394,6 +394,32 @@ impl RoomEventCache { Ok(()) } + pub(super) async fn update_thread_summary( + &self, + thread_id: &EventId, + new_thread_summary: ThreadSummary, + ) -> Result<()> { + let timeline_event_diffs = self + .inner + .state + .write() + .await? + .update_thread_summary(thread_id, new_thread_summary) + .await?; + + if !timeline_event_diffs.is_empty() { + self.inner.update_sender.send( + RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { + diffs: timeline_event_diffs, + origin: EventsOrigin::Sync, + }), + Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }), + ); + } + + Ok(()) + } + /// Get a reference to the [`RoomEventCacheUpdateSender`]. pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender { &self.inner.update_sender diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index b9702e15b..61c89721d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -756,7 +756,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { self.shrink_to_last_chunk().await?; } - let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); + let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); Ok((has_new_gap, timeline_event_diffs)) } @@ -789,55 +789,8 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { .await?; } - let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); - for event in events { self.maybe_apply_new_redaction(&event, post_processing_origin).await?; - - if self.state.enabled_thread_support { - // Only add the event to a thread if: - // - thread support is enabled, - // - and if this is a sync (we can't know where to insert backpaginated events - // in threads). - if matches!(post_processing_origin, PostProcessingOrigin::Sync) { - if let Some(thread_root) = extract_thread_root(event.raw()) { - new_events_by_thread.entry(thread_root).or_default().push(event.clone()); - } else if let Some(event_id) = event.event_id() { - // If we spot the root of a thread, add it to its linked chunk. - if self.state.threads.contains_key(&event_id) { - new_events_by_thread.entry(event_id).or_default().push(event.clone()); - } - } - } - - // If the post-processing origin is the redecryption, and this is part of a - // thread, mark the thread as needing an update, potentially for its latest - // event, that might have been redecrypted now. - #[cfg(feature = "e2e-encryption")] - if matches!(post_processing_origin, PostProcessingOrigin::Redecryption) - && let Some(thread_root) = extract_thread_root(event.raw()) - { - new_events_by_thread.entry(thread_root).or_default(); - } - - // Look for edits that may apply to a thread; we'll process them later. - if let Some(edit_target) = extract_edit_target(event.raw()) { - // If the edited event is known, and part of a thread, - if let Some((_location, edit_target_event)) = - self.find_event(&edit_target).await? - && let Some(thread_root) = extract_thread_root(edit_target_event.raw()) - { - // Mark the thread for processing, unless it was already marked as - // such. - new_events_by_thread.entry(thread_root).or_default(); - } - } - } - } - - if self.state.enabled_thread_support { - self.update_threads(new_events_by_thread, prev_batch_token, post_processing_origin) - .await?; } self.update_read_receipts(receipt_event.as_ref()).await?; @@ -940,104 +893,29 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let thread_cache = self.get_or_reload_thread(thread_root.clone()).await?; thread_cache.add_live_events(new_events, &prev_batch_token).await?; - - let mut latest_event_id = thread_cache.latest_event_id().await?; - - // If there's an edit to the latest event in the thread, use the latest edit - // event id as the latest event id for the thread summary. - if let Some(event_id) = latest_event_id.as_ref() - && let Some((original_event, edits)) = self - .find_event_with_relations(event_id, Some(vec![RelationType::Replacement])) - .await? - { - let latest_valid_edit = edits.into_iter().rfind(|edit| { - let original_json = original_event.raw(); - let original_encryption_info = original_event.encryption_info(); - let replacement_json = edit.raw(); - let replacement_encryption_info = edit.encryption_info(); - - check_validity_of_replacement_events( - original_json, - original_encryption_info.map(|v| &**v), - replacement_json, - replacement_encryption_info.map(|v| &**v), - ) - .is_ok() - }); - - if let Some(latest_valid_edit) = latest_valid_edit { - latest_event_id = latest_valid_edit.event_id(); - } - } - - self.maybe_update_thread_summary(thread_root, latest_event_id, post_processing_origin) - .await?; } Ok(()) } /// Update a thread summary on the given thread root, if needs be. - async fn maybe_update_thread_summary( + #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] + pub async fn update_thread_summary( &mut self, - thread_root: OwnedEventId, - latest_event_id: Option, - _post_processing_origin: PostProcessingOrigin, - ) -> Result<(), EventCacheError> { - // Add a thread summary to the (room) event which has the thread root, if we - // knew about it. - - let Some((location, mut target_event)) = self.find_event(&thread_root).await? else { - trace!(%thread_root, "thread root event is missing from the room linked chunk"); - return Ok(()); + thread_id: &EventId, + new_thread_summary: ThreadSummary, + ) -> Result>, EventCacheError> { + let Some((location, mut thread_root_event)) = self.find_event(&thread_id).await? else { + trace!(%thread_id, "thread root event is missing from the room linked chunk"); + return Ok(Vec::new()); }; - let prev_summary = target_event.thread_summary.summary(); - - // Recompute the thread summary, if needs be. - - // Read the latest number of thread replies from the store. - // - // Implementation note: since this is based on the `m.relates_to` field, and - // that field can only be present on room messages, we don't have to - // worry about filtering out aggregation events (like - // reactions/edits/etc.). Pretty neat, huh? - let num_replies = { - let thread_replies = self - .store - .find_event_relations( - &self.state.room_id, - &thread_root, - Some(&[RelationType::Thread]), - ) - .await?; - thread_replies.len().try_into().unwrap_or(u32::MAX) - }; - - let new_summary = if num_replies > 0 { - Some(ThreadSummary { num_replies, latest_reply: latest_event_id }) - } else { - None - }; - - // Note: in the case of redecryption, we still trigger an update even if the - // summary has changed, so that observers can be notified that the - // event in the summary may have been decrypted now. - #[cfg(feature = "e2e-encryption")] - let update_if_same_summaries = - matches!(_post_processing_origin, PostProcessingOrigin::Redecryption); - #[cfg(not(feature = "e2e-encryption"))] - let update_if_same_summaries = false; - - if !update_if_same_summaries && prev_summary == new_summary.as_ref() { - trace!(%thread_root, "thread summary is up-to-date, no need to update it"); - return Ok(()); - } - // Trigger an update to observers. - trace!(%thread_root, "updating thread summary: {new_summary:?}"); - target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary); - self.replace_event_at(location, target_event).await + trace!(%thread_id, "updating thread summary: {new_thread_summary:?}"); + thread_root_event.thread_summary = ThreadSummaryStatus::from_opt(Some(new_thread_summary)); + self.replace_event_at(location, thread_root_event).await?; + + Ok(self.room_linked_chunk.updates_as_vector_diffs()) } /// Replaces a single event, be it saved in memory or in the store. diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 0453a0bd2..444fbcb70 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -24,7 +24,9 @@ use matrix_sdk_base::{ event_cache::{Event, store::EventCacheStoreLock}, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; -use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; +use ruma::{ + EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, room_version_rules::RoomVersionRules, +}; use tokio::sync::{ Notify, broadcast::{Receiver, Sender}, @@ -123,6 +125,16 @@ impl ThreadEventCache { Ok(cache) } + /// Get the room ID for this room. + pub fn room_id(&self) -> &RoomId { + &self.inner.room_id + } + + /// Get the thread ID for this thread. + pub fn thread_id(&self) -> &EventId { + &self.inner.thread_id + } + /// Subscribe to live events from this thread. pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { let state = self.inner.state.read().await?; @@ -142,7 +154,7 @@ impl ThreadEventCache { } /// Return a reference to the state. - pub(in super::super) fn state(&self) -> &LockedThreadEventCacheState { + pub(super) fn state(&self) -> &LockedThreadEventCacheState { &self.inner.state } @@ -197,43 +209,6 @@ impl ThreadEventCache { Ok(()) } - /// Push some live events to this thread, and propagate the updates to - /// the listeners. - pub async fn add_live_events( - &mut self, - events: Vec, - prev_batch_token: &Option, - ) -> Result<()> { - todo!() - /* - if events.is_empty() { - return Ok(()); - } - - trace!("adding new events"); - - let (stored_prev_batch_token, timeline_event_diffs) = - self.inner.state.write().await?.handle_sync(events, prev_batch_token).await?; - - // Now that all events have been added, we can trigger the - // `pagination_token_notifier`. - if stored_prev_batch_token { - self.inner.pagination_batch_token_notifier.notify_one(); - } - - if !timeline_event_diffs.is_empty() { - self.inner.update_sender.send( - TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync }, - // This function is part of the `RoomEventCache` flow. The generic update is - // handled by it. - None, - ); - } - - Ok(()) - */ - } - /// Replaces a single event, be it saved in memory or in the store. /// /// If it was saved in memory, this will emit a notification to @@ -266,19 +241,6 @@ impl ThreadEventCache { Ok(()) } - /// Returns the latest event ID in this thread, if any. - pub async fn latest_event_id(&self) -> Result> { - Ok(self - .inner - .state - .read() - .await? - .thread_linked_chunk() - .revents() - .next() - .and_then(|(_position, event)| event.event_id())) - } - /// Force to reload the thread. // // TODO(@hywan): Temporary fix. All the states must be in a single struct behind diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index f6f9b0de8..45096e646 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -14,6 +14,8 @@ use eyeball_im::VectorDiff; use matrix_sdk_base::{ + check_validity_of_replacement_events, + deserialized_responses::{ThreadSummary, ThreadSummaryStatus}, event_cache::{ Event, Gap, store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState}, @@ -21,10 +23,11 @@ use matrix_sdk_base::{ linked_chunk::{ ChunkIdentifierGenerator, LinkedChunkId, OwnedLinkedChunkId, Position, Update, lazy_loader, }, + serde_helpers::extract_edit_target, sync::Timeline, }; use matrix_sdk_common::executor::spawn; -use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; +use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::relation::RelationType}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, instrument, trace}; @@ -33,7 +36,10 @@ use super::{ super::{ EventCacheError, EventsOrigin, Result, deduplicator::{DeduplicationOutcome, filter_duplicate_events}, - persistence::{find_event, load_linked_chunk_metadata, send_updates_to_store}, + persistence::{ + find_event, find_event_with_relations, load_linked_chunk_metadata, + send_updates_to_store, + }, }, EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, @@ -318,6 +324,92 @@ impl<'a> ThreadEventCacheStateLockReadGuard<'a> { pub fn thread_linked_chunk(&self) -> &EventLinkedChunk { &self.state.thread_linked_chunk } + + /// Compute and return the [`ThreadSummary`] for this thread. + pub async fn compute_thread_summary(&self) -> Result> { + // Find the latest event ID. + // + // TODO(@hywan): This is inefficient. Ultimately, we want to rely on + // `LatestEvent` to compute the `ThreadSummary` correctly. + let latest_event_id = { + // Find the last non-edit event. + let mut latest_event_id = self + .thread_linked_chunk() + .revents() + .filter(|(_position, event)| extract_edit_target(event.raw()).is_none()) + .next() + .and_then(|(_position, event)| event.event_id()); + + // If there's an edit to the latest event in the thread, use the latest edit + // event ID as the latest event ID for the thread summary. + if let Some(event_id) = latest_event_id.as_ref() + && let Some((original_event, edits)) = self + .find_event_with_relations(event_id, Some(vec![RelationType::Replacement])) + .await? + { + let latest_valid_edit = edits.into_iter().rfind(|edit| { + let original_json = original_event.raw(); + let original_encryption_info = original_event.encryption_info(); + let replacement_json = edit.raw(); + let replacement_encryption_info = edit.encryption_info(); + + check_validity_of_replacement_events( + original_json, + original_encryption_info.map(|v| &**v), + replacement_json, + replacement_encryption_info.map(|v| &**v), + ) + .is_ok() + }); + + if let Some(latest_valid_edit) = latest_valid_edit { + latest_event_id = latest_valid_edit.event_id(); + } + } + + latest_event_id + }; + + // Compute the thread summary. + + // Read the latest number of thread replies from the store. + // + // Implementation note: since this is based on the `m.relates_to` field, and + // that field can only be present on room messages, we don't have to + // worry about filtering out aggregation events (like reactions/edits/etc.). + // Pretty neat, huh? + let num_replies = { + let thread_replies = self + .store + .find_event_relations(&self.room_id, &self.thread_id, Some(&[RelationType::Thread])) + .await?; + thread_replies.len().try_into().unwrap_or(u32::MAX) + }; + + let summary = if num_replies > 0 { + Some(ThreadSummary { num_replies, latest_reply: latest_event_id }) + } else { + None + }; + + Ok(summary) + } + + /// See documentation of [`find_event_with_relations`]. + async fn find_event_with_relations( + &self, + event_id: &EventId, + filters: Option>, + ) -> Result)>> { + find_event_with_relations( + event_id, + &self.room_id, + filters, + &self.thread_linked_chunk, + &self.store, + ) + .await + } } impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {