feat(event cache): replace events we just sent, instead of removing and reinserting them

This commit is contained in:
Benjamin Bouvier
2026-04-22 14:34:27 +02:00
parent 103ff7d3ec
commit f714b703c4
7 changed files with 368 additions and 21 deletions

View File

@@ -204,6 +204,7 @@ pub struct TimelineVectorDiffs {
}
/// An enum representing where an event has been found.
#[derive(Debug)]
pub(super) enum EventLocation {
/// Event lives in memory (and likely in the store!).
Memory(Position),

View File

@@ -514,6 +514,7 @@ impl RoomEventCacheInner {
timeline,
ephemeral_events,
ambiguity_changes,
None,
)
.await
}
@@ -523,6 +524,7 @@ impl RoomEventCacheInner {
/// The event is inserted if and only if the cache is not empty.
async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
let state = self.state.write().await?;
let sent_event_id = event.event_id();
// Insert the event if the room is not empty, otherwise it can break the
// pagination logic when detecting the start of the timeline because no gap can
@@ -534,6 +536,7 @@ impl RoomEventCacheInner {
Timeline { limited: false, prev_batch: None, events: vec![event] },
Vec::new(),
BTreeMap::new(),
sent_event_id,
)
.await;
}
@@ -547,6 +550,7 @@ impl RoomEventCacheInner {
timeline: Timeline,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
sent_event_id_from_send_queue: Option<OwnedEventId>,
) -> Result<()> {
if timeline.events.is_empty()
&& timeline.prev_batch.is_none()
@@ -562,6 +566,10 @@ impl RoomEventCacheInner {
let (stored_prev_batch_token, timeline_event_diffs) =
state.handle_sync(timeline, &ephemeral_events).await?;
if let Some(event_id) = sent_event_id_from_send_queue {
state.mark_event_as_sent_from_send_queue(event_id);
}
drop(state);
// Now that all events have been added, we can trigger the

View File

@@ -18,6 +18,7 @@
//! [`RoomEventCache`]: super::super::super::RoomEventCache
use std::{
collections::HashSet,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -360,20 +361,34 @@ impl PaginatedCache for Arc<RoomEventCacheInner> {
all_events: mut events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
in_place_replacements,
non_empty_all_duplicates: all_duplicates,
} = {
let room_linked_chunk = state.room_linked_chunk();
// Don't try to replace events in-place when back-paginating, as we want to make
// sure that our sent events are moved to their correct, final
// topological position in the room (and not left at their sync
// ordering position, which is less precise and less correct).
let replacement_event_ids = &HashSet::new();
filter_duplicate_events(
&state.state.own_user_id,
&state.store,
LinkedChunkId::Room(&state.state.room_id),
room_linked_chunk,
replacement_event_ids,
events,
)
.await?
};
// See comment above about `replacement_event_ids`.
assert!(
in_place_replacements.is_empty(),
"we don't expect any in-place replacement when back-paginating."
);
// If not all the events have been back-paginated, we need to remove the
// previous ones, otherwise we can end up with misordered events.
//

View File

@@ -57,7 +57,7 @@ use super::{
super::{
super::{
EventCacheError,
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
deduplicator::{DeduplicationOutcome, ReplaceableDuplicate, filter_duplicate_events},
persistence::send_updates_to_store,
},
EventLocation, TimelineVectorDiffs,
@@ -151,6 +151,10 @@ pub struct RoomEventCacheState {
/// A copy of the automatic pagination API object.
automatic_pagination: Option<AutomaticPagination>,
/// Event IDs sent by the user in the current session, and not remote-echoed
/// yet.
send_queue_event_ids: HashSet<OwnedEventId>,
}
impl RoomEventCacheState {
@@ -395,6 +399,7 @@ impl LockedRoomEventCacheState {
subscriber_count: Default::default(),
pinned_event_cache: OnceLock::new(),
automatic_pagination,
send_queue_event_ids: HashSet::new(),
}))
}
}
@@ -573,6 +578,12 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
&mut self.state.waited_for_initial_prev_token
}
/// Record that the event with the given ID was inserted by the send queue
/// during the current session.
///
/// The ID is added to the state's `send_queue_event_ids` set; adding an ID
/// that is already present leaves the set unchanged.
pub fn mark_event_as_sent_from_send_queue(&mut self, event_id: OwnedEventId) {
    let send_queue_event_ids = &mut self.state.send_queue_event_ids;
    send_queue_event_ids.insert(event_id);
}
/// Find a single event in this room.
///
/// It starts by looking into loaded events in `EventLinkedChunk` before
@@ -721,6 +732,25 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
self.propagate_changes().await
}
/// Apply in-place replacements of duplicate events, given their position.
pub(super) async fn apply_in_place_replacements(
&mut self,
replacements: Vec<ReplaceableDuplicate>,
) -> Result<Vec<Event>, EventCacheError> {
let mut replaced_events = Vec::with_capacity(replacements.len());
for replacement in replacements {
// Replace the event in the store or in the in-memory linked chunk.
self.replace_event_at(replacement.location, replacement.event.clone()).await?;
self.state.send_queue_event_ids.remove(&replacement.event_id);
replaced_events.push(replacement.event);
}
Ok(replaced_events)
}
async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
let updates = self.state.room_linked_chunk.store_updates().take();
@@ -775,6 +805,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
self.state.room_linked_chunk.reset();
self.state.send_queue_event_ids.clear();
// No need to update the thread summaries: the room events are
// gone because of the reset of `room_linked_chunk`.
@@ -816,14 +847,16 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
let DeduplicationOutcome {
all_events: events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
mut in_memory_duplicated_event_ids,
mut in_store_duplicated_event_ids,
in_place_replacements,
non_empty_all_duplicates: all_duplicates,
} = filter_duplicate_events(
&self.state.own_user_id,
&self.store,
LinkedChunkId::Room(&self.state.room_id),
&self.state.room_linked_chunk,
&self.state.send_queue_event_ids,
timeline.events,
)
.await?;
@@ -848,7 +881,24 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
let has_new_gap = prev_batch.is_some();
if has_new_gap {
let (replacement_event_ids, replaced_events) = if has_new_gap {
// If we have a gap, we can't just replace events! The new timeline contains a
// gap *then* the events; if we only replaced the events, then the
// replaced events would be *before* the gap, and end up in the
// wrong order.
//
// Revert the in-place replacements to a removal of the duplicated events.
for re in in_place_replacements {
match re.location {
EventLocation::Memory(position) => {
in_memory_duplicated_event_ids.push((re.event_id, position));
}
EventLocation::Store => {
in_store_duplicated_event_ids.push((re.event_id, re.position));
}
}
}
// Sad time: there's a gap, somewhere, in the timeline, and there's at least one
// non-duplicated event. We don't know which threads might be gappy, so we
// must invalidate them all :(
@@ -880,19 +930,42 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
self.replace_event_at(location, target_event).await?;
}
}
}
(Default::default(), Default::default())
} else {
// We don't have a gap, so it's safe to apply the in-place replacements \o/
let replacement_event_ids = in_place_replacements
.iter()
.map(|replacement| replacement.event_id.clone())
.collect::<HashSet<_>>();
// This applies the replacements in memory and in storage, and flushes updates to
// storage.
let replaced_events = self.apply_in_place_replacements(in_place_replacements).await?;
(replacement_event_ids, replaced_events)
};
if all_duplicates {
// No new events and no gap (per the previous check), thus no need to change the
// room state. We're done!
// room state. We're done, except that we may still have replaced a local echo.
let new_receipt = extract_read_receipt(ephemeral_events);
// We might have a new read receipt, though! If that's the case, handle it for
// unread counts tracking.
if let Some(new_receipt) = extract_read_receipt(ephemeral_events) {
self.update_read_receipts(Some(&new_receipt)).await?;
if replaced_events.is_empty() {
if let Some(new_receipt) = new_receipt {
self.update_read_receipts(Some(&new_receipt)).await?;
}
return Ok((false, Vec::new()));
}
return Ok((false, Vec::new()));
// If we've replaced at least one local echo, we still need to post-process it
// properly.
self.post_process_new_events(replaced_events, PostProcessingOrigin::Sync, new_receipt)
.await?;
let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
return Ok((false, timeline_event_diffs));
}
// If we've never waited for an initial previous-batch token, and we've now
@@ -907,9 +980,19 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
// events, because we are pushing all _new_ `events` at the back.
self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;
self.state
.room_linked_chunk
.push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events);
// Only push live events that haven't been replaced in place earlier.
let events_to_push = events
.iter()
.filter(|event| {
event.event_id().is_none_or(|event_id| !replacement_event_ids.contains(&event_id))
})
.cloned()
.collect::<Vec<_>>();
self.state.room_linked_chunk.push_live_events(
prev_batch.map(|prev_token| Gap { token: prev_token }),
&events_to_push,
);
// Extract a new read receipt, if available.
let new_receipt = extract_read_receipt(ephemeral_events);

