perf(timeline): make replacing replies much faster by indexing replies

This commit is contained in:
Benjamin Bouvier
2025-02-13 17:57:47 +01:00
parent 1c114978e4
commit 59f9d12da5
5 changed files with 102 additions and 40 deletions

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{num::NonZeroUsize, sync::Arc};
use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
sync::Arc,
};
use matrix_sdk::ring_buffer::RingBuffer;
use ruma::{EventId, OwnedEventId, OwnedUserId, RoomVersionId};
@@ -75,6 +79,9 @@ pub(in crate::timeline) struct TimelineMetadata {
/// Aggregation metadata and pending aggregations.
pub aggregations: Aggregations,
/// Given an event, what are all the events that are replies to it?
pub replies: HashMap<OwnedEventId, BTreeSet<OwnedEventId>>,
/// Edit events received before the related event they're editing.
pub pending_edits: RingBuffer<PendingEdit>,
@@ -114,6 +121,7 @@ impl TimelineMetadata {
next_internal_id: Default::default(),
aggregations: Default::default(),
pending_edits: RingBuffer::new(MAX_NUM_STASHED_PENDING_EDITS),
replies: Default::default(),
fully_read_event: Default::default(),
// It doesn't make sense to set this to false until we fill the `fully_read_event`
// field, otherwise we'll keep on exiting early in `Self::update_read_marker`.
@@ -130,6 +138,7 @@ impl TimelineMetadata {
// Note: we don't clear the next internal id to avoid bad cases of stale unique
// ids across timeline clears.
self.aggregations.clear();
self.replies.clear();
self.pending_edits.clear();
self.fully_read_event = None;
// We forgot about the fully read marker right above, so wait for a new one

View File

@@ -235,6 +235,11 @@ impl<'observable_items> ObservableItemsTransaction<'observable_items> {
self.all_remote_events.get_by_event_id_mut(event_id)
}
/// Get a remote event by using an event ID.
pub fn get_remote_event_by_event_id(&self, event_id: &EventId) -> Option<&EventMeta> {
self.all_remote_events.get_by_event_id(event_id)
}
/// Replace a timeline item at position `timeline_item_index` by
/// `timeline_item`.
pub fn replace(
@@ -346,11 +351,6 @@ pub struct ObservableItemsTransactionEntry<'observable_transaction_items, 'obser
}
impl ObservableItemsTransactionEntry<'_, '_> {
/// Replace the timeline item by `timeline_item`.
pub fn replace(this: &mut Self, timeline_item: Arc<TimelineItem>) -> Arc<TimelineItem> {
ObservableVectorTransactionEntry::set(&mut this.entry, timeline_item)
}
/// Remove this timeline item.
pub fn remove(this: Self) {
let entry_index = ObservableVectorTransactionEntry::index(&this.entry);
@@ -1251,6 +1251,11 @@ impl AllRemoteEvents {
self.0.iter_mut().rev().find(|event_meta| event_meta.event_id == event_id)
}
/// Get an immutable reference to a specific remote event by its ID.
pub fn get_by_event_id(&self, event_id: &EventId) -> Option<&EventMeta> {
self.0.iter().rev().find(|event_meta| event_meta.event_id == event_id)
}
/// Shift to the right all timeline item indexes that are equal to or
/// greater than `new_timeline_item_index`.
fn increment_all_timeline_item_index_after(&mut self, new_timeline_item_index: usize) {

View File

@@ -508,7 +508,7 @@ impl<'a> TimelineStateTransaction<'a> {
self.items.commit();
}
/// Add or update a remote event in the
/// Add or update a remote event in the
/// [`ObservableItems::all_remote_events`] collection.
///
/// This method also adjusts read receipt if needed.

View File

@@ -38,7 +38,7 @@ use ruma::{
room::{
encrypted::RoomEncryptedEventContent,
member::RoomMemberEventContent,
message::{self, RoomMessageEventContent, RoomMessageEventContentWithoutRelation},
message::{Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation},
},
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
AnySyncTimelineEvent, BundledMessageLikeRelations, EventContent, FullStateEventContent,
@@ -54,8 +54,8 @@ use super::{
algorithms::{rfind_event_by_id, rfind_event_by_item_id},
controller::{
find_item_and_apply_aggregation, Aggregation, AggregationKind, ApplyAggregationResult,
ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, PendingEditKind,
TimelineMetadata, TimelineStateTransaction,
ObservableItemsTransaction, PendingEdit, PendingEditKind, TimelineMetadata,
TimelineStateTransaction,
},
date_dividers::DateDividerAdjuster,
event_item::{
@@ -398,7 +398,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
}
AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
relates_to: Some(message::Relation::Replacement(re)),
relates_to: Some(Relation::Replacement(re)),
..
}) => {
self.handle_room_message_edit(re);
@@ -578,6 +578,27 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
let edit_json = edit_json.flatten();
// If this message is a reply to another message, add an entry in the inverted
// mapping.
if let Some(event_id) = self.ctx.flow.event_id() {
let replied_to_event_id =
msg.relates_to.as_ref().and_then(|relates_to| match relates_to {
Relation::Reply { in_reply_to } => Some(in_reply_to.event_id.clone()),
Relation::Thread(thread) => {
thread.in_reply_to.as_ref().map(|in_reply_to| in_reply_to.event_id.clone())
}
_ => None,
});
if let Some(replied_to_event_id) = replied_to_event_id {
// This is a reply! Add an entry.
self.meta
.replies
.entry(replied_to_event_id)
.or_default()
.insert(event_id.to_owned());
}
}
self.add_item(
TimelineItemContent::message(msg, edit_content, self.items, Default::default()),
edit_json,
@@ -597,7 +618,12 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
let internal_id = item.internal_id.to_owned();
// Update all events that replied to this message with the edited content.
Self::maybe_update_responses(self.items, &replacement.event_id, &new_item);
Self::maybe_update_responses(
self.meta,
self.items,
&replacement.event_id,
&new_item,
);
// Update the event itself.
self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
@@ -902,7 +928,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
// Look for any timeline event that's a reply to the redacted event, and redact
// the replied-to event there as well.
Self::maybe_update_responses(self.items, &redacted, &new_item);
Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
self.items.replace(idx, TimelineItem::new(new_item, internal_id));
self.result.items_updated += 1;
@@ -1181,7 +1207,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
trace!("Updating timeline item at position {idx}");
// Update all events that replied to this previously encrypted message.
Self::maybe_update_responses(self.items, decrypted_event_id, &item);
Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);
let internal_id = self.items[*idx].internal_id.clone();
self.items.replace(*idx, TimelineItem::new(item, internal_id));
@@ -1266,29 +1292,47 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
/// After updating the timeline item `new_item` which id is
/// `target_event_id`, update other items that are responses to this item.
fn maybe_update_responses(
meta: &mut TimelineMetadata,
items: &mut ObservableItemsTransaction<'_>,
target_event_id: &EventId,
new_item: &EventTimelineItem,
) {
items.for_each(|mut entry| {
let Some(event_item) = entry.as_event() else { return };
let Some(message) = event_item.content.as_message() else { return };
let Some(in_reply_to) = message.in_reply_to() else { return };
if target_event_id == in_reply_to.event_id {
trace!(reply_event_id = ?event_item.identifier(), "Updating response to edited event");
let in_reply_to = InReplyToDetails {
event_id: in_reply_to.event_id.clone(),
event: TimelineDetails::Ready(Box::new(
RepliedToEvent::from_timeline_item(new_item),
)),
};
let new_reply_content =
let Some(replies) = meta.replies.get(target_event_id) else {
trace!("item has no replies");
return;
};
for reply_id in replies {
let Some(timeline_item_index) = items
.get_remote_event_by_event_id(reply_id)
.and_then(|meta| meta.timeline_item_index)
else {
warn!(%reply_id, "event not known as an item in the timeline");
continue;
};
let Some(item) = items.get(timeline_item_index) else {
warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
continue;
};
let Some(event_item) = item.as_event() else { continue };
let Some(message) = event_item.content.as_message() else { continue };
let Some(in_reply_to) = message.in_reply_to() else { continue };
trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
let in_reply_to = InReplyToDetails {
event_id: in_reply_to.event_id.clone(),
event: TimelineDetails::Ready(Box::new(RepliedToEvent::from_timeline_item(
new_item,
))),
};
let new_reply_content =
TimelineItemContent::Message(message.with_in_reply_to(in_reply_to));
let new_reply_item =
entry.with_kind(event_item.with_content(new_reply_content));
ObservableItemsTransactionEntry::replace(&mut entry, new_reply_item);
}
});
let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
items.replace(timeline_item_index, new_reply_item);
}
}
}

View File

@@ -409,7 +409,9 @@ async fn test_edit_to_replied_updates_reply() {
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
let f = EventFactory::new();
let event_id = event_id!("$original_event");
let eid1 = event_id!("$original_event");
let eid2 = event_id!("$reply1");
let eid3 = event_id!("$reply2");
let user_id = client.user_id().unwrap();
// When a room has two messages, one is a reply to the other…
@@ -417,9 +419,11 @@ async fn test_edit_to_replied_updates_reply() {
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("bonjour").sender(user_id).event_id(event_id))
.add_timeline_event(f.text_msg("hi back").reply_to(event_id).sender(*ALICE))
.add_timeline_event(f.text_msg("yo").reply_to(event_id).sender(*BOB)),
.add_timeline_event(f.text_msg("bonjour").sender(user_id).event_id(eid1))
.add_timeline_event(
f.text_msg("hi back").reply_to(eid1).sender(*ALICE).event_id(eid2),
)
.add_timeline_event(f.text_msg("yo").reply_to(eid1).sender(*BOB).event_id(eid3)),
)
.await;
@@ -435,7 +439,7 @@ async fn test_edit_to_replied_updates_reply() {
assert_eq!(reply_message.body(), "hi back");
let in_reply_to = reply_message.in_reply_to().unwrap();
assert_eq!(in_reply_to.event_id, event_id);
assert_eq!(in_reply_to.event_id, eid1);
assert_let!(TimelineDetails::Ready(replied_to) = &in_reply_to.event);
assert_eq!(replied_to.content().as_message().unwrap().body(), "bonjour");
@@ -446,7 +450,7 @@ async fn test_edit_to_replied_updates_reply() {
assert_eq!(reply_message.body(), "yo");
let in_reply_to = reply_message.in_reply_to().unwrap();
assert_eq!(in_reply_to.event_id, event_id);
assert_eq!(in_reply_to.event_id, eid1);
assert_let!(TimelineDetails::Ready(replied_to) = &in_reply_to.event);
assert_eq!(replied_to.content().as_message().unwrap().body(), "bonjour");
@@ -474,7 +478,7 @@ async fn test_edit_to_replied_updates_reply() {
assert!(!reply_message.is_edited());
let in_reply_to = reply_message.in_reply_to().unwrap();
assert_eq!(in_reply_to.event_id, event_id);
assert_eq!(in_reply_to.event_id, eid1);
assert_let!(TimelineDetails::Ready(replied_to) = &in_reply_to.event);
assert_eq!(replied_to.content().as_message().unwrap().body(), "hello world");
});
@@ -485,7 +489,7 @@ async fn test_edit_to_replied_updates_reply() {
assert!(!reply_message.is_edited());
let in_reply_to = reply_message.in_reply_to().unwrap();
assert_eq!(in_reply_to.event_id, event_id);
assert_eq!(in_reply_to.event_id, eid1);
assert_let!(TimelineDetails::Ready(replied_to) = &in_reply_to.event);
assert_eq!(replied_to.content().as_message().unwrap().body(), "hello world");
});