chore(sdk): Split maybe_apply_new_redaction between RoomEventCache and ThreadEventCache.

This patch handles `m.room.redaction` for thread caches.

This is a naive approach for now to have small commits.
This commit is contained in:
Ivan Enderlin
2026-04-28 18:49:58 +02:00
parent f153d85dd0
commit bcb9e4471a
4 changed files with 100 additions and 86 deletions

View File

@@ -22,7 +22,7 @@ use matrix_sdk_base::{
linked_chunk::Position,
sync::{JoinedRoomUpdate, LeftRoomUpdate},
};
use ruma::{OwnedEventId, OwnedRoomId, RoomId};
use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc};
use super::{EventCacheError, EventsOrigin, Result, automatic_pagination::AutomaticPagination};
@@ -50,6 +50,7 @@ pub(super) struct Caches {
struct CachesInternals {
store: EventCacheStoreLock,
linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
room_version_rules: RoomVersionRules,
}
impl Caches {
@@ -90,7 +91,7 @@ impl Caches {
own_user_id.clone(),
room_id.to_owned(),
weak_room.clone(),
room_version_rules,
room_version_rules.clone(),
enabled_thread_support,
update_sender.clone(),
linked_chunk_update_sender.clone(),
@@ -123,7 +124,7 @@ impl Caches {
Ok(Self {
room: room_event_cache,
threads: Arc::new(RwLock::new(HashMap::new())),
internals: CachesInternals { store, linked_chunk_update_sender },
internals: CachesInternals { store, linked_chunk_update_sender, room_version_rules },
})
}
@@ -164,6 +165,7 @@ impl Caches {
room.room_id().to_owned(),
thread_id.clone(),
room.own_user_id().to_owned(),
self.internals.room_version_rules.clone(),
room.weak_room().to_owned(),
self.internals.store.clone(),
room.update_sender().generic_update_sender().clone(),

View File

@@ -975,35 +975,30 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
return Ok(());
};
let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
let Some(target_event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
warn!("missing target event id from the redaction event");
return Ok(());
};
// Replace the redacted event by a redacted form, if we knew about it.
let Some((location, mut target_event)) = self.find_event(event_id).await? else {
let Some((location, mut target_event)) = self.find_event(target_event_id).await? else {
trace!("redacted event is missing from the linked chunk");
return Ok(());
};
// Don't redact already redacted events.
let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
if deserialized.is_redacted() {
return Ok(());
}
let target_event_raw = target_event.raw();
// If the event is part of a thread, update the thread linked chunk and the
// summary.
extract_thread_root(target_event.raw())
} else {
warn!("failed to deserialize the event to redact");
None
};
// Don't redact already redacted events.
if let Ok(deserialized) = target_event_raw.deserialize()
&& deserialized.is_redacted()
{
return Ok(());
}
if let Some(redacted_event) = apply_redaction(
target_event.raw(),
target_event_raw,
event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
&self.state.room_version_rules.redaction,
&self.room_version_rules.redaction,
) {
// It's safe to cast `redacted_event` here:
// - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
@@ -1012,30 +1007,6 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
target_event.replace_raw(redacted_event.cast_unchecked());
self.replace_event_at(location, target_event.clone()).await?;
// If the redacted event was part of a thread, remove it in the thread linked
// chunk too, and make sure to update the thread root's summary
// as well.
//
// Note: there is an ordering issue here: the above `replace_event_at` must
// happen BEFORE we recompute the summary, otherwise the set of
// replies may include the to-be-redacted event.
if let Some(thread_root) = thread_root
&& let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
{
thread_cache.replace_event_if_present(event_id, target_event).await?;
// The number of replies may have changed, so update the thread summary if
// needs be.
let latest_event_id = thread_cache.latest_event_id().await?;
self.maybe_update_thread_summary(
thread_root,
latest_event_id,
post_processing_origin,
)
.await?;
}
}
Ok(())

View File

@@ -84,6 +84,7 @@ impl ThreadEventCache {
room_id: OwnedRoomId,
thread_id: OwnedEventId,
own_user_id: OwnedUserId,
room_version_rules: RoomVersionRules,
weak_room: WeakRoom,
store: EventCacheStoreLock,
generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
@@ -95,6 +96,7 @@ impl ThreadEventCache {
room_id.clone(),
thread_id.clone(),
own_user_id,
room_version_rules,
store,
update_sender.clone(),
linked_chunk_update_sender,
@@ -209,38 +211,6 @@ impl ThreadEventCache {
Ok(())
}
/// Replaces a single event, be it saved in memory or in the store.
///
/// If it was saved in memory, this will emit a notification to
/// observers that a single item has been replaced. Otherwise,
/// such a notification is not emitted, because observers are
/// unlikely to observe the store updates directly.
pub(super) async fn replace_event_if_present(
&mut self,
event_id: &EventId,
new_event: Event,
) -> Result<()> {
let mut state = self.inner.state.write().await?;
if let Err(err) = state.replace_event_if_present(event_id, new_event).await {
error!(%err, "failed to replace an event");
return Err(err);
}
let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
if !timeline_event_diffs.is_empty() {
self.inner.update_sender.send(
TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync },
// This function is part of the `RoomEventCache` flow. The generic update is
// handled by it.
None,
);
}
Ok(())
}
/// Force to reload the thread.
//
// TODO(@hywan): Temporary fix. All the states must be in a single struct behind

View File

@@ -14,7 +14,7 @@
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
check_validity_of_replacement_events,
apply_redaction, check_validity_of_replacement_events,
deserialized_responses::{ThreadSummary, ThreadSummaryStatus},
event_cache::{
Event, Gap,
@@ -27,9 +27,16 @@ use matrix_sdk_base::{
sync::Timeline,
};
use matrix_sdk_common::executor::spawn;
use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId, events::relation::RelationType};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, OwnedUserId,
events::{
AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
relation::RelationType, room::redaction::SyncRoomRedactionEvent,
},
room_version_rules::RoomVersionRules,
};
use tokio::sync::broadcast::Sender;
use tracing::{debug, error, instrument, trace};
use tracing::{debug, error, instrument, trace, warn};
use super::{
super::{
@@ -60,6 +67,9 @@ pub struct ThreadEventCacheState {
/// The user's own user id.
pub own_user_id: OwnedUserId,
/// The rules for the version of this room.
room_version_rules: RoomVersionRules,
/// Reference to the underlying backing store.
store: EventCacheStoreLock,
@@ -191,6 +201,7 @@ impl LockedThreadEventCacheState {
room_id: OwnedRoomId,
thread_id: OwnedEventId,
own_user_id: OwnedUserId,
room_version_rules: RoomVersionRules,
store: EventCacheStoreLock,
update_sender: ThreadEventCacheUpdateSender,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
@@ -255,6 +266,7 @@ impl LockedThreadEventCacheState {
room_id,
thread_id,
own_user_id,
room_version_rules,
store,
thread_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
linked_chunk,
@@ -490,8 +502,11 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
self.state.shrink_to_last_chunk(&self.store).await?;
}
// Save `bundled_latest_thread_event`.
// Do stuff for each event.
for event in events {
// Handle redaction.
self.maybe_apply_new_redaction(&event).await?;
// Save a bundled thread event, if there was one.
if let Some(bundled_thread) = event.bundled_latest_thread_event {
self.save_events([*bundled_thread]).await?;
@@ -503,6 +518,67 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
Ok((has_new_gap, timeline_event_diffs))
}
/// If the given event is a redaction, try to retrieve the
/// to-be-redacted event in the chunk, and replace it by the
/// redacted form.
#[instrument(skip_all)]
async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> {
let raw_event = event.raw();
// Do not deserialise the entire event if we aren't certain it's a
// `m.room.redaction`. It saves a non-negligible amount of computations.
let Ok(Some(MessageLikeEventType::RoomRedaction)) =
raw_event.get_field::<MessageLikeEventType>("type")
else {
return Ok(());
};
// It is a `m.room.redaction`! We can deserialize it entirely.
let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
redaction,
))) = raw_event.deserialize()
else {
return Ok(());
};
let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else {
warn!("missing target event id from the redaction event");
return Ok(());
};
// Replace the redacted event by a redacted form, if we knew about it.
let Some((location, mut target_event)) = self.find_event(event_id).await? else {
trace!("redacted event is missing from the linked chunk");
return Ok(());
};
let target_event_raw = target_event.raw();
// Don't redact already redacted events.
if let Ok(deserialized) = target_event_raw.deserialize()
&& deserialized.is_redacted()
{
return Ok(());
};
if let Some(redacted_event) = apply_redaction(
target_event_raw,
event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
&self.room_version_rules.redaction,
) {
// It's safe to cast `redacted_event` here:
// - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
// when calling .raw(), so it's still one under the hood.
// - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
target_event.replace_raw(redacted_event.cast_unchecked());
self.replace_event_at(location, target_event.clone()).await?;
}
Ok(())
}
/// See documentation of [`find_event`].
pub(super) async fn find_event(
&self,
@@ -517,16 +593,11 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
/// observers that a single item has been replaced. Otherwise,
/// such a notification is not emitted, because observers are
/// unlikely to observe the store updates directly.
pub async fn replace_event_if_present(
pub async fn replace_event_at(
&mut self,
event_id: &EventId,
location: EventLocation,
new_event: Event,
) -> Result<()> {
let Some((location, _event)) = self.find_event(event_id).await? else {
trace!("redacted event is missing from the thread linked chunk");
return Ok(());
};
match location {
EventLocation::Memory(position) => {
self.state