View File

@@ -322,6 +322,7 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
all_events: new_events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
in_place_replacements: Vec::new(),
non_empty_all_duplicates,
}
}

View File

@@ -15,7 +15,7 @@
//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
//! to learn more.
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashSet};
use matrix_sdk_base::{
event_cache::store::EventCacheStoreLockGuard,
@@ -25,7 +25,10 @@ use ruma::{OwnedEventId, UserId};
use super::{
EventCacheError,
caches::event_linked_chunk::{Event, EventLinkedChunk},
caches::{
EventLocation,
event_linked_chunk::{Event, EventLinkedChunk},
},
};
/// Find duplicates in the given collection of new events, and return relevant
@@ -36,6 +39,7 @@ pub async fn filter_duplicate_events(
store_guard: &EventCacheStoreLockGuard,
linked_chunk_id: LinkedChunkId<'_>,
linked_chunk: &EventLinkedChunk,
replaceable_event_ids: &HashSet<OwnedEventId>,
mut new_events: Vec<Event>,
) -> Result<DeduplicationOutcome, EventCacheError> {
// Remove all events with no ID, or that are duplicated among the new events,
@@ -61,23 +65,46 @@ pub async fn filter_duplicate_events(
// Separate duplicated events in two collections: ones that are in-memory, ones
// that are in the store.
let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids, in_place_replacements) = {
// Collect all in-memory chunk identifiers.
let in_memory_chunk_identifiers =
linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();
let mut in_memory = vec![];
let mut in_store = vec![];
let mut in_place_replacements = vec![];
for (duplicated_event_id, position) in duplicated_event_ids {
if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
if replaceable_event_ids.contains(&duplicated_event_id) {
// The event is replaceable: find the new content of the event in `new_events`.
let Some(event) = new_events
.iter()
.find(|ev| ev.event_id().as_ref() == Some(&duplicated_event_id))
else {
continue;
};
let location = if in_memory_chunk_identifiers.contains(&position.chunk_identifier())
{
EventLocation::Memory(position)
} else {
EventLocation::Store
};
in_place_replacements.push(ReplaceableDuplicate {
event_id: duplicated_event_id,
position,
location,
event: event.clone(),
});
} else if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
in_memory.push((duplicated_event_id, position));
} else {
in_store.push((duplicated_event_id, position));
}
}
(in_memory, in_store)
(in_memory, in_store, in_place_replacements)
};
// See comment of `DeduplicationOutcome::non_empty_all_duplicates` for the
@@ -86,7 +113,8 @@ pub async fn filter_duplicate_events(
new_events.iter().any(|ev| ev.sender().is_some_and(|sender| sender != own_user_id));
let all_duplicates = (in_memory_duplicated_event_ids.len()
+ in_store_duplicated_event_ids.len())
+ in_store_duplicated_event_ids.len()
+ in_place_replacements.len())
== new_events.len();
let non_empty_all_duplicates = at_least_one_event_not_sent_by_me && all_duplicates;
@@ -95,6 +123,7 @@ pub async fn filter_duplicate_events(
all_events: new_events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
in_place_replacements,
non_empty_all_duplicates,
})
}
@@ -122,6 +151,10 @@ pub(super) struct DeduplicationOutcome {
/// (position is descending).
pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
/// Duplicates that can be replaced in place instead of being removed and
/// reinserted.
pub in_place_replacements: Vec<ReplaceableDuplicate>,
/// Whether there's at least one new event sent by some other user, and all
/// new events are duplicate.
///
@@ -151,11 +184,27 @@ pub(super) struct DeduplicationOutcome {
pub non_empty_all_duplicates: bool,
}
/// A duplicate event that can be replaced in place, instead of being removed
/// and reinserted.
///
/// Values of this type are produced by `filter_duplicate_events` for the
/// duplicated events whose ID is part of the set of replaceable event IDs it
/// has been given.
pub(super) struct ReplaceableDuplicate {
/// The ID of the duplicated event.
pub event_id: OwnedEventId,
/// The previous position of the event in the linked chunk.
///
/// This is kept regardless of `location`, so that a replacement can still be
/// turned back into a regular `(event ID, position)` duplicate entry.
pub position: Position,
/// The location of the duplicated event in the linked chunk (memory or
/// store).
///
/// When the event lives in a chunk that is loaded in memory, this is
/// `EventLocation::Memory` carrying the same position as `position`;
/// otherwise it is `EventLocation::Store`.
pub location: EventLocation,
/// The new version of the duplicated event that can be used as a
/// replacement.
pub event: Event,
}
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests uses the cross-process lock, so need time support.
mod tests {
use std::ops::Not as _;
use std::{collections::HashSet, ops::Not as _};
use assert_matches2::assert_let;
use matrix_sdk_base::{
deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
linked_chunk::ChunkIdentifier,
@@ -254,6 +303,7 @@ mod tests {
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
&HashSet::new(),
vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
)
.await
@@ -270,6 +320,7 @@ mod tests {
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
&HashSet::new(),
vec![event_0, event_1, event_2, event_3, event_4],
)
.await
@@ -311,6 +362,7 @@ mod tests {
outcome.in_store_duplicated_event_ids[1],
(event_id_1, Position::new(ChunkIdentifier::new(42), 1))
);
assert!(outcome.in_place_replacements.is_empty());
}
#[async_test]
@@ -384,12 +436,14 @@ mod tests {
all_events: events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
in_place_replacements,
non_empty_all_duplicates,
} = filter_duplicate_events(
user_id,
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
&HashSet::new(),
vec![ev1, ev2, ev3, ev4],
)
.await
@@ -403,6 +457,7 @@ mod tests {
assert_eq!(events[2].event_id().as_deref(), Some(eid3));
assert!(in_memory_duplicated_event_ids.is_empty());
assert!(in_place_replacements.is_empty());
assert_eq!(in_store_duplicated_event_ids.len(), 2);
assert_eq!(
@@ -414,4 +469,83 @@ mod tests {
(eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
);
}
/// Checks that a duplicated event whose ID is listed as replaceable is
/// reported as an in-place replacement, and not as an in-memory or in-store
/// duplicate.
#[async_test]
async fn test_in_place_replaceable_duplicates() {
use std::sync::Arc;
use matrix_sdk_base::{
event_cache::store::{EventCacheStore, MemoryStore},
linked_chunk::Update,
};
use ruma::room_id;
let user_id = user_id!("@user:example.com");
let room_id = room_id!("!fondue:raclette.ch");
// Simulate an event that has already been saved in the event cache (for
// example after being sent by the send queue), and that could be replaced
// later.
let event_id = owned_event_id!("$ev0");
let event = timeline_event(&event_id);
// Persist the event in the store, as the first item of chunk #0.
let event_cache_store = Arc::new(MemoryStore::new());
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event.clone()],
},
],
)
.await
.unwrap();
// Wrap the store in its cross-process lock and acquire a clean guard, as
// required by `filter_duplicate_events`.
let event_cache_store = EventCacheStoreLock::new(
event_cache_store,
CrossProcessLockConfig::multi_process("hodor"),
);
let event_cache_store = event_cache_store.lock().await.unwrap();
let event_cache_store_guard = event_cache_store.as_clean().unwrap();
// The same event is also pushed to the in-memory linked chunk.
let mut linked_chunk = EventLinkedChunk::new();
linked_chunk.push_events([event.clone()]);
// Indicate that the event is replaceable, for example because it's been
// inserted by the send queue earlier.
let replaceable_event_ids = HashSet::from([event_id.clone()]);
// Feed the very same event again, as if it were received a second time.
let outcome = filter_duplicate_events(
user_id,
event_cache_store_guard,
LinkedChunkId::Room(room_id),
&linked_chunk,
&replaceable_event_ids,
vec![event],
)
.await
.unwrap();
// The duplicate must not be reported as a duplicate to remove...
assert!(outcome.in_memory_duplicated_event_ids.is_empty());
assert!(outcome.in_store_duplicated_event_ids.is_empty());
// The event can be replaced in-place.
assert_eq!(outcome.in_place_replacements.len(), 1);
let replacement = &outcome.in_place_replacements[0];
assert_eq!(replacement.event_id, event_id);
// It's found in memory, at the first position of chunk #0; `position`
// and the position carried by `location` must agree.
assert_let!(EventLocation::Memory(pos) = &replacement.location);
assert_eq!(*pos, Position::new(ChunkIdentifier::new(0), 0));
assert_eq!(replacement.position, Position::new(ChunkIdentifier::new(0), 0));
// The replacement carries the new version of the event.
assert_eq!(replacement.event.event_id().unwrap(), event_id);
}
}

View File

@@ -3046,3 +3046,108 @@ async fn test_backpaginate_on_a_single_event_inserted_via_send_queue_from_an_emp
// assume so.
assert!(reached_start.not());
}
/// Checks that when sync returns the remote echo of an event previously
/// inserted by the send queue, subscribers observe a single `VectorDiff::Set`
/// at the event's existing index, instead of a removal followed by an append.
#[async_test]
async fn test_sync_remote_echo_of_send_queue_event_replaces_in_place() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let own_user_id = client.user_id().unwrap().to_owned();
// Start with a room holding a single event (`$1`), so that the cache isn't
// empty when the send queue inserts its own events.
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$1"))),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events.len(), 1);
let event_id_2 = event_id!("$2");
let event_id_3 = event_id!("$3");
server.mock_room_state_encryption().plain().mount().await;
// The first send request is answered with event ID `$2`.
server.mock_room_send().ok(event_id_2).mock_once().mount().await;
// We send an event with the send queue.
room.send_queue()
.send(RoomMessageEventContent::text_plain("Hello, World!").into())
.await
.unwrap();
// It's inserted in the room's event cache.
assert_matches!(
room_stream.recv().await,
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => {
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &diffs[0]);
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref(), Some(event_id_2));
}
);
// Then we send a second message in this room.
server.mock_room_send().ok(event_id_3).mock_once().mount().await;
room.send_queue()
.send(RoomMessageEventContent::text_plain("Bonjour, Monde!").into())
.await
.unwrap();
// It's saved in the room's event cache too.
assert_matches!(
room_stream.recv().await,
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => {
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &diffs[0]);
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref(), Some(event_id_3));
}
);
// From now on, the synced events are built with our own user as the sender.
let f = EventFactory::new().room(room_id).sender(&own_user_id);
// Then sync comes back with the first event.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("Hello, World!").event_id(event_id_2)),
)
.await;
// The event is replaced in place, not removed and appended again.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// Index 1: right after the initial `$1` event.
assert_let!(VectorDiff::Set { index: 1, value: event } = &diffs[0]);
assert_eq!(event.event_id().as_deref(), Some(event_id_2));
// Then a second sync comes back with the second event.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("Bonjour, Monde!").event_id(event_id_3)),
)
.await;
// And the next event is replaced in place too.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
room_stream.recv()
);
assert_eq!(diffs.len(), 1);
// Index 2: `$3` kept the position it got when the send queue inserted it.
assert_let!(VectorDiff::Set { index: 2, value: event } = &diffs[0]);
assert_eq!(event.event_id().as_deref(), Some(event_id_3));
// That's all, folks!
}