From ff4b7a8acc04e9c4b571a0e47bc8037ee0e10976 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 3 Jun 2025 14:18:26 +0200 Subject: [PATCH] feat(event cache): add basic support for the latest event in the thread summary --- .../src/deserialized_responses.rs | 15 ++++-- crates/matrix-sdk-common/src/serde_helpers.rs | 5 +- .../timeline/controller/state_transaction.rs | 53 ++++++++++++++++--- .../matrix-sdk-ui/src/timeline/tests/mod.rs | 4 ++ crates/matrix-sdk-ui/src/timeline/traits.rs | 10 ++++ .../tests/integration/timeline/thread.rs | 43 +++++++++++---- .../matrix-sdk/src/event_cache/pagination.rs | 2 +- crates/matrix-sdk/src/event_cache/room/mod.rs | 44 +++++++++++---- 8 files changed, 145 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index d1108f0a9..c81f7d831 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -372,6 +372,10 @@ impl<'de> Deserialize<'de> for EncryptionInfo { /// - whether the user participated or not to this thread. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct ThreadSummary { + /// The event id for the latest reply to the thread. + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_reply: Option, + /// The number of replies to the thread. /// /// This doesn't include the thread root event itself. It can be zero if no @@ -1370,8 +1374,9 @@ mod tests { // When creating a timeline event from a raw event, the thread summary is always // extracted, if available. let timeline_event = TimelineEvent::new(raw); - assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies }) => { + assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies, latest_reply }) => { assert_eq!(num_replies, 2); + assert_eq!(latest_reply.as_deref(), Some(event_id!("$latest_event:example.com"))); }); // When deserializing an old serialized timeline event, the thread summary is @@ -1386,8 +1391,9 @@ mod tests { let timeline_event: TimelineEvent = serde_json::from_value(serialized_timeline_item).unwrap(); - assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies }) => { + assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies, latest_reply }) => { assert_eq!(num_replies, 2); + assert_eq!(latest_reply.as_deref(), Some(event_id!("$latest_event:example.com"))); }); } @@ -1651,7 +1657,10 @@ mod tests { )])), }), push_actions: Default::default(), - thread_summary: ThreadSummaryStatus::Some(ThreadSummary { num_replies: 2 }), + thread_summary: ThreadSummaryStatus::Some(ThreadSummary { + num_replies: 2, + latest_reply: None, + }), }; with_settings!({ sort_maps => true, prepend_module_to_snapshot => false }, { diff --git a/crates/matrix-sdk-common/src/serde_helpers.rs b/crates/matrix-sdk-common/src/serde_helpers.rs index 9b12aec3e..223490bfb 100644 --- a/crates/matrix-sdk-common/src/serde_helpers.rs +++ b/crates/matrix-sdk-common/src/serde_helpers.rs @@ -82,7 +82,10 @@ pub fn extract_bundled_thread_summary(event: &Raw) -> Thre // to happen to have that many events in real-world threads. let count = bundled_thread.count.try_into().unwrap_or(UInt::MAX.try_into().unwrap()); - ThreadSummaryStatus::Some(ThreadSummary { num_replies: count }) + let latest_reply = + bundled_thread.latest_event.get_field::("event_id").ok().flatten(); + + ThreadSummaryStatus::Some(ThreadSummary { num_replies: count, latest_reply }) } Ok(_) => ThreadSummaryStatus::None, Err(_) => ThreadSummaryStatus::Unknown, diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs index 0595b4083..343bb2c95 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs @@ -37,7 +37,7 @@ use super::{ }; use crate::timeline::{ event_handler::{FailedToParseEvent, RemovedItem, TimelineAction}, - ThreadSummary, TimelineDetails, VirtualTimelineItem, + ThreadSummary, ThreadSummaryLatestEvent, TimelineDetails, VirtualTimelineItem, }; pub(in crate::timeline) struct TimelineStateTransaction<'a> { @@ -555,13 +555,54 @@ impl<'a> TimelineStateTransaction<'a> { settings: &TimelineSettings, date_divider_adjuster: &mut DateDividerAdjuster, ) -> RemovedItem { - // TODO: do something with the thread summary! let TimelineEvent { push_actions, kind, thread_summary } = event; - let thread_summary = thread_summary.summary().map(|summary| ThreadSummary { - latest_event: TimelineDetails::Unavailable, - num_replies: summary.num_replies, - }); + let thread_summary = if let Some(summary) = thread_summary.summary() { + let latest_reply_item = + if let Some(event_id) = summary.latest_reply.as_ref() { + // Attempt to load the timeline event, either from the event cache or the + // storage. + let event = room_data_provider + .load_event(event_id) + .await + .inspect_err(|err| { + warn!("Failed to load thread latest event: {err}"); + }) + .ok(); + + if let Some(event) = event { + // lol @ hack + crate::timeline::RepliedToEvent::try_from_timeline_event( + event, + room_data_provider, + &self.items, + &mut self.meta, + ) + .await + .inspect_err(|err| { + warn!("Failed to extract thread event into a timeline item content: {err}"); + }) + .ok() + .flatten() + .map(|replied_to| Box::new(ThreadSummaryLatestEvent { + content: replied_to.content().clone(), + sender: replied_to.sender().to_owned(), + sender_profile: replied_to.sender_profile().clone(), + })) + } else { + None + } + } else { + None + }; + + Some(ThreadSummary { + latest_event: TimelineDetails::from_initial_value(latest_reply_item), + num_replies: summary.num_replies, + }) + } else { + None + }; let encryption_info = kind.encryption_info().cloned(); diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index 08d9b2dff..8ccee7282 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -437,4 +437,8 @@ impl RoomDataProvider for TestRoomDataProvider { ) -> Result { unimplemented!(); } + + async fn load_event<'a>(&'a self, _event_id: &'a EventId) -> matrix_sdk::Result { + unimplemented!(); + } } diff --git a/crates/matrix-sdk-ui/src/timeline/traits.rs b/crates/matrix-sdk-ui/src/timeline/traits.rs index 11fc3bc2b..87f4e57da 100644 --- a/crates/matrix-sdk-ui/src/timeline/traits.rs +++ b/crates/matrix-sdk-ui/src/timeline/traits.rs @@ -145,6 +145,12 @@ pub(super) trait RoomDataProvider: ) -> impl Future>> + SendOutsideWasm; async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result; + + /// Loads an event from the cache or network. + fn load_event<'a>( + &'a self, + event_id: &'a EventId, + ) -> impl Future> + SendOutsideWasm + 'a; } impl RoomDataProvider for Room { @@ -294,6 +300,10 @@ impl RoomDataProvider for Room { async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result { self.relations(event_id, opts).await } + + async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result { + self.load_or_fetch_event(event_id, None).await + } } // Internal helper to make most of retry_event_decryption independent of a room diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/thread.rs b/crates/matrix-sdk-ui/tests/integration/timeline/thread.rs index 1c02efcf4..67a61b001 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/thread.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/thread.rs @@ -213,16 +213,18 @@ async fn test_extract_bundled_thread_summary() { let f = EventFactory::new().room(room_id).sender(&ALICE); let thread_event_id = event_id!("$thread_root"); let latest_event_id = event_id!("$latest_event"); + let latest_event = f.text_msg("the last one!").event_id(latest_event_id).into_event(); let event = f .text_msg("thready thread mcthreadface") - .with_bundled_thread_summary( - f.text_msg("the last one!").event_id(latest_event_id).into_raw(), - 42, - false, - ) + .with_bundled_thread_summary(latest_event.raw().cast_ref().clone(), 42, false) .event_id(thread_event_id); + // Set up the /event for the latest thread event. + // FIXME(bnjbvr): shouldn't be necessary, the event cache could save the bundled + // latest event instead. + server.mock_room_event().match_event_id().ok(latest_event).mock_once().mount().await; + server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(event)).await; assert_let_timeout!(Some(timeline_updates) = stream.next()); @@ -234,8 +236,15 @@ async fn test_extract_bundled_thread_summary() { let event_item = value.as_event().unwrap(); assert_eq!(event_item.event_id().unwrap(), thread_event_id); assert_let!(Some(summary) = event_item.content().thread_summary()); - // Soon™, Stefan, soon™. - assert!(summary.latest_event.is_unavailable()); + + // We get the latest event from the bundled thread summary. + assert!(summary.latest_event.is_ready()); + assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event); + assert_eq!(latest_event.content.as_message().unwrap().body(), "the last one!"); + assert_eq!(latest_event.sender, *ALICE); + assert!(latest_event.sender_profile.is_unavailable()); + + // We get the count from the bundled thread summary. assert_eq!(summary.num_replies, 42); assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]); @@ -321,9 +330,15 @@ async fn test_new_thread_reply_causes_thread_summary() { assert_eq!(event_item.event_id().unwrap(), thread_event_id); assert!(event_item.content().thread_root().is_none()); + // The thread summary contains the detailed information about the latest event. assert_let!(Some(summary) = event_item.content().thread_summary()); - // Soon™, Stefan, soon™. - assert!(summary.latest_event.is_unavailable()); + assert!(summary.latest_event.is_ready()); + assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event); + assert_eq!(latest_event.content.as_message().unwrap().body(), "thread reply"); + assert_eq!(latest_event.sender, *BOB); + assert!(latest_event.sender_profile.is_unavailable()); + + // The thread summary contains the number of replies. assert_eq!(summary.num_replies, 1); assert_pending!(stream); @@ -373,8 +388,14 @@ async fn test_new_thread_reply_causes_thread_summary() { assert!(event_item.content().thread_root().is_none()); assert_let!(Some(summary) = event_item.content().thread_summary()); - // Soon™, Stefan, soon™. - assert!(summary.latest_event.is_unavailable()); + + // The latest event has been updated. + assert!(summary.latest_event.is_ready()); + assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event); + assert_eq!(latest_event.content.as_message().unwrap().body(), "another thread reply"); + assert_eq!(latest_event.sender, *BOB); + assert!(latest_event.sender_profile.is_unavailable()); + // The number of replies has been updated. assert_eq!(summary.num_replies, 2); } diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index a666cdd48..38cc63d64 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -352,7 +352,7 @@ impl RoomPagination { }; let next_diffs = state - .with_events_mut(|room_events| { + .with_events_mut(false, |room_events| { // Reverse the order of the events as `/messages` has been called with `dir=b` // (backwards). The `RoomEvents` API expects the first event to be the oldest. // Let's re-order them for this block. diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 93dbca801..424cc790a 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -429,7 +429,7 @@ impl RoomEventCacheInner { // Add the previous back-pagination token (if present), followed by the timeline // events themselves. let new_timeline_event_diffs = state - .with_events_mut(|room_events| { + .with_events_mut(true, |room_events| { // If we only received duplicated events, we don't need to store the gap: if // there was a gap, we'd have received an unknown event at the tail of // the room's timeline (unless the server reordered sync events since the last @@ -1161,6 +1161,7 @@ mod private { #[instrument(skip_all, fields(room_id = %self.room))] pub async fn with_events_mut( &mut self, + is_live_sync: bool, func: F, ) -> Result>, EventCacheError> where @@ -1173,7 +1174,7 @@ mod private { for event in &events_to_post_process { self.maybe_apply_new_redaction(event).await?; - self.analyze_thread_root(event).await?; + self.analyze_thread_root(event, is_live_sync).await?; } // If we've never waited for an initial previous-batch token, and we now have at @@ -1192,7 +1193,11 @@ mod private { /// If the event is a threaded reply, ensure the related thread's root /// event (i.e. first thread event) has a thread summary. #[instrument(skip_all)] - async fn analyze_thread_root(&mut self, event: &Event) -> Result<(), EventCacheError> { + async fn analyze_thread_root( + &mut self, + event: &Event, + is_live_sync: bool, + ) -> Result<(), EventCacheError> { let Some(thread_root) = extract_thread_root(event.raw()) else { // No thread root, carry on. return Ok(()); @@ -1219,13 +1224,34 @@ mod private { related_thread_events.len() }; - let new_summary = ThreadSummary { num_replies }; + let prev_summary = target_event.thread_summary.summary(); + let mut latest_reply = + prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone()); - if let ThreadSummaryStatus::Some(existing) = &target_event.thread_summary { - if existing == &new_summary { - trace!("thread summary is already up-to-date"); - return Ok(()); - } + // If we're live-syncing, then the latest event is always the event we're + // currently processing. We're processing the sync events from oldest to newest, + // so a a single sync response containing multiple thread events + // will correctly override the latest event to the most recent one. + // + // If we're back-paginating, then we shouldn't update the latest event + // information if it's set. If it's not set, then we should update + // it to the last event in the batch. TODO(bnjbvr): the code is + // wrong here in this particular case, because a single pagination + // batch may include multiple events in the same thread, and they're + // processed from oldest to newest; so the first in-thread event seen in that + // batch will be marked as the latest reply, which is incorrect. + // This will be fixed Later™ by using a proper linked chunk per + // thread. + + if is_live_sync || latest_reply.is_none() { + latest_reply = event.event_id(); + } + + let new_summary = ThreadSummary { num_replies, latest_reply }; + + if prev_summary == Some(&new_summary) { + trace!("thread summary is already up-to-date"); + return Ok(()); } // Cause an update to observers.