From 0ce36d0bfb4e4068e82b3487bf2cdac5b918e151 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 9 Mar 2026 10:30:37 +0100 Subject: [PATCH] fix(sdk): `EventCacheInner::all_caches_for_room` returns a reference to `Caches`. This patch updates `EventCacheInner::all_caches_for_room` by returning an `OwnedRwLockReadGuard` instead of `Caches`. `Caches` no longer implements `Clone`, which prevents cloning all the event caches per room when we need only one, for example. `Caches` has two new methods: `handle_joined_room_update` and `handle_left_room_update`. This type acts more like a dispatcher now, which was the initial idea. That way, we don't need to dispatch manually on all event caches: `Caches` is responsible for that. We need to be a bit careful now since `all_caches_for_room` returns an owned read-lock over `EventCacheInner::by_room`. --- .../matrix-sdk/src/event_cache/caches/mod.rs | 21 +++++++- crates/matrix-sdk/src/event_cache/mod.rs | 54 ++++++++++++------- crates/matrix-sdk/src/event_cache/tasks.rs | 22 ++++---- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 7bfefe2b7..c4af1d208 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -17,6 +17,7 @@ use eyeball_im::VectorDiff; use matrix_sdk_base::{ ThreadingSupport, event_cache::{Event, store::EventCacheStoreLock}, + sync::{JoinedRoomUpdate, LeftRoomUpdate}, }; use ruma::{OwnedRoomId, RoomId}; use tokio::sync::{broadcast::Sender, mpsc}; @@ -33,7 +34,7 @@ pub mod room; pub mod thread; /// A type to hold all the caches for a given room. -#[derive(Clone)] +#[derive(Debug)] pub(super) struct Caches { pub room: room::RoomEventCache, } @@ -102,6 +103,24 @@ impl Caches { Ok(Self { room: room_event_cache }) } + /// Update all the event caches with a [`JoinedRoomUpdate`]. 
+ pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + let Self { room } = &self; + + room.handle_joined_room_update(updates).await?; + + Ok(()) + } + + /// Update all the event caches with a [`LeftRoomUpdate`]. + pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + let Self { room } = &self; + + room.handle_left_room_update(updates).await?; + + Ok(()) + } + /// Try to acquire exclusive locks over all the event caches managed by /// this [`Caches`], in order to reset all the in-memory data. /// diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 5aaf8cf21..424012809 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -45,7 +45,7 @@ use matrix_sdk_base::{ }; use ruma::{OwnedRoomId, RoomId}; use tokio::sync::{ - Mutex, RwLock, + Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::{Receiver, Sender, channel}, mpsc, }; @@ -348,7 +348,7 @@ impl EventCache { return Err(EventCacheError::NotSubscribedYet); }; - let Caches { room } = self.inner.all_caches_for_room(room_id).await?; + let room = self.inner.all_caches_for_room(room_id).await?.room.clone(); Ok((room, drop_handles)) } @@ -402,6 +402,8 @@ impl Default for EventCacheConfig { } } +type CachesByRoom = HashMap; + struct EventCacheInner { /// A weak reference to the inner client, useful when trying to get a handle /// on the owning client. @@ -422,7 +424,9 @@ struct EventCacheInner { multiple_room_updates_lock: Mutex<()>, /// Lazily-filled cache of live [`RoomEventCache`], once per room. - by_room: RwLock>, + // + // It's behind an `Arc` to get owned locks. + by_room: Arc>, /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, @@ -565,12 +569,12 @@ impl EventCacheInner { // Left rooms. 
for (room_id, left_room_update) in updates.left { - let Ok(Caches { room }) = self.all_caches_for_room(&room_id).await else { + let Ok(caches) = self.all_caches_for_room(&room_id).await else { error!(?room_id, "Room must exist"); continue; }; - if let Err(err) = room.handle_left_room_update(left_room_update).await { + if let Err(err) = caches.handle_left_room_update(left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -580,12 +584,12 @@ impl EventCacheInner { for (room_id, joined_room_update) in updates.joined { trace!(?room_id, "Handling a `JoinedRoomUpdate`"); - let Ok(Caches { room }) = self.all_caches_for_room(&room_id).await else { + let Ok(caches) = self.all_caches_for_room(&room_id).await else { error!(?room_id, "Room must exist"); continue; }; - if let Err(err) = room.handle_joined_room_update(joined_room_update).await { + if let Err(err) = caches.handle_joined_room_update(joined_room_update).await { // Non-fatal error, try to continue to the next room. error!(%room_id, "handling joined room update: {err}"); } @@ -597,25 +601,32 @@ impl EventCacheInner { Ok(()) } - /// Return a room-specific view over the [`EventCache`]. - async fn all_caches_for_room(&self, room_id: &RoomId) -> Result { + /// Return all the event caches associated to a specific room. + async fn all_caches_for_room( + &self, + room_id: &RoomId, + ) -> Result> { // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a // write lock. - let by_room_guard = self.by_room.read().await; + match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| { + by_room.get(room_id) + }) { + Ok(caches) => Ok(caches), - match by_room_guard.get(room_id) { - Some(caches) => Ok(caches.clone()), - - None => { + Err(by_room_guard) => { // Slow-path: the entry doesn't exist; let's acquire a write lock. 
drop(by_room_guard); - let mut by_room_guard = self.by_room.write().await; + let by_room_guard = self.by_room.clone().write_owned().await; // In the meanwhile, some other caller might have obtained write access and done // the same, so check for existence again. - if let Some(caches) = by_room_guard.get(room_id) { - return Ok(caches.clone()); - } + let mut by_room_guard = + match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| { + by_room.get(room_id) + }) { + Ok(caches) => return Ok(caches), + Err(by_room_guard) => by_room_guard, + }; let caches = Caches::new( &self.client, @@ -631,9 +642,12 @@ impl EventCacheInner { ) .await?; - by_room_guard.insert(room_id.to_owned(), caches.clone()); + by_room_guard.insert(room_id.to_owned(), caches); - Ok(caches) + Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| { + by_room.get(room_id) + }) + .expect("`Caches` has just been inserted")) } } } diff --git a/crates/matrix-sdk/src/event_cache/tasks.rs b/crates/matrix-sdk/src/event_cache/tasks.rs index 0767bbd9a..95d6526fe 100644 --- a/crates/matrix-sdk/src/event_cache/tasks.rs +++ b/crates/matrix-sdk/src/event_cache/tasks.rs @@ -34,7 +34,7 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, use super::{ AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin, - RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs, caches::Caches, + RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs, }; use crate::{ client::WeakClient, @@ -141,17 +141,19 @@ pub(super) async fn auto_shrink_linked_chunk_task( return; }; - let caches = match inner.all_caches_for_room(&room_id).await { - Ok(caches) => caches, - Err(err) => { - warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); - continue; - } + let room = { + let caches = match inner.all_caches_for_room(&room_id).await { + Ok(caches) => caches, + Err(err) => { + warn!(for_room = %room_id, "Failed to 
get the `Caches`: {err}"); + continue; + } + }; + + caches.room.clone() }; - trace!("waiting for state lock…"); - - let Caches { room } = caches; + trace!("Waiting for state lock…"); let mut state = match room.state().write().await { Ok(state) => state,