fix(sdk): EventCacheInner::all_caches_for_room returns a reference to Caches.

This patch updates `EventCacheInner::all_caches_for_room` by returning
an `OwnedRwLockReadGuard` instead of `Caches`. `Caches` no longer
implements `Clone`, which prevents cloning all the event caches of a
room when, for example, we need only one of them.

`Caches` has two new methods: `handle_joined_room_update` and
`handle_left_room_update`. This type acts more like a dispatcher now,
which was the initial idea. That way, we don't need to dispatch manually
on all event caches: `Caches` is responsible for that.

We need to be a bit careful now since `all_caches_for_room` returns an
owned read-lock over `EventCacheInner::by_room`.
This commit is contained in:
Ivan Enderlin
2026-03-09 10:30:37 +01:00
parent 9608aa5840
commit 0ce36d0bfb
3 changed files with 66 additions and 31 deletions

View File

@@ -17,6 +17,7 @@ use eyeball_im::VectorDiff;
use matrix_sdk_base::{
ThreadingSupport,
event_cache::{Event, store::EventCacheStoreLock},
sync::{JoinedRoomUpdate, LeftRoomUpdate},
};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{broadcast::Sender, mpsc};
@@ -33,7 +34,7 @@ pub mod room;
pub mod thread;
/// A type to hold all the caches for a given room.
#[derive(Clone)]
#[derive(Debug)]
pub(super) struct Caches {
pub room: room::RoomEventCache,
}
@@ -102,6 +103,24 @@ impl Caches {
Ok(Self { room: room_event_cache })
}
/// Update all the event caches with a [`JoinedRoomUpdate`].
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
let Self { room } = &self;
room.handle_joined_room_update(updates).await?;
Ok(())
}
/// Update all the event caches with a [`LeftRoomUpdate`].
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
let Self { room } = &self;
room.handle_left_room_update(updates).await?;
Ok(())
}
/// Try to acquire exclusive locks over all the event caches managed by
/// this [`Caches`], in order to reset all the in-memory data.
///

View File

@@ -45,7 +45,7 @@ use matrix_sdk_base::{
};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{
Mutex, RwLock,
Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock,
broadcast::{Receiver, Sender, channel},
mpsc,
};
@@ -348,7 +348,7 @@ impl EventCache {
return Err(EventCacheError::NotSubscribedYet);
};
let Caches { room } = self.inner.all_caches_for_room(room_id).await?;
let room = self.inner.all_caches_for_room(room_id).await?.room.clone();
Ok((room, drop_handles))
}
@@ -402,6 +402,8 @@ impl Default for EventCacheConfig {
}
}
type CachesByRoom = HashMap<OwnedRoomId, Caches>;
struct EventCacheInner {
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
@@ -422,7 +424,9 @@ struct EventCacheInner {
multiple_room_updates_lock: Mutex<()>,
/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: RwLock<HashMap<OwnedRoomId, Caches>>,
//
// It's behind an `Arc` to get owned locks.
by_room: Arc<RwLock<CachesByRoom>>,
/// Handles to keep alive the task listening to updates.
drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
@@ -565,12 +569,12 @@ impl EventCacheInner {
// Left rooms.
for (room_id, left_room_update) in updates.left {
let Ok(Caches { room }) = self.all_caches_for_room(&room_id).await else {
let Ok(caches) = self.all_caches_for_room(&room_id).await else {
error!(?room_id, "Room must exist");
continue;
};
if let Err(err) = room.handle_left_room_update(left_room_update).await {
if let Err(err) = caches.handle_left_room_update(left_room_update).await {
// Non-fatal error, try to continue to the next room.
error!("handling left room update: {err}");
}
@@ -580,12 +584,12 @@ impl EventCacheInner {
for (room_id, joined_room_update) in updates.joined {
trace!(?room_id, "Handling a `JoinedRoomUpdate`");
let Ok(Caches { room }) = self.all_caches_for_room(&room_id).await else {
let Ok(caches) = self.all_caches_for_room(&room_id).await else {
error!(?room_id, "Room must exist");
continue;
};
if let Err(err) = room.handle_joined_room_update(joined_room_update).await {
if let Err(err) = caches.handle_joined_room_update(joined_room_update).await {
// Non-fatal error, try to continue to the next room.
error!(%room_id, "handling joined room update: {err}");
}
@@ -597,25 +601,32 @@ impl EventCacheInner {
Ok(())
}
/// Return a room-specific view over the [`EventCache`].
async fn all_caches_for_room(&self, room_id: &RoomId) -> Result<Caches> {
/// Return all the event caches associated to a specific room.
async fn all_caches_for_room(
&self,
room_id: &RoomId,
) -> Result<OwnedRwLockReadGuard<CachesByRoom, Caches>> {
// Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
// write lock.
let by_room_guard = self.by_room.read().await;
match OwnedRwLockReadGuard::try_map(self.by_room.clone().read_owned().await, |by_room| {
by_room.get(room_id)
}) {
Ok(caches) => Ok(caches),
match by_room_guard.get(room_id) {
Some(caches) => Ok(caches.clone()),
None => {
Err(by_room_guard) => {
// Slow-path: the entry doesn't exist; let's acquire a write lock.
drop(by_room_guard);
let mut by_room_guard = self.by_room.write().await;
let by_room_guard = self.by_room.clone().write_owned().await;
// In the meanwhile, some other caller might have obtained write access and done
// the same, so check for existence again.
if let Some(caches) = by_room_guard.get(room_id) {
return Ok(caches.clone());
}
let mut by_room_guard =
match OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
by_room.get(room_id)
}) {
Ok(caches) => return Ok(caches),
Err(by_room_guard) => by_room_guard,
};
let caches = Caches::new(
&self.client,
@@ -631,9 +642,12 @@ impl EventCacheInner {
)
.await?;
by_room_guard.insert(room_id.to_owned(), caches.clone());
by_room_guard.insert(room_id.to_owned(), caches);
Ok(caches)
Ok(OwnedRwLockWriteGuard::try_downgrade_map(by_room_guard, |by_room| {
by_room.get(room_id)
})
.expect("`Caches` has just been inserted"))
}
}
}

View File

@@ -34,7 +34,7 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument,
use super::{
AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin,
RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs, caches::Caches,
RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs,
};
use crate::{
client::WeakClient,
@@ -141,17 +141,19 @@ pub(super) async fn auto_shrink_linked_chunk_task(
return;
};
let caches = match inner.all_caches_for_room(&room_id).await {
Ok(caches) => caches,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
continue;
}
let room = {
let caches = match inner.all_caches_for_room(&room_id).await {
Ok(caches) => caches,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
continue;
}
};
caches.room.clone()
};
trace!("waiting for state lock…");
let Caches { room } = caches;
trace!("Waiting for state lock…");
let mut state = match room.state().write().await {
Ok(state) => state,