From f714b703c4bb37740414d786ebb32ed1cb028f02 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 22 Apr 2026 14:34:27 +0200 Subject: [PATCH] feat(event cache): replace events we just sent, instead of removing and reinserting them --- .../matrix-sdk/src/event_cache/caches/mod.rs | 1 + .../src/event_cache/caches/room/mod.rs | 8 + .../src/event_cache/caches/room/pagination.rs | 15 ++ .../src/event_cache/caches/room/state.rs | 111 +++++++++++-- .../src/event_cache/caches/thread/state.rs | 1 + .../src/event_cache/deduplicator.rs | 148 +++++++++++++++++- .../tests/integration/event_cache/mod.rs | 105 +++++++++++++ 7 files changed, 368 insertions(+), 21 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 3c15a7149..5fc7d4896 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -204,6 +204,7 @@ pub struct TimelineVectorDiffs { } /// An enum representing where an event has been found. +#[derive(Debug)] pub(super) enum EventLocation { /// Event lives in memory (and likely in the store!). Memory(Position), diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 2e8a21eac..befb51bee 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -514,6 +514,7 @@ impl RoomEventCacheInner { timeline, ephemeral_events, ambiguity_changes, + None, ) .await } @@ -523,6 +524,7 @@ impl RoomEventCacheInner { /// The event is inserted if and only if the cache is not empty. async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> { let state = self.state.write().await?; + let sent_event_id = event.event_id(); // Insert the event if the room is not empty, otherwise it can break the // pagination logic when detecting the start of the timeline because no gap can @@ -534,6 +536,7 @@ impl RoomEventCacheInner { Timeline { limited: false, prev_batch: None, events: vec![event] }, Vec::new(), BTreeMap::new(), + sent_event_id, ) .await; } @@ -547,6 +550,7 @@ impl RoomEventCacheInner { timeline: Timeline, ephemeral_events: Vec>, ambiguity_changes: BTreeMap, + sent_event_id_from_send_queue: Option, ) -> Result<()> { if timeline.events.is_empty() && timeline.prev_batch.is_none() @@ -562,6 +566,10 @@ impl RoomEventCacheInner { let (stored_prev_batch_token, timeline_event_diffs) = state.handle_sync(timeline, &ephemeral_events).await?; + if let Some(event_id) = sent_event_id_from_send_queue { + state.mark_event_as_sent_from_send_queue(event_id); + } + drop(state); // Now that all events have been added, we can trigger the diff --git a/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs b/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs index f2c31ab9e..e68e3d25e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs @@ -18,6 +18,7 @@ //! [`RoomEventCache`]: super::super::super::RoomEventCache use std::{ + collections::HashSet, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -360,20 +361,34 @@ impl PaginatedCache for Arc { all_events: mut events, in_memory_duplicated_event_ids, in_store_duplicated_event_ids, + in_place_replacements, non_empty_all_duplicates: all_duplicates, } = { let room_linked_chunk = state.room_linked_chunk(); + // Don't try to replace events in-place when back-paginating, as we want to make + // sure that our sent events are moved at their correct, final + // topological position in the room (and not left at their sync + // ordering position, which is less precise and correct). + let replacement_event_ids = &HashSet::new(); + filter_duplicate_events( &state.state.own_user_id, &state.store, LinkedChunkId::Room(&state.state.room_id), room_linked_chunk, + replacement_event_ids, events, ) .await? }; + // See comment above about `replacement_event_ids`. + assert!( + in_place_replacements.is_empty(), + "we don't expect any in-place replacement when back-paginating." + ); + // If not all the events have been back-paginated, we need to remove the // previous ones, otherwise we can end up with misordered events. // diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index addcfe736..c6efdb892 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -57,7 +57,7 @@ use super::{ super::{ super::{ EventCacheError, - deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + deduplicator::{DeduplicationOutcome, ReplaceableDuplicate, filter_duplicate_events}, persistence::send_updates_to_store, }, EventLocation, TimelineVectorDiffs, @@ -151,6 +151,10 @@ pub struct RoomEventCacheState { /// A copy of the automatic pagination API object. automatic_pagination: Option, + + /// Event IDs sent by the user in the current session, and not remote-echoed + /// yet. + send_queue_event_ids: HashSet, } impl RoomEventCacheState { @@ -395,6 +399,7 @@ impl LockedRoomEventCacheState { subscriber_count: Default::default(), pinned_event_cache: OnceLock::new(), automatic_pagination, + send_queue_event_ids: HashSet::new(), })) } } @@ -573,6 +578,12 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self.state.waited_for_initial_prev_token } + /// Mark event ID that were inserted by the send queue during the current + /// session. + pub fn mark_event_as_sent_from_send_queue(&mut self, event_id: OwnedEventId) { + self.state.send_queue_event_ids.insert(event_id); + } + /// Find a single event in this room. /// /// It starts by looking into loaded events in `EventLinkedChunk` before @@ -721,6 +732,25 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { self.propagate_changes().await } + /// Apply in-place replacements of duplicate events, given their position. + pub(super) async fn apply_in_place_replacements( + &mut self, + replacements: Vec, + ) -> Result, EventCacheError> { + let mut replaced_events = Vec::with_capacity(replacements.len()); + + for replacement in replacements { + // Replace the event in the store or in the in-memory linked chunk. + self.replace_event_at(replacement.location, replacement.event.clone()).await?; + + self.state.send_queue_event_ids.remove(&replacement.event_id); + + replaced_events.push(replacement.event); + } + + Ok(replaced_events) + } + async fn propagate_changes(&mut self) -> Result<(), EventCacheError> { let updates = self.state.room_linked_chunk.store_updates().take(); @@ -775,6 +805,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { async fn reset_internal(&mut self) -> Result<(), EventCacheError> { self.state.room_linked_chunk.reset(); + self.state.send_queue_event_ids.clear(); // No need to update the thread summaries: the room events are // gone because of the reset of `room_linked_chunk`. @@ -816,14 +847,16 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let DeduplicationOutcome { all_events: events, - in_memory_duplicated_event_ids, - in_store_duplicated_event_ids, + mut in_memory_duplicated_event_ids, + mut in_store_duplicated_event_ids, + in_place_replacements, non_empty_all_duplicates: all_duplicates, } = filter_duplicate_events( &self.state.own_user_id, &self.store, LinkedChunkId::Room(&self.state.room_id), &self.state.room_linked_chunk, + &self.state.send_queue_event_ids, timeline.events, ) .await?; @@ -848,7 +881,24 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let has_new_gap = prev_batch.is_some(); - if has_new_gap { + let (replacement_event_ids, replaced_events) = if has_new_gap { + // If we have a gap, we can't just replace events! The new timeline contains a + // gap *then* the events; if we only replaced the events, then the + // replaced events would be *before* the gap, and end up in the + // wrong order. + // + // Revert the in-place replacements to a removal of the duplicated events. + for re in in_place_replacements { + match re.location { + EventLocation::Memory(position) => { + in_memory_duplicated_event_ids.push((re.event_id, position)); + } + EventLocation::Store => { + in_store_duplicated_event_ids.push((re.event_id, re.position)); + } + } + } + // Sad time: there's a gap, somewhere, in the timeline, and there's at least one // non-duplicated event. We don't know which threads might have gappy, so we // must invalidate them all :( @@ -880,19 +930,42 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { self.replace_event_at(location, target_event).await?; } } - } + + (Default::default(), Default::default()) + } else { + // We don't have a gap, so it's safe to apply the in-place replacements \o/ + let replacement_event_ids = in_place_replacements + .iter() + .map(|replacement| replacement.event_id.clone()) + .collect::>(); + + // This applies the replacements in memory and in storage, and flush updates to + // storage. + let replaced_events = self.apply_in_place_replacements(in_place_replacements).await?; + + (replacement_event_ids, replaced_events) + }; if all_duplicates { // No new events and no gap (per the previous check), thus no need to change the - // room state. We're done! + // room state. We're done, except that we may still have replaced a local echo. + let new_receipt = extract_read_receipt(ephemeral_events); - // We might have a new read receipt, though! If that's the case, handle it for - // unread counts tracking. - if let Some(new_receipt) = extract_read_receipt(ephemeral_events) { - self.update_read_receipts(Some(&new_receipt)).await?; + if replaced_events.is_empty() { + if let Some(new_receipt) = new_receipt { + self.update_read_receipts(Some(&new_receipt)).await?; + } + return Ok((false, Vec::new())); } - return Ok((false, Vec::new())); + // If we've replaced at least one local echo, we still need to post-process it + // properly. + self.post_process_new_events(replaced_events, PostProcessingOrigin::Sync, new_receipt) + .await?; + + let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); + + return Ok((false, timeline_event_diffs)); } // If we've never waited for an initial previous-batch token, and we've now @@ -907,9 +980,19 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // events, because we are pushing all _new_ `events` at the back. self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?; - self.state - .room_linked_chunk - .push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events); + // Only push live events that haven't replaced earlier. + let events_to_push = events + .iter() + .filter(|event| { + event.event_id().is_none_or(|event_id| !replacement_event_ids.contains(&event_id)) + }) + .cloned() + .collect::>(); + + self.state.room_linked_chunk.push_live_events( + prev_batch.map(|prev_token| Gap { token: prev_token }), + &events_to_push, + ); // Extract a new read receipt, if available. let new_receipt = extract_read_receipt(ephemeral_events); diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index c4af72ac1..109463a46 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -322,6 +322,7 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { all_events: new_events, in_memory_duplicated_event_ids, in_store_duplicated_event_ids, + in_place_replacements: Vec::new(), non_empty_all_duplicates, } } diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 56e073459..9085431f1 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -15,7 +15,7 @@ //! Simple but efficient types to find duplicated events. See [`Deduplicator`] //! to learn more. -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashSet}; use matrix_sdk_base::{ event_cache::store::EventCacheStoreLockGuard, @@ -25,7 +25,10 @@ use ruma::{OwnedEventId, UserId}; use super::{ EventCacheError, - caches::event_linked_chunk::{Event, EventLinkedChunk}, + caches::{ + EventLocation, + event_linked_chunk::{Event, EventLinkedChunk}, + }, }; /// Find duplicates in the given collection of new events, and return relevant @@ -36,6 +39,7 @@ pub async fn filter_duplicate_events( store_guard: &EventCacheStoreLockGuard, linked_chunk_id: LinkedChunkId<'_>, linked_chunk: &EventLinkedChunk, + replaceable_event_ids: &HashSet, mut new_events: Vec, ) -> Result { // Remove all events with no ID, or that are duplicated among the new events, @@ -61,23 +65,46 @@ pub async fn filter_duplicate_events( // Separate duplicated events in two collections: ones that are in-memory, ones // that are in the store. - let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = { + let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids, in_place_replacements) = { // Collect all in-memory chunk identifiers. let in_memory_chunk_identifiers = linked_chunk.chunks().map(|chunk| chunk.identifier()).collect::>(); let mut in_memory = vec![]; let mut in_store = vec![]; + let mut in_place_replacements = vec![]; for (duplicated_event_id, position) in duplicated_event_ids { - if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) { + if replaceable_event_ids.contains(&duplicated_event_id) { + // The event is replaceable: find the new content of the event in `new_events`. + let Some(event) = new_events + .iter() + .find(|ev| ev.event_id().as_ref() == Some(&duplicated_event_id)) + else { + continue; + }; + + let location = if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) + { + EventLocation::Memory(position) + } else { + EventLocation::Store + }; + + in_place_replacements.push(ReplaceableDuplicate { + event_id: duplicated_event_id, + position, + location, + event: event.clone(), + }); + } else if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) { in_memory.push((duplicated_event_id, position)); } else { in_store.push((duplicated_event_id, position)); } } - (in_memory, in_store) + (in_memory, in_store, in_place_replacements) }; // See comment of `DeduplicationOutcome::non_empty_all_duplicates` for the @@ -86,7 +113,8 @@ pub async fn filter_duplicate_events( new_events.iter().any(|ev| ev.sender().is_some_and(|sender| sender != own_user_id)); let all_duplicates = (in_memory_duplicated_event_ids.len() - + in_store_duplicated_event_ids.len()) + + in_store_duplicated_event_ids.len() + + in_place_replacements.len()) == new_events.len(); let non_empty_all_duplicates = at_least_one_event_not_sent_by_me && all_duplicates; @@ -95,6 +123,7 @@ pub async fn filter_duplicate_events( all_events: new_events, in_memory_duplicated_event_ids, in_store_duplicated_event_ids, + in_place_replacements, non_empty_all_duplicates, }) } @@ -122,6 +151,10 @@ pub(super) struct DeduplicationOutcome { /// (position is descending). pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>, + /// Duplicates that can be replaced in place instead of being removed and + /// reinserted. + pub in_place_replacements: Vec, + /// Whether there's at least one new event sent by some other user, and all /// new events are duplicate. /// @@ -151,11 +184,27 @@ pub(super) struct DeduplicationOutcome { pub non_empty_all_duplicates: bool, } +/// A duplicate event that can be replaced in place, instead of being removed +/// and reinserted. +pub(super) struct ReplaceableDuplicate { + /// The ID of the duplicated event. + pub event_id: OwnedEventId, + /// The previous position of the event in the linked chunk. + pub position: Position, + /// The location of the duplicated event in the linked chunk (memory or + /// store). + pub location: EventLocation, + /// The new version of the duplicated event that can be used as a + /// replacement. + pub event: Event, +} + #[cfg(test)] #[cfg(not(target_family = "wasm"))] // These tests uses the cross-process lock, so need time support. mod tests { - use std::ops::Not as _; + use std::{collections::HashSet, ops::Not as _}; + use assert_matches2::assert_let; use matrix_sdk_base::{ deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock, linked_chunk::ChunkIdentifier, @@ -254,6 +303,7 @@ mod tests { event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, + &HashSet::new(), vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()], ) .await @@ -270,6 +320,7 @@ mod tests { event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, + &HashSet::new(), vec![event_0, event_1, event_2, event_3, event_4], ) .await @@ -311,6 +362,7 @@ mod tests { outcome.in_store_duplicated_event_ids[1], (event_id_1, Position::new(ChunkIdentifier::new(42), 1)) ); + assert!(outcome.in_place_replacements.is_empty()); } #[async_test] @@ -384,12 +436,14 @@ mod tests { all_events: events, in_memory_duplicated_event_ids, in_store_duplicated_event_ids, + in_place_replacements, non_empty_all_duplicates, } = filter_duplicate_events( user_id, event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, + &HashSet::new(), vec![ev1, ev2, ev3, ev4], ) .await @@ -403,6 +457,7 @@ mod tests { assert_eq!(events[2].event_id().as_deref(), Some(eid3)); assert!(in_memory_duplicated_event_ids.is_empty()); + assert!(in_place_replacements.is_empty()); assert_eq!(in_store_duplicated_event_ids.len(), 2); assert_eq!( @@ -414,4 +469,83 @@ mod tests { (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0)) ); } + + #[async_test] + async fn test_in_place_replaceable_duplicates() { + use std::sync::Arc; + + use matrix_sdk_base::{ + event_cache::store::{EventCacheStore, MemoryStore}, + linked_chunk::Update, + }; + use ruma::room_id; + + let user_id = user_id!("@user:example.com"); + let room_id = room_id!("!fondue:raclette.ch"); + + // Simulate an event that's been saved in the send queue already, and that could + // be replaced later. + let event_id = owned_event_id!("$ev0"); + let event = timeline_event(&event_id); + + let event_cache_store = Arc::new(MemoryStore::new()); + event_cache_store + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![event.clone()], + }, + ], + ) + .await + .unwrap(); + + let event_cache_store = EventCacheStoreLock::new( + event_cache_store, + CrossProcessLockConfig::multi_process("hodor"), + ); + let event_cache_store = event_cache_store.lock().await.unwrap(); + let event_cache_store_guard = event_cache_store.as_clean().unwrap(); + + let mut linked_chunk = EventLinkedChunk::new(); + linked_chunk.push_events([event.clone()]); + + // Indicate that the event is replaceable, for example because it's been + // inserted by the send queue earlier. + let replaceable_event_ids = HashSet::from([event_id.clone()]); + + let outcome = filter_duplicate_events( + user_id, + event_cache_store_guard, + LinkedChunkId::Room(room_id), + &linked_chunk, + &replaceable_event_ids, + vec![event], + ) + .await + .unwrap(); + + assert!(outcome.in_memory_duplicated_event_ids.is_empty()); + assert!(outcome.in_store_duplicated_event_ids.is_empty()); + + // The event can be replaced in-place. + assert_eq!(outcome.in_place_replacements.len(), 1); + + let replacement = &outcome.in_place_replacements[0]; + assert_eq!(replacement.event_id, event_id); + + assert_let!(EventLocation::Memory(pos) = &replacement.location); + assert_eq!(*pos, Position::new(ChunkIdentifier::new(0), 0)); + + assert_eq!(replacement.position, Position::new(ChunkIdentifier::new(0), 0)); + + assert_eq!(replacement.event.event_id().unwrap(), event_id); + } } diff --git a/crates/matrix-sdk/tests/integration/event_cache/mod.rs b/crates/matrix-sdk/tests/integration/event_cache/mod.rs index 7dee4fc95..ee88286b5 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/mod.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/mod.rs @@ -3046,3 +3046,108 @@ async fn test_backpaginate_on_a_single_event_inserted_via_send_queue_from_an_emp // assume so. assert!(reached_start.not()); } + +#[async_test] +async fn test_sync_remote_echo_of_send_queue_event_replaces_in_place() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); + let own_user_id = client.user_id().unwrap().to_owned(); + + let room = server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("hello").event_id(event_id!("$1"))), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + assert_eq!(events.len(), 1); + + let event_id_2 = event_id!("$2"); + let event_id_3 = event_id!("$3"); + server.mock_room_state_encryption().plain().mount().await; + server.mock_room_send().ok(event_id_2).mock_once().mount().await; + + // We send an event with the send queue. + room.send_queue() + .send(RoomMessageEventContent::text_plain("Hello, World!").into()) + .await + .unwrap(); + + // It's inserted in the room's event cache. + assert_matches!( + room_stream.recv().await, + Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => { + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Append { values } = &diffs[0]); + assert_eq!(values.len(), 1); + assert_eq!(values[0].event_id().as_deref(), Some(event_id_2)); + } + ); + + // Then we send a second message in this room. + server.mock_room_send().ok(event_id_3).mock_once().mount().await; + room.send_queue() + .send(RoomMessageEventContent::text_plain("Bonjour, Monde!").into()) + .await + .unwrap(); + + // It's saved in the room's event cache too. + assert_matches!( + room_stream.recv().await, + Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => { + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Append { values } = &diffs[0]); + assert_eq!(values.len(), 1); + assert_eq!(values[0].event_id().as_deref(), Some(event_id_3)); + } + ); + + let f = EventFactory::new().room(room_id).sender(&own_user_id); + + // Then sync comes back with the first event. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("Hello, World!").event_id(event_id_2)), + ) + .await; + + // The event is replaced in place, not removed and appended again. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) = + room_stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Set { index: 1, value: event } = &diffs[0]); + assert_eq!(event.event_id().as_deref(), Some(event_id_2)); + + // Then a second sync comes back with the second event. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("Bonjour, Monde!").event_id(event_id_3)), + ) + .await; + + // And the next event is replaced in place too. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) = + room_stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Set { index: 2, value: event } = &diffs[0]); + assert_eq!(event.event_id().as_deref(), Some(event_id_3)); + + // That's all, folks! +}