From bcb9e4471ad057ded8aedb10d83d69779ee3d218 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 Apr 2026 18:49:58 +0200 Subject: [PATCH] chore(sdk): Split `maybe_apply_new_redaction` between `RoomEventCache` and `ThreadEventCache`. This patch handles `m.room.redaction` for thread caches. This is a naive approach for now to have small commits. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 8 +- .../src/event_cache/caches/room/state.rs | 51 +++------- .../src/event_cache/caches/thread/mod.rs | 34 +------ .../src/event_cache/caches/thread/state.rs | 93 ++++++++++++++++--- 4 files changed, 100 insertions(+), 86 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 962529281..ac4292449 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -22,7 +22,7 @@ use matrix_sdk_base::{ linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate}, }; -use ruma::{OwnedEventId, OwnedRoomId, RoomId}; +use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc}; use super::{EventCacheError, EventsOrigin, Result, automatic_pagination::AutomaticPagination}; @@ -50,6 +50,7 @@ pub(super) struct Caches { struct CachesInternals { store: EventCacheStoreLock, linked_chunk_update_sender: Sender, + room_version_rules: RoomVersionRules, } impl Caches { @@ -90,7 +91,7 @@ impl Caches { own_user_id.clone(), room_id.to_owned(), weak_room.clone(), - room_version_rules, + room_version_rules.clone(), enabled_thread_support, update_sender.clone(), linked_chunk_update_sender.clone(), @@ -123,7 +124,7 @@ impl Caches { Ok(Self { room: room_event_cache, threads: Arc::new(RwLock::new(HashMap::new())), - internals: CachesInternals { store, linked_chunk_update_sender }, + internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules }, }) } @@ -164,6 +165,7 @@ impl Caches { room.room_id().to_owned(), thread_id.clone(), room.own_user_id().to_owned(), + self.internals.room_version_rules.clone(), room.weak_room().to_owned(), self.internals.store.clone(), room.update_sender().generic_update_sender().clone(), diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 61c89721d..db06d09b0 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -975,35 +975,30 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { return Ok(()); }; - let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else { + let Some(target_event_id) = redaction.redacts(&self.room_version_rules.redaction) else { warn!("missing target event id from the redaction event"); return Ok(()); }; // Replace the redacted event by a redacted form, if we knew about it. - let Some((location, mut target_event)) = self.find_event(event_id).await? else { + let Some((location, mut target_event)) = self.find_event(target_event_id).await? else { trace!("redacted event is missing from the linked chunk"); return Ok(()); }; - // Don't redact already redacted events. - let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() { - if deserialized.is_redacted() { - return Ok(()); - } + let target_event_raw = target_event.raw(); - // If the event is part of a thread, update the thread linked chunk and the - // summary. - extract_thread_root(target_event.raw()) - } else { - warn!("failed to deserialize the event to redact"); - None - }; + // Don't redact already redacted events. + if let Ok(deserialized) = target_event_raw.deserialize() + && deserialized.is_redacted() + { + return Ok(()); + } if let Some(redacted_event) = apply_redaction( - target_event.raw(), + target_event_raw, event.raw().cast_ref_unchecked::(), - &self.state.room_version_rules.redaction, + &self.room_version_rules.redaction, ) { // It's safe to cast `redacted_event` here: // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` @@ -1012,30 +1007,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { target_event.replace_raw(redacted_event.cast_unchecked()); self.replace_event_at(location, target_event.clone()).await?; - - // If the redacted event was part of a thread, remove it in the thread linked - // chunk too, and make sure to update the thread root's summary - // as well. - // - // Note: there is an ordering issue here: the above `replace_event_at` must - // happen BEFORE we recompute the summary, otherwise the set of - // replies may include the to-be-redacted event. - if let Some(thread_root) = thread_root - && let Some(thread_cache) = self.state.threads.get_mut(&thread_root) - { - thread_cache.replace_event_if_present(event_id, target_event).await?; - - // The number of replies may have changed, so update the thread summary if - // needs be. - let latest_event_id = thread_cache.latest_event_id().await?; - - self.maybe_update_thread_summary( - thread_root, - latest_event_id, - post_processing_origin, - ) - .await?; - } } Ok(()) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 444fbcb70..591757410 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -84,6 +84,7 @@ impl ThreadEventCache { room_id: OwnedRoomId, thread_id: OwnedEventId, own_user_id: OwnedUserId, + room_version_rules: RoomVersionRules, weak_room: WeakRoom, store: EventCacheStoreLock, generic_update_sender: Sender, @@ -95,6 +96,7 @@ impl ThreadEventCache { room_id.clone(), thread_id.clone(), own_user_id, + room_version_rules, store, update_sender.clone(), linked_chunk_update_sender, @@ -209,38 +211,6 @@ impl ThreadEventCache { Ok(()) } - /// Replaces a single event, be it saved in memory or in the store. - /// - /// If it was saved in memory, this will emit a notification to - /// observers that a single item has been replaced. Otherwise, - /// such a notification is not emitted, because observers are - /// unlikely to observe the store updates directly. - pub(super) async fn replace_event_if_present( - &mut self, - event_id: &EventId, - new_event: Event, - ) -> Result<()> { - let mut state = self.inner.state.write().await?; - - if let Err(err) = state.replace_event_if_present(event_id, new_event).await { - error!(%err, "failed to replace an event"); - return Err(err); - } - - let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs(); - - if !timeline_event_diffs.is_empty() { - self.inner.update_sender.send( - TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync }, - // This function is part of the `RoomEventCache` flow. The generic update is - // handled by it. - None, - ); - } - - Ok(()) - } - /// Force to reload the thread. // // TODO(@hywan): Temporary fix. All the states must be in a single struct behind diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index 45096e646..0a9dc1a92 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -14,7 +14,7 @@ use eyeball_im::VectorDiff; use matrix_sdk_base::{ - check_validity_of_replacement_events, + apply_redaction, check_validity_of_replacement_events, deserialized_responses::{ThreadSummary, ThreadSummaryStatus}, event_cache::{ Event, Gap, @@ -27,9 +27,16 @@ use matrix_sdk_base::{ sync::Timeline, }; use matrix_sdk_common::executor::spawn; -use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::relation::RelationType}; +use ruma::{ + EventId, OwnedEventId, OwnedRoomId, OwnedUserId, + events::{ + AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, + relation::RelationType, room::redaction::SyncRoomRedactionEvent, + }, + room_version_rules::RoomVersionRules, +}; use tokio::sync::broadcast::Sender; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, error, instrument, trace, warn}; use super::{ super::{ @@ -60,6 +67,9 @@ pub struct ThreadEventCacheState { /// The user's own user id. pub own_user_id: OwnedUserId, + /// The rules for the version of this room. + room_version_rules: RoomVersionRules, + /// Reference to the underlying backing store. store: EventCacheStoreLock, @@ -191,6 +201,7 @@ impl LockedThreadEventCacheState { room_id: OwnedRoomId, thread_id: OwnedEventId, own_user_id: OwnedUserId, + room_version_rules: RoomVersionRules, store: EventCacheStoreLock, update_sender: ThreadEventCacheUpdateSender, linked_chunk_update_sender: Sender, @@ -255,6 +266,7 @@ impl LockedThreadEventCacheState { room_id, thread_id, own_user_id, + room_version_rules, store, thread_linked_chunk: EventLinkedChunk::with_initial_linked_chunk( linked_chunk, @@ -490,8 +502,11 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { self.state.shrink_to_last_chunk(&self.store).await?; } - // Save `bundled_latest_thread_event`. + // Do stuff for each event. for event in events { + // Handle redaction. + self.maybe_apply_new_redaction(&event).await?; + // Save a bundled thread event, if there was one. if let Some(bundled_thread) = event.bundled_latest_thread_event { self.save_events([*bundled_thread]).await?; @@ -503,6 +518,67 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { Ok((has_new_gap, timeline_event_diffs)) } + /// If the given event is a redaction, try to retrieve the + /// to-be-redacted event in the chunk, and replace it by the + /// redacted form. + #[instrument(skip_all)] + async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> { + let raw_event = event.raw(); + + // Do not deserialise the entire event if we aren't certain it's a + // `m.room.redaction`. It saves a non-negligible amount of computations. + let Ok(Some(MessageLikeEventType::RoomRedaction)) = + raw_event.get_field::("type") + else { + return Ok(()); + }; + + // It is a `m.room.redaction`! We can deserialize it entirely. + + let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction( + redaction, + ))) = raw_event.deserialize() + else { + return Ok(()); + }; + + let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else { + warn!("missing target event id from the redaction event"); + return Ok(()); + }; + + // Replace the redacted event by a redacted form, if we knew about it. + let Some((location, mut target_event)) = self.find_event(event_id).await? else { + trace!("redacted event is missing from the linked chunk"); + return Ok(()); + }; + + let target_event_raw = target_event.raw(); + + // Don't redact already redacted events. + if let Ok(deserialized) = target_event_raw.deserialize() + && deserialized.is_redacted() + { + return Ok(()); + }; + + if let Some(redacted_event) = apply_redaction( + target_event_raw, + event.raw().cast_ref_unchecked::(), + &self.room_version_rules.redaction, + ) { + // It's safe to cast `redacted_event` here: + // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` + // when calling .raw(), so it's still one under the hood. + // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. + target_event.replace_raw(redacted_event.cast_unchecked()); + + self.replace_event_at(location, target_event.clone()).await?; + } + + Ok(()) + } + /// See documentation of [`find_event`]. pub(super) async fn find_event( &self, @@ -517,16 +593,11 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { /// observers that a single item has been replaced. Otherwise, /// such a notification is not emitted, because observers are /// unlikely to observe the store updates directly. - pub async fn replace_event_if_present( + pub async fn replace_event_at( &mut self, - event_id: &EventId, + location: EventLocation, new_event: Event, ) -> Result<()> { - let Some((location, _event)) = self.find_event(event_id).await? else { - trace!("redacted event is missing from the thread linked chunk"); - return Ok(()); - }; - match location { EventLocation::Memory(position) => { self.state