fix(sdk): Re-compute the ThreadSummary when R2D2 redecrypts an event in a thread.

This commit is contained in:
Ivan Enderlin
2026-05-26 14:31:18 +02:00
parent b79dcd6328
commit e79f83e5cc
3 changed files with 52 additions and 21 deletions

View File

@@ -261,7 +261,7 @@ impl RoomEventCache {
Ok(())
}
pub(super) async fn update_thread_summary(
pub(in super::super) async fn update_thread_summary(
&self,
thread_id: &EventId,
new_thread_summary: Option<ThreadSummary>,

View File

@@ -162,7 +162,7 @@ impl ThreadEventCache {
}
/// Return a reference to the state.
pub(super) fn state(&self) -> &LockedThreadEventCacheState {
pub(in super::super) fn state(&self) -> &LockedThreadEventCacheState {
&self.inner.state
}
@@ -260,20 +260,29 @@ impl ThreadEventCache {
/// Try to locate the events in the linked chunk corresponding to the given
/// list of decrypted events, and replace them, while alerting observers
/// about the update.
///
/// Return `true` if at least one event has been updated.
#[cfg(feature = "e2e-encryption")]
pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<bool> {
let timeline_event_diffs = self.inner.state.write().await?.replace_utds(events).await?;
if let Some(timeline_event_diffs) = timeline_event_diffs
&& !timeline_event_diffs.is_empty()
{
self.inner.update_sender.send(
TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Cache },
Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
);
}
Ok(
if let Some(timeline_event_diffs) = timeline_event_diffs
&& !timeline_event_diffs.is_empty()
{
self.inner.update_sender.send(
TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Cache,
},
Some(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() }),
);
Ok(())
true
} else {
false
},
)
}
}

View File

@@ -121,7 +121,11 @@ use std::{
use as_variant::as_variant;
use futures_core::Stream;
use futures_util::{StreamExt, future::join_all, pin_mut};
use futures_util::{
StreamExt,
future::{join_all, try_join_all},
pin_mut,
};
#[cfg(doc)]
use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
use matrix_sdk_base::{
@@ -416,15 +420,33 @@ impl EventCache {
// accumulate over time. Consider keeping track of which linked
// chunk contain which event id, to avoid doing the linear searches
// here.
join_all(
all_caches
.threads
.read()
.await
.values()
.map(|thread_cache| thread_cache.replace_utds(&events)),
// Replaces UTDs in each thread, and maybe update the thread summary.
for (thread_id, thread_cache) in try_join_all(
all_caches.threads.read().await.iter().map(|(thread_id, thread_cache)| async {
Result::<_, EventCacheError>::Ok(
// If at least one event has been replaced, return the `thread_id` and the
// `thread_cache` to update the thread summary later.
thread_cache
.replace_utds(&events)
.await?
.then(|| (thread_id.clone(), thread_cache.clone())),
)
}),
)
.await;
.await?
.into_iter()
// Filter out results that are `None`, i.e. a thread where no UTD has been replaced.
.flatten()
{
all_caches
.room
.update_thread_summary(
&thread_id,
thread_cache.state().read().await?.compute_thread_summary().await?,
)
.await?;
}
}
// Resolve on the pinned-events cache.