diff --git a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs index 1266c151d..c7f67b003 100644 --- a/crates/matrix-sdk/src/event_cache/caches/aggregator.rs +++ b/crates/matrix-sdk/src/event_cache/caches/aggregator.rs @@ -59,14 +59,18 @@ pub async fn aggregate_timeline_for_threads( .push(event.clone()); } - // This event is an edit that may apply to a thread.. + // This event is an edit that may apply to a thread. if let Some(edit_target) = extract_edit_target(event.raw()) { // This event is known and part of a thread. - if let Some((_location, edit_target_event)) = + if let Some((_location, edited_event)) = room_event_cache.find_event(&edit_target).await? - && let Some(thread_root) = extract_thread_root(edit_target_event.raw()) + && let Some(thread_root) = extract_thread_root(edited_event.raw()) { - new_events_by_thread.entry(thread_root).or_insert_with(default_timeline); + new_events_by_thread + .entry(thread_root) + .or_insert_with(default_timeline) + .events + .push(event.clone()); } } } diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 318fdf966..962529281 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -210,7 +210,14 @@ impl Caches { let mut updates = updates.clone(); updates.timeline = timeline; - self.thread(thread_id).await?.handle_joined_room_update(updates).await?; + let thread = self.thread(thread_id).await?; + thread.handle_joined_room_update(updates).await?; + + if let Some(thread_summary) = + thread.state().read().await?.compute_thread_summary().await? + { + room.update_thread_summary(thread.thread_id(), thread_summary).await?; + } } } @@ -246,7 +253,14 @@ impl Caches { let mut updates = updates.clone(); updates.timeline = timeline; - self.thread(thread_id).await?.handle_left_room_update(updates).await?; + let thread = self.thread(thread_id).await?; + thread.handle_left_room_update(updates).await?; + + if let Some(thread_summary) = + thread.state().read().await?.compute_thread_summary().await? + { + room.update_thread_summary(thread.thread_id(), thread_summary).await?; + } } } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 5c94fe20c..8cfb2092d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -25,7 +25,7 @@ use std::{ use eyeball::SharedObservable; use matrix_sdk_base::{ - deserialized_responses::AmbiguityChange, + deserialized_responses::{AmbiguityChange, ThreadSummary}, event_cache::Event, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; @@ -394,6 +394,32 @@ impl RoomEventCache { Ok(()) } + pub(super) async fn update_thread_summary( + &self, + thread_id: &EventId, + new_thread_summary: ThreadSummary, + ) -> Result<()> { + let timeline_event_diffs = self + .inner + .state + .write() + .await? + .update_thread_summary(thread_id, new_thread_summary) + .await?; + + if !timeline_event_diffs.is_empty() { + self.inner.update_sender.send( + RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { + diffs: timeline_event_diffs, + origin: EventsOrigin::Sync, + }), + Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }), + ); + } + + Ok(()) + } + /// Get a reference to the [`RoomEventCacheUpdateSender`]. pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender { &self.inner.update_sender diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index b9702e15b..61c89721d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -756,7 +756,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { self.shrink_to_last_chunk().await?; } - let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); + let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); Ok((has_new_gap, timeline_event_diffs)) } @@ -789,55 +789,8 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { .await?; } - let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); - for event in events { self.maybe_apply_new_redaction(&event, post_processing_origin).await?; - - if self.state.enabled_thread_support { - // Only add the event to a thread if: - // - thread support is enabled, - // - and if this is a sync (we can't know where to insert backpaginated events - // in threads). - if matches!(post_processing_origin, PostProcessingOrigin::Sync) { - if let Some(thread_root) = extract_thread_root(event.raw()) { - new_events_by_thread.entry(thread_root).or_default().push(event.clone()); - } else if let Some(event_id) = event.event_id() { - // If we spot the root of a thread, add it to its linked chunk. - if self.state.threads.contains_key(&event_id) { - new_events_by_thread.entry(event_id).or_default().push(event.clone()); - } - } - } - - // If the post-processing origin is the redecryption, and this is part of a - // thread, mark the thread as needing an update, potentially for its latest - // event, that might have been redecrypted now. - #[cfg(feature = "e2e-encryption")] - if matches!(post_processing_origin, PostProcessingOrigin::Redecryption) - && let Some(thread_root) = extract_thread_root(event.raw()) - { - new_events_by_thread.entry(thread_root).or_default(); - } - - // Look for edits that may apply to a thread; we'll process them later. - if let Some(edit_target) = extract_edit_target(event.raw()) { - // If the edited event is known, and part of a thread, - if let Some((_location, edit_target_event)) = - self.find_event(&edit_target).await? - && let Some(thread_root) = extract_thread_root(edit_target_event.raw()) - { - // Mark the thread for processing, unless it was already marked as - // such. - new_events_by_thread.entry(thread_root).or_default(); - } - } - } - } - - if self.state.enabled_thread_support { - self.update_threads(new_events_by_thread, prev_batch_token, post_processing_origin) - .await?; } self.update_read_receipts(receipt_event.as_ref()).await?; @@ -940,104 +893,29 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let thread_cache = self.get_or_reload_thread(thread_root.clone()).await?; thread_cache.add_live_events(new_events, &prev_batch_token).await?; - - let mut latest_event_id = thread_cache.latest_event_id().await?; - - // If there's an edit to the latest event in the thread, use the latest edit - // event id as the latest event id for the thread summary. - if let Some(event_id) = latest_event_id.as_ref() - && let Some((original_event, edits)) = self - .find_event_with_relations(event_id, Some(vec![RelationType::Replacement])) - .await? - { - let latest_valid_edit = edits.into_iter().rfind(|edit| { - let original_json = original_event.raw(); - let original_encryption_info = original_event.encryption_info(); - let replacement_json = edit.raw(); - let replacement_encryption_info = edit.encryption_info(); - - check_validity_of_replacement_events( - original_json, - original_encryption_info.map(|v| &**v), - replacement_json, - replacement_encryption_info.map(|v| &**v), - ) - .is_ok() - }); - - if let Some(latest_valid_edit) = latest_valid_edit { - latest_event_id = latest_valid_edit.event_id(); - } - } - - self.maybe_update_thread_summary(thread_root, latest_event_id, post_processing_origin) - .await?; } Ok(()) } /// Update a thread summary on the given thread root, if needs be. - async fn maybe_update_thread_summary( + #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] + pub async fn update_thread_summary( &mut self, - thread_root: OwnedEventId, - latest_event_id: Option, - _post_processing_origin: PostProcessingOrigin, - ) -> Result<(), EventCacheError> { - // Add a thread summary to the (room) event which has the thread root, if we - // knew about it. - - let Some((location, mut target_event)) = self.find_event(&thread_root).await? else { - trace!(%thread_root, "thread root event is missing from the room linked chunk"); - return Ok(()); + thread_id: &EventId, + new_thread_summary: ThreadSummary, + ) -> Result>, EventCacheError> { + let Some((location, mut thread_root_event)) = self.find_event(&thread_id).await? else { + trace!(%thread_id, "thread root event is missing from the room linked chunk"); + return Ok(Vec::new()); }; - let prev_summary = target_event.thread_summary.summary(); - - // Recompute the thread summary, if needs be. - - // Read the latest number of thread replies from the store. - // - // Implementation note: since this is based on the `m.relates_to` field, and - // that field can only be present on room messages, we don't have to - // worry about filtering out aggregation events (like - // reactions/edits/etc.). Pretty neat, huh? - let num_replies = { - let thread_replies = self - .store - .find_event_relations( - &self.state.room_id, - &thread_root, - Some(&[RelationType::Thread]), - ) - .await?; - thread_replies.len().try_into().unwrap_or(u32::MAX) - }; - - let new_summary = if num_replies > 0 { - Some(ThreadSummary { num_replies, latest_reply: latest_event_id }) - } else { - None - }; - - // Note: in the case of redecryption, we still trigger an update even if the - // summary has changed, so that observers can be notified that the - // event in the summary may have been decrypted now. - #[cfg(feature = "e2e-encryption")] - let update_if_same_summaries = - matches!(_post_processing_origin, PostProcessingOrigin::Redecryption); - #[cfg(not(feature = "e2e-encryption"))] - let update_if_same_summaries = false; - - if !update_if_same_summaries && prev_summary == new_summary.as_ref() { - trace!(%thread_root, "thread summary is up-to-date, no need to update it"); - return Ok(()); - } - // Trigger an update to observers. - trace!(%thread_root, "updating thread summary: {new_summary:?}"); - target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary); - self.replace_event_at(location, target_event).await + trace!(%thread_id, "updating thread summary: {new_thread_summary:?}"); + thread_root_event.thread_summary = ThreadSummaryStatus::from_opt(Some(new_thread_summary)); + self.replace_event_at(location, thread_root_event).await?; + + Ok(self.room_linked_chunk.updates_as_vector_diffs()) } /// Replaces a single event, be it saved in memory or in the store. diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 0453a0bd2..444fbcb70 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -24,7 +24,9 @@ use matrix_sdk_base::{ event_cache::{Event, store::EventCacheStoreLock}, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; -use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; +use ruma::{ + EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, room_version_rules::RoomVersionRules, +}; use tokio::sync::{ Notify, broadcast::{Receiver, Sender}, @@ -123,6 +125,16 @@ impl ThreadEventCache { Ok(cache) } + /// Get the room ID for this room. + pub fn room_id(&self) -> &RoomId { + &self.inner.room_id + } + + /// Get the thread ID for this thread. + pub fn thread_id(&self) -> &EventId { + &self.inner.thread_id + } + /// Subscribe to live events from this thread. pub async fn subscribe(&self) -> Result<(Vec, Receiver)> { let state = self.inner.state.read().await?; @@ -142,7 +154,7 @@ impl ThreadEventCache { } /// Return a reference to the state. - pub(in super::super) fn state(&self) -> &LockedThreadEventCacheState { + pub(super) fn state(&self) -> &LockedThreadEventCacheState { &self.inner.state } @@ -197,43 +209,6 @@ impl ThreadEventCache { Ok(()) } - /// Push some live events to this thread, and propagate the updates to - /// the listeners. - pub async fn add_live_events( - &mut self, - events: Vec, - prev_batch_token: &Option, - ) -> Result<()> { - todo!() - /* - if events.is_empty() { - return Ok(()); - } - - trace!("adding new events"); - - let (stored_prev_batch_token, timeline_event_diffs) = - self.inner.state.write().await?.handle_sync(events, prev_batch_token).await?; - - // Now that all events have been added, we can trigger the - // `pagination_token_notifier`. - if stored_prev_batch_token { - self.inner.pagination_batch_token_notifier.notify_one(); - } - - if !timeline_event_diffs.is_empty() { - self.inner.update_sender.send( - TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync }, - // This function is part of the `RoomEventCache` flow. The generic update is - // handled by it. - None, - ); - } - - Ok(()) - */ - } - /// Replaces a single event, be it saved in memory or in the store. /// /// If it was saved in memory, this will emit a notification to @@ -266,19 +241,6 @@ impl ThreadEventCache { Ok(()) } - /// Returns the latest event ID in this thread, if any. - pub async fn latest_event_id(&self) -> Result> { - Ok(self - .inner - .state - .read() - .await? - .thread_linked_chunk() - .revents() - .next() - .and_then(|(_position, event)| event.event_id())) - } - /// Force to reload the thread. // // TODO(@hywan): Temporary fix. All the states must be in a single struct behind diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index f6f9b0de8..45096e646 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -14,6 +14,8 @@ use eyeball_im::VectorDiff; use matrix_sdk_base::{ + check_validity_of_replacement_events, + deserialized_responses::{ThreadSummary, ThreadSummaryStatus}, event_cache::{ Event, Gap, store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState}, @@ -21,10 +23,11 @@ use matrix_sdk_base::{ linked_chunk::{ ChunkIdentifierGenerator, LinkedChunkId, OwnedLinkedChunkId, Position, Update, lazy_loader, }, + serde_helpers::extract_edit_target, sync::Timeline, }; use matrix_sdk_common::executor::spawn; -use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; +use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::relation::RelationType}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, instrument, trace}; @@ -33,7 +36,10 @@ use super::{ super::{ EventCacheError, EventsOrigin, Result, deduplicator::{DeduplicationOutcome, filter_duplicate_events}, - persistence::{find_event, load_linked_chunk_metadata, send_updates_to_store}, + persistence::{ + find_event, find_event_with_relations, load_linked_chunk_metadata, + send_updates_to_store, + }, }, EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, @@ -318,6 +324,92 @@ impl<'a> ThreadEventCacheStateLockReadGuard<'a> { pub fn thread_linked_chunk(&self) -> &EventLinkedChunk { &self.state.thread_linked_chunk } + + /// Compute and return the [`ThreadSummary`] for this thread. + pub async fn compute_thread_summary(&self) -> Result> { + // Find the latest event ID. + // + // TODO(@hywan): This is inefficient. Ultimately, we want to rely on + // `LatestEvent` to compute the `ThreadSummary` correctly. + let latest_event_id = { + // Find the last non-edit event. + let mut latest_event_id = self + .thread_linked_chunk() + .revents() + .filter(|(_position, event)| extract_edit_target(event.raw()).is_none()) + .next() + .and_then(|(_position, event)| event.event_id()); + + // If there's an edit to the latest event in the thread, use the latest edit + // event ID as the latest event ID for the thread summary. + if let Some(event_id) = latest_event_id.as_ref() + && let Some((original_event, edits)) = self + .find_event_with_relations(event_id, Some(vec![RelationType::Replacement])) + .await? + { + let latest_valid_edit = edits.into_iter().rfind(|edit| { + let original_json = original_event.raw(); + let original_encryption_info = original_event.encryption_info(); + let replacement_json = edit.raw(); + let replacement_encryption_info = edit.encryption_info(); + + check_validity_of_replacement_events( + original_json, + original_encryption_info.map(|v| &**v), + replacement_json, + replacement_encryption_info.map(|v| &**v), + ) + .is_ok() + }); + + if let Some(latest_valid_edit) = latest_valid_edit { + latest_event_id = latest_valid_edit.event_id(); + } + } + + latest_event_id + }; + + // Compute the thread summary. + + // Read the latest number of thread replies from the store. + // + // Implementation note: since this is based on the `m.relates_to` field, and + // that field can only be present on room messages, we don't have to + // worry about filtering out aggregation events (like reactions/edits/etc.). + // Pretty neat, huh? + let num_replies = { + let thread_replies = self + .store + .find_event_relations(&self.room_id, &self.thread_id, Some(&[RelationType::Thread])) + .await?; + thread_replies.len().try_into().unwrap_or(u32::MAX) + }; + + let summary = if num_replies > 0 { + Some(ThreadSummary { num_replies, latest_reply: latest_event_id }) + } else { + None + }; + + Ok(summary) + } + + /// See documentation of [`find_event_with_relations`]. + async fn find_event_with_relations( + &self, + event_id: &EventId, + filters: Option>, + ) -> Result)>> { + find_event_with_relations( + event_id, + &self.room_id, + filters, + &self.thread_linked_chunk, + &self.store, + ) + .await + } } impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {