From bf6ff6dd6bf6c599907b69ec9d3d438dcb74e793 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 24 Mar 2026 18:32:06 +0100 Subject: [PATCH] feat(sdk): A redacted event in a thread is no longer removed: it's updated. This patch fixes an invalid behaviour in a thread: when an in-thread event was redacted, it was removed from the thread. This is inconsistent regarding in-room event redaction where a redacted event is updated to its redacted form and the redaction event is added to the timeline. This patch makes the behaviour consistent by updating the redacted event. Basically, it replaces `remove_if_present` (deleted) by `replace_event_if_present` (new). --- .../src/event_cache/caches/room/state.rs | 4 +- .../src/event_cache/caches/thread/mod.rs | 25 +++--- .../src/event_cache/caches/thread/state.rs | 77 ++++++++++++++++++- .../tests/integration/event_cache/threads.rs | 39 +++++++--- 4 files changed, 116 insertions(+), 29 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index e692a0415..18ba73395 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -1288,7 +1288,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. target_event.replace_raw(redacted_event.cast_unchecked()); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(location, target_event.clone()).await?; // If the redacted event was part of a thread, remove it in the thread linked // chunk too, and make sure to update the thread root's summary @@ -1300,7 +1300,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { if let Some(thread_root) = thread_root && let Some(thread_cache) = self.state.threads.get_mut(&thread_root) { - thread_cache.remove_if_present(event_id).await?; + thread_cache.replace_event_if_present(event_id, target_event).await?; // The number of replies may have changed, so update the thread summary if // needs be. diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 0e1400760..a2e7450f7 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -159,22 +159,21 @@ impl ThreadEventCache { Ok(()) } - /// Remove an event from an thread event linked chunk, if it exists. + /// Replaces a single event, be it saved in memory or in the store. /// - /// If the event has been found and removed, then an update will be - /// propagated to observers. - pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> { + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub(super) async fn replace_event_if_present( + &mut self, + event_id: &EventId, + new_event: Event, + ) -> Result<()> { let mut state = self.inner.state.write().await?; - let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| { - (event.event_id().as_deref() == Some(event_id)).then_some(position) - }) else { - // Event not found in the linked chunk, nothing to do. - return Ok(()); - }; - - if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)], vec![]).await { - error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting"); + if let Err(err) = state.replace_event_if_present(event_id, new_event).await { + error!(%err, "failed to replace an event"); return Err(err); } diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index 20d84d6f3..c1d076f21 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -22,9 +22,10 @@ use matrix_sdk_base::{ ChunkIdentifierGenerator, LinkedChunkId, OwnedLinkedChunkId, Position, Update, lazy_loader, }, }; -use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; +use matrix_sdk_common::executor::spawn; +use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; use tokio::sync::broadcast::Sender; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, trace}; use super::{ super::{ @@ -33,7 +34,7 @@ use super::{ deduplicator::{DeduplicationOutcome, filter_duplicate_events}, persistence::{load_linked_chunk_metadata, send_updates_to_store}, }, - TimelineVectorDiffs, + EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, room::RoomEventCacheLinkedChunkUpdate, @@ -368,6 +369,76 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { Ok(()) } + async fn find_event(&self, event_id: &EventId) -> Result> { + // There are supposedly fewer events loaded in memory than in the store. Let's + // start by looking up in the `EventLinkedChunk`. + for (position, event) in self.thread_linked_chunk.revents() { + if event.event_id().as_deref() == Some(event_id) { + return Ok(Some((EventLocation::Memory(position), event.clone()))); + } + } + + Ok(self + .store + .find_event(&self.room_id, event_id) + .await? + .map(|event| (EventLocation::Store, event))) + } + + /// Replaces a single event, be it saved in memory or in the store. + /// + /// If it was saved in memory, this will emit a notification to + /// observers that a single item has been replaced. Otherwise, + /// such a notification is not emitted, because observers are + /// unlikely to observe the store updates directly. + pub async fn replace_event_if_present( + &mut self, + event_id: &EventId, + new_event: Event, + ) -> Result<()> { + let Some((location, _event)) = self.find_event(event_id).await? else { + trace!("redacted event is missing from the thread linked chunk"); + return Ok(()); + }; + + match location { + EventLocation::Memory(position) => { + self.state + .thread_linked_chunk + .replace_event_at(position, new_event) + .expect("should have been a valid position of an item"); + // We just changed the in-memory representation; synchronize this with + // the store. + self.propagate_changes().await?; + } + EventLocation::Store => { + self.save_events([new_event]).await?; + } + } + + Ok(()) + } + + /// Save events into the database, without notifying observers. + pub async fn save_events(&mut self, events: impl IntoIterator) -> Result<()> { + let store = self.store.clone(); + let room_id = self.state.room_id.clone(); + let events = events.into_iter().collect::>(); + + // Spawn a task so the save is uninterrupted by task cancellation. + spawn(async move { + for event in events { + store.save_event(&room_id, event).await?; + } + + Result::Ok(()) + }) + .await + .expect("joining failed")?; + + Ok(()) + } + /// Remove events by their position, in `EventLinkedChunk`. /// /// This method is purposely isolated because it must ensure that diff --git a/crates/matrix-sdk/tests/integration/event_cache/threads.rs b/crates/matrix-sdk/tests/integration/event_cache/threads.rs index 6b85de184..998c77f43 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/threads.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/threads.rs @@ -711,8 +711,15 @@ async fn test_redact_touches_threads() { assert_eq!(summary.num_replies, 2); } - assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1)); - assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2)); + // Second event. + { + assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1)); + } + + // Third event. + { + assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2)); + } assert!(thread_stream.is_empty()); assert!(room_stream.is_empty()); @@ -727,17 +734,21 @@ async fn test_redact_touches_threads() { ) .await; - // The redaction affects the thread cache: it *removes* the redacted event. + // The redaction affects the thread cache: it updates the redacted event. { assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv()); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]); + + assert_let!(VectorDiff::Set { index: 1, value: new_event } = &diffs[0]); + + let deserialized = new_event.raw().deserialize().unwrap(); + assert!(deserialized.is_redacted()); assert!(thread_stream.is_empty()); } // The redaction affects the room cache too: - // - the redaction event is pushed to the room history, + // - the redaction event is added to the “timeline”, // - the redaction's target is, well, redacted, // - the thread summary is updated correctly. { @@ -748,9 +759,11 @@ async fn test_redact_touches_threads() { assert_eq!(diffs.len(), 3); // The redaction event is appended to the room cache. - assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); - assert_eq!(new_events.len(), 1); - assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction)); + { + assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); + assert_eq!(new_events.len(), 1); + assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction)); + } // The room event is redacted. { @@ -779,17 +792,21 @@ async fn test_redact_touches_threads() { ) .await; - // The redaction affects the thread cache: it *removes* the redacted event. + // The redaction affects the thread cache: it updates the redacted event. { assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv()); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]); + + assert_let!(VectorDiff::Set { index: 2, value: new_event } = &diffs[0]); + + let deserialized = new_event.raw().deserialize().unwrap(); + assert!(deserialized.is_redacted()); assert!(thread_stream.is_empty()); } // The redaction affects the room cache too: - // - the redaction event is pushed to the room history, + // - the redaction event is added to the “timeline”, // - the redaction's target is, well, redacted, // - the thread summary is removed from the thread root. {