feat(event cache): add basic support for the latest event in the thread summary

This commit is contained in:
Benjamin Bouvier
2025-06-03 14:18:26 +02:00
parent a152f9c074
commit ff4b7a8acc
8 changed files with 145 additions and 31 deletions

View File

@@ -372,6 +372,10 @@ impl<'de> Deserialize<'de> for EncryptionInfo {
/// - whether the user participated or not to this thread.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ThreadSummary {
/// The event id for the latest reply to the thread.
#[serde(skip_serializing_if = "Option::is_none")]
pub latest_reply: Option<OwnedEventId>,
/// The number of replies to the thread.
///
/// This doesn't include the thread root event itself. It can be zero if no
@@ -1370,8 +1374,9 @@ mod tests {
// When creating a timeline event from a raw event, the thread summary is always
// extracted, if available.
let timeline_event = TimelineEvent::new(raw);
assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies }) => {
assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies, latest_reply }) => {
assert_eq!(num_replies, 2);
assert_eq!(latest_reply.as_deref(), Some(event_id!("$latest_event:example.com")));
});
// When deserializing an old serialized timeline event, the thread summary is
@@ -1386,8 +1391,9 @@ mod tests {
let timeline_event: TimelineEvent =
serde_json::from_value(serialized_timeline_item).unwrap();
assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies }) => {
assert_matches!(timeline_event.thread_summary, ThreadSummaryStatus::Some(ThreadSummary { num_replies, latest_reply }) => {
assert_eq!(num_replies, 2);
assert_eq!(latest_reply.as_deref(), Some(event_id!("$latest_event:example.com")));
});
}
@@ -1651,7 +1657,10 @@ mod tests {
)])),
}),
push_actions: Default::default(),
thread_summary: ThreadSummaryStatus::Some(ThreadSummary { num_replies: 2 }),
thread_summary: ThreadSummaryStatus::Some(ThreadSummary {
num_replies: 2,
latest_reply: None,
}),
};
with_settings!({ sort_maps => true, prepend_module_to_snapshot => false }, {

View File

@@ -82,7 +82,10 @@ pub fn extract_bundled_thread_summary(event: &Raw<AnySyncTimelineEvent>) -> Thre
// to happen to have that many events in real-world threads.
let count = bundled_thread.count.try_into().unwrap_or(UInt::MAX.try_into().unwrap());
ThreadSummaryStatus::Some(ThreadSummary { num_replies: count })
let latest_reply =
bundled_thread.latest_event.get_field::<OwnedEventId>("event_id").ok().flatten();
ThreadSummaryStatus::Some(ThreadSummary { num_replies: count, latest_reply })
}
Ok(_) => ThreadSummaryStatus::None,
Err(_) => ThreadSummaryStatus::Unknown,

View File

@@ -37,7 +37,7 @@ use super::{
};
use crate::timeline::{
event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
ThreadSummary, TimelineDetails, VirtualTimelineItem,
ThreadSummary, ThreadSummaryLatestEvent, TimelineDetails, VirtualTimelineItem,
};
pub(in crate::timeline) struct TimelineStateTransaction<'a> {
@@ -555,13 +555,54 @@ impl<'a> TimelineStateTransaction<'a> {
settings: &TimelineSettings,
date_divider_adjuster: &mut DateDividerAdjuster,
) -> RemovedItem {
// TODO: do something with the thread summary!
let TimelineEvent { push_actions, kind, thread_summary } = event;
let thread_summary = thread_summary.summary().map(|summary| ThreadSummary {
latest_event: TimelineDetails::Unavailable,
num_replies: summary.num_replies,
});
let thread_summary = if let Some(summary) = thread_summary.summary() {
let latest_reply_item =
if let Some(event_id) = summary.latest_reply.as_ref() {
// Attempt to load the timeline event, either from the event cache or the
// storage.
let event = room_data_provider
.load_event(event_id)
.await
.inspect_err(|err| {
warn!("Failed to load thread latest event: {err}");
})
.ok();
if let Some(event) = event {
// lol @ hack
crate::timeline::RepliedToEvent::try_from_timeline_event(
event,
room_data_provider,
&self.items,
&mut self.meta,
)
.await
.inspect_err(|err| {
warn!("Failed to extract thread event into a timeline item content: {err}");
})
.ok()
.flatten()
.map(|replied_to| Box::new(ThreadSummaryLatestEvent {
content: replied_to.content().clone(),
sender: replied_to.sender().to_owned(),
sender_profile: replied_to.sender_profile().clone(),
}))
} else {
None
}
} else {
None
};
Some(ThreadSummary {
latest_event: TimelineDetails::from_initial_value(latest_reply_item),
num_replies: summary.num_replies,
})
} else {
None
};
let encryption_info = kind.encryption_info().cloned();

View File

@@ -437,4 +437,8 @@ impl RoomDataProvider for TestRoomDataProvider {
) -> Result<Relations, matrix_sdk::Error> {
unimplemented!();
}
async fn load_event<'a>(&'a self, _event_id: &'a EventId) -> matrix_sdk::Result<TimelineEvent> {
unimplemented!();
}
}

View File

@@ -145,6 +145,12 @@ pub(super) trait RoomDataProvider:
) -> impl Future<Output = Option<Arc<EncryptionInfo>>> + SendOutsideWasm;
async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations>;
/// Loads an event from the cache or network.
fn load_event<'a>(
&'a self,
event_id: &'a EventId,
) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm + 'a;
}
impl RoomDataProvider for Room {
@@ -294,6 +300,10 @@ impl RoomDataProvider for Room {
async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations> {
self.relations(event_id, opts).await
}
async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result<TimelineEvent> {
self.load_or_fetch_event(event_id, None).await
}
}
// Internal helper to make most of retry_event_decryption independent of a room

View File

@@ -213,16 +213,18 @@ async fn test_extract_bundled_thread_summary() {
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_event_id = event_id!("$thread_root");
let latest_event_id = event_id!("$latest_event");
let latest_event = f.text_msg("the last one!").event_id(latest_event_id).into_event();
let event = f
.text_msg("thready thread mcthreadface")
.with_bundled_thread_summary(
f.text_msg("the last one!").event_id(latest_event_id).into_raw(),
42,
false,
)
.with_bundled_thread_summary(latest_event.raw().cast_ref().clone(), 42, false)
.event_id(thread_event_id);
// Set up the /event for the latest thread event.
// FIXME(bnjbvr): shouldn't be necessary, the event cache could save the bundled
// latest event instead.
server.mock_room_event().match_event_id().ok(latest_event).mock_once().mount().await;
server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(event)).await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
@@ -234,8 +236,15 @@ async fn test_extract_bundled_thread_summary() {
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
// Soon™, Stefan, soon™.
assert!(summary.latest_event.is_unavailable());
// We get the latest event from the bundled thread summary.
assert!(summary.latest_event.is_ready());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(latest_event.content.as_message().unwrap().body(), "the last one!");
assert_eq!(latest_event.sender, *ALICE);
assert!(latest_event.sender_profile.is_unavailable());
// We get the count from the bundled thread summary.
assert_eq!(summary.num_replies, 42);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
@@ -321,9 +330,15 @@ async fn test_new_thread_reply_causes_thread_summary() {
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert!(event_item.content().thread_root().is_none());
// The thread summary contains the detailed information about the latest event.
assert_let!(Some(summary) = event_item.content().thread_summary());
// Soon™, Stefan, soon™.
assert!(summary.latest_event.is_unavailable());
assert!(summary.latest_event.is_ready());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(latest_event.content.as_message().unwrap().body(), "thread reply");
assert_eq!(latest_event.sender, *BOB);
assert!(latest_event.sender_profile.is_unavailable());
// The thread summary contains the number of replies.
assert_eq!(summary.num_replies, 1);
assert_pending!(stream);
@@ -373,8 +388,14 @@ async fn test_new_thread_reply_causes_thread_summary() {
assert!(event_item.content().thread_root().is_none());
assert_let!(Some(summary) = event_item.content().thread_summary());
// Soon™, Stefan, soon™.
assert!(summary.latest_event.is_unavailable());
// The latest event has been updated.
assert!(summary.latest_event.is_ready());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(latest_event.content.as_message().unwrap().body(), "another thread reply");
assert_eq!(latest_event.sender, *BOB);
assert!(latest_event.sender_profile.is_unavailable());
// The number of replies has been updated.
assert_eq!(summary.num_replies, 2);
}

View File

@@ -352,7 +352,7 @@ impl RoomPagination {
};
let next_diffs = state
.with_events_mut(|room_events| {
.with_events_mut(false, |room_events| {
// Reverse the order of the events as `/messages` has been called with `dir=b`
// (backwards). The `RoomEvents` API expects the first event to be the oldest.
// Let's re-order them for this block.

View File

@@ -429,7 +429,7 @@ impl RoomEventCacheInner {
// Add the previous back-pagination token (if present), followed by the timeline
// events themselves.
let new_timeline_event_diffs = state
.with_events_mut(|room_events| {
.with_events_mut(true, |room_events| {
// If we only received duplicated events, we don't need to store the gap: if
// there was a gap, we'd have received an unknown event at the tail of
// the room's timeline (unless the server reordered sync events since the last
@@ -1161,6 +1161,7 @@ mod private {
#[instrument(skip_all, fields(room_id = %self.room))]
pub async fn with_events_mut<F>(
&mut self,
is_live_sync: bool,
func: F,
) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
where
@@ -1173,7 +1174,7 @@ mod private {
for event in &events_to_post_process {
self.maybe_apply_new_redaction(event).await?;
self.analyze_thread_root(event).await?;
self.analyze_thread_root(event, is_live_sync).await?;
}
// If we've never waited for an initial previous-batch token, and we now have at
@@ -1192,7 +1193,11 @@ mod private {
/// If the event is a threaded reply, ensure the related thread's root
/// event (i.e. first thread event) has a thread summary.
#[instrument(skip_all)]
async fn analyze_thread_root(&mut self, event: &Event) -> Result<(), EventCacheError> {
async fn analyze_thread_root(
&mut self,
event: &Event,
is_live_sync: bool,
) -> Result<(), EventCacheError> {
let Some(thread_root) = extract_thread_root(event.raw()) else {
// No thread root, carry on.
return Ok(());
@@ -1219,13 +1224,34 @@ mod private {
related_thread_events.len()
};
let new_summary = ThreadSummary { num_replies };
let prev_summary = target_event.thread_summary.summary();
let mut latest_reply =
prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());
if let ThreadSummaryStatus::Some(existing) = &target_event.thread_summary {
if existing == &new_summary {
trace!("thread summary is already up-to-date");
return Ok(());
}
// If we're live-syncing, then the latest event is always the event we're
// currently processing. We're processing the sync events from oldest to newest,
// so a a single sync response containing multiple thread events
// will correctly override the latest event to the most recent one.
//
// If we're back-paginating, then we shouldn't update the latest event
// information if it's set. If it's not set, then we should update
// it to the last event in the batch. TODO(bnjbvr): the code is
// wrong here in this particular case, because a single pagination
// batch may include multiple events in the same thread, and they're
// processed from oldest to newest; so the first in-thread event seen in that
// batch will be marked as the latest reply, which is incorrect.
// This will be fixed Later™ by using a proper linked chunk per
// thread.
if is_live_sync || latest_reply.is_none() {
latest_reply = event.event_id();
}
let new_summary = ThreadSummary { num_replies, latest_reply };
if prev_summary == Some(&new_summary) {
trace!("thread summary is already up-to-date");
return Ok(());
}
// Cause an update to observers.