diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index e692a0415..18ba73395 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -1288,7 +1288,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. target_event.replace_raw(redacted_event.cast_unchecked()); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(location, target_event.clone()).await?; // If the redacted event was part of a thread, remove it in the thread linked // chunk too, and make sure to update the thread root's summary @@ -1300,7 +1300,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { if let Some(thread_root) = thread_root && let Some(thread_cache) = self.state.threads.get_mut(&thread_root) { - thread_cache.remove_if_present(event_id).await?; + thread_cache.replace_event_if_present(event_id, target_event).await?; // The number of replies may have changed, so update the thread summary if // needs be. diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 0e1400760..a2e7450f7 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -159,22 +159,21 @@ impl ThreadEventCache { Ok(()) } - /// Remove an event from an thread event linked chunk, if it exists. + /// Replaces a single event, be it saved in memory or in the store. /// - /// If the event has been found and removed, then an update will be - /// propagated to observers. - pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> { + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub(super) async fn replace_event_if_present( + &mut self, + event_id: &EventId, + new_event: Event, + ) -> Result<()> { let mut state = self.inner.state.write().await?; - let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| { - (event.event_id().as_deref() == Some(event_id)).then_some(position) - }) else { - // Event not found in the linked chunk, nothing to do. - return Ok(()); - }; - - if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)], vec![]).await { - error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting"); + if let Err(err) = state.replace_event_if_present(event_id, new_event).await { + error!(%err, "failed to replace an event"); return Err(err); } diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index 20d84d6f3..c1d076f21 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -22,9 +22,10 @@ use matrix_sdk_base::{ ChunkIdentifierGenerator, LinkedChunkId, OwnedLinkedChunkId, Position, Update, lazy_loader, }, }; -use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; +use matrix_sdk_common::executor::spawn; +use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; use tokio::sync::broadcast::Sender; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, trace}; use super::{ super::{ @@ -33,7 +34,7 @@ use super::{ deduplicator::{DeduplicationOutcome, filter_duplicate_events}, persistence::{load_linked_chunk_metadata, send_updates_to_store}, }, - TimelineVectorDiffs, + EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, room::RoomEventCacheLinkedChunkUpdate, @@ -368,6 +369,76 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { Ok(()) } + async fn find_event(&self, event_id: &EventId) -> Result> { + // There are supposedly fewer events loaded in memory than in the store. Let's + // start by looking up in the `EventLinkedChunk`. + for (position, event) in self.thread_linked_chunk.revents() { + if event.event_id().as_deref() == Some(event_id) { + return Ok(Some((EventLocation::Memory(position), event.clone()))); + } + } + + Ok(self + .store + .find_event(&self.room_id, event_id) + .await? + .map(|event| (EventLocation::Store, event))) + } + + /// Replaces a single event, be it saved in memory or in the store. + /// + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub async fn replace_event_if_present( + &mut self, + event_id: &EventId, + new_event: Event, + ) -> Result<()> { + let Some((location, _event)) = self.find_event(event_id).await? else { + trace!("redacted event is missing from the thread linked chunk"); + return Ok(()); + }; + + match location { + EventLocation::Memory(position) => { + self.state + .thread_linked_chunk + .replace_event_at(position, new_event) + .expect("should have been a valid position of an item"); + // We just changed the in-memory representation; synchronize this with + // the store. + self.propagate_changes().await?; + } + EventLocation::Store => { + self.save_events([new_event]).await?; + } + } + + Ok(()) + } + + /// Save events into the database, without notifying observers. + pub async fn save_events(&mut self, events: impl IntoIterator) -> Result<()> { + let store = self.store.clone(); + let room_id = self.state.room_id.clone(); + let events = events.into_iter().collect::>(); + + // Spawn a task so the save is uninterrupted by task cancellation. + spawn(async move { + for event in events { + store.save_event(&room_id, event).await?; + } + + Result::Ok(()) + }) + .await + .expect("joining failed")?; + + Ok(()) + } + /// Remove events by their position, in `EventLinkedChunk`. /// /// This method is purposely isolated because it must ensure that diff --git a/crates/matrix-sdk/tests/integration/event_cache/threads.rs b/crates/matrix-sdk/tests/integration/event_cache/threads.rs index 6b85de184..998c77f43 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/threads.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/threads.rs @@ -711,8 +711,15 @@ async fn test_redact_touches_threads() { assert_eq!(summary.num_replies, 2); } - assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1)); - assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2)); + // Second event. + { + assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1)); + } + + // Third event. + { + assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2)); + } assert!(thread_stream.is_empty()); assert!(room_stream.is_empty()); @@ -727,17 +734,21 @@ async fn test_redact_touches_threads() { ) .await; - // The redaction affects the thread cache: it *removes* the redacted event. + // The redaction affects the thread cache: it updates the redacted event. { assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv()); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]); + + assert_let!(VectorDiff::Set { index: 1, value: new_event } = &diffs[0]); + + let deserialized = new_event.raw().deserialize().unwrap(); + assert!(deserialized.is_redacted()); assert!(thread_stream.is_empty()); } // The redaction affects the room cache too: - // - the redaction event is pushed to the room history, + // - the redaction event is added to the “timeline”, // - the redaction's target is, well, redacted, // - the thread summary is updated correctly. { @@ -748,9 +759,11 @@ async fn test_redact_touches_threads() { assert_eq!(diffs.len(), 3); // The redaction event is appended to the room cache. - assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); - assert_eq!(new_events.len(), 1); - assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction)); + { + assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); + assert_eq!(new_events.len(), 1); + assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction)); + } // The room event is redacted. { @@ -779,17 +792,21 @@ async fn test_redact_touches_threads() { ) .await; - // The redaction affects the thread cache: it *removes* the redacted event. + // The redaction affects the thread cache: it updates the redacted event. { assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv()); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]); + + assert_let!(VectorDiff::Set { index: 2, value: new_event } = &diffs[0]); + + let deserialized = new_event.raw().deserialize().unwrap(); + assert!(deserialized.is_redacted()); assert!(thread_stream.is_empty()); } // The redaction affects the room cache too: - // - the redaction event is pushed to the room history, + // - the redaction event is added to the “timeline”, // - the redaction's target is, well, redacted, // - the thread summary is removed from the thread root. {