From e79f83e5cc7b561fae21d79283f64c135b95fc4e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 26 May 2026 14:31:18 +0200 Subject: [PATCH] fix(sdk): Re-compute the `ThreadSummary` when R2D2 redecrypts an event in a thread. --- .../src/event_cache/caches/room/mod.rs | 2 +- .../src/event_cache/caches/thread/mod.rs | 31 +++++++++----- .../matrix-sdk/src/event_cache/redecryptor.rs | 40 ++++++++++++++----- 3 files changed, 52 insertions(+), 21 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 734896d64..faf59a53c 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -261,7 +261,7 @@ impl RoomEventCache { Ok(()) } - pub(super) async fn update_thread_summary( + pub(in super::super) async fn update_thread_summary( &self, thread_id: &EventId, new_thread_summary: Option, diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index a9347ebc1..58733c0ff 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -162,7 +162,7 @@ impl ThreadEventCache { } /// Return a reference to the state. - pub(super) fn state(&self) -> &LockedThreadEventCacheState { + pub(in super::super) fn state(&self) -> &LockedThreadEventCacheState { &self.inner.state } @@ -260,20 +260,29 @@ impl ThreadEventCache { /// Try to locate the events in the linked chunk corresponding to the given /// list of decrypted events, and replace them, while alerting observers /// about the update. + /// + /// Return `true` if at least one event has been updated. #[cfg(feature = "e2e-encryption")] - pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> { + pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result { let timeline_event_diffs = self.inner.state.write().await?.replace_utds(events).await?; - if let Some(timeline_event_diffs) = timeline_event_diffs - && !timeline_event_diffs.is_empty() - { - self.inner.update_sender.send( - TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Cache }, - Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }), - ); - } + Ok( + if let Some(timeline_event_diffs) = timeline_event_diffs + && !timeline_event_diffs.is_empty() + { + self.inner.update_sender.send( + TimelineVectorDiffs { + diffs: timeline_event_diffs, + origin: EventsOrigin::Cache, + }, + Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }), + ); - Ok(()) + true + } else { + false + }, + ) } } diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 746e437ed..0bd04c6d6 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -121,7 +121,11 @@ use std::{ use as_variant::as_variant; use futures_core::Stream; -use futures_util::{StreamExt, future::join_all, pin_mut}; +use futures_util::{ + StreamExt, + future::{join_all, try_join_all}, + pin_mut, +}; #[cfg(doc)] use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ @@ -416,15 +420,33 @@ impl EventCache { // accumulate over time. Consider keeping track of which linked // chunk contain which event id, to avoid doing the linear searches // here. - join_all( - all_caches - .threads - .read() - .await - .values() - .map(|thread_cache| thread_cache.replace_utds(&events)), + + // Replaces UTDs in each thread, and maybe update the thread summary. + for (thread_id, thread_cache) in try_join_all( + all_caches.threads.read().await.iter().map(|(thread_id, thread_cache)| async { + Result::<_, EventCacheError>::Ok( + // If at least one event has been replaced, return the `thread_id` and the + // `thread_cache` to update the thread summary later. + thread_cache + .replace_utds(&events) + .await? + .then(|| (thread_id.clone(), thread_cache.clone())), + ) + }), ) - .await; + .await? + .into_iter() + // Filter out results that are `None`, i.e. a thread where no UTD has been replaced. + .flatten() + { + all_caches + .room + .update_thread_summary( + &thread_id, + thread_cache.state().read().await?.compute_thread_summary().await?, + ) + .await?; + } } // Resolve on the pinned-events cache.