From 48da03a14871cb5dc214d0feca21ed2b44489d87 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 27 Feb 2025 18:09:24 +0100 Subject: [PATCH] refactor(event cache): don't make the store optional in the event cache --- crates/matrix-sdk/src/client/mod.rs | 7 +- crates/matrix-sdk/src/event_cache/mod.rs | 21 +--- crates/matrix-sdk/src/event_cache/room/mod.rs | 106 +++++------------- 3 files changed, 38 insertions(+), 96 deletions(-) diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 0c9b5050f..21c1a7e31 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -392,7 +392,12 @@ impl ClientInner { let _ = client .event_cache - .get_or_init(|| async { EventCache::new(WeakClient::from_inner(&client)) }) + .get_or_init(|| async { + EventCache::new( + WeakClient::from_inner(&client), + client.base_client.event_cache_store().clone(), + ) + }) .await; client diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 5e05438a1..58d3fe6e9 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -44,7 +44,6 @@ use matrix_sdk_base::{ sync::RoomUpdates, }; use matrix_sdk_common::executor::{spawn, JoinHandle}; -use once_cell::sync::OnceCell; use room::RoomEventCacheState; use ruma::{ events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, @@ -155,11 +154,11 @@ impl Debug for EventCache { impl EventCache { /// Create a new [`EventCache`] for the given client. - pub(crate) fn new(client: WeakClient) -> Self { + pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self { Self { inner: Arc::new(EventCacheInner { client, - store: Default::default(), + store: event_cache_store, multiple_room_updates_lock: Default::default(), by_room: Default::default(), drop_handles: Default::default(), @@ -176,12 +175,6 @@ impl EventCache { pub fn subscribe(&self) -> Result<()> { let client = self.inner.client()?; - // Initialize storage. - let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| { - let client = self.inner.client()?; - Ok(client.event_cache_store().clone()) - })?; - // Initialize the drop handles. let _ = self.inner.drop_handles.get_or_init(|| { // Spawn the task that will listen to all the room updates at once. @@ -366,10 +359,7 @@ struct EventCacheInner { client: WeakClient, /// Reference to the underlying store. - /// - /// Set to none if we shouldn't use storage for reading / writing linked - /// chunks. - store: Arc>, + store: EventCacheStoreLock, /// A lock used when many rooms must be updated at once. /// @@ -452,10 +442,7 @@ impl EventCacheInner { .await; // Clear the storage for all the rooms, using the storage facility. - if let Some(store) = self.store.get() { - let store_guard = store.lock().await?; - store_guard.clear_all_rooms_chunks().await?; - } + self.store.lock().await?.clear_all_rooms_chunks().await?; // At this point, all the in-memory linked chunks are desynchronized from the // storage. Resynchronize them manually by calling reset(), and diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 585276d84..3caaeb268 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -537,7 +537,6 @@ mod private { linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update}, }; use matrix_sdk_common::executor::spawn; - use once_cell::sync::OnceCell; use ruma::{ events::{ relation::RelationType, room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, @@ -570,10 +569,7 @@ mod private { room_version: RoomVersionId, /// Reference to the underlying backing store. - /// - /// Set to none if the room shouldn't read the linked chunk from - /// storage, and shouldn't store updates to storage. - store: Arc>, + store: EventCacheStoreLock, /// The events of the room. events: RoomEvents, @@ -607,43 +603,35 @@ mod private { pub async fn new( room_id: OwnedRoomId, room_version: RoomVersionId, - store: Arc>, + store: EventCacheStoreLock, pagination_status: SharedObservable, ) -> Result { - let (events, deduplicator) = if let Some(store) = store.get() { - let store_lock = store.lock().await?; + let store_lock = store.lock().await?; - let linked_chunk = match store_lock - .load_last_chunk(&room_id) - .await - .map_err(EventCacheError::from) - .and_then(|(last_chunk, chunk_identifier_generator)| { - lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator) - .map_err(EventCacheError::from) - }) { - Ok(linked_chunk) => linked_chunk, + let linked_chunk = match store_lock + .load_last_chunk(&room_id) + .await + .map_err(EventCacheError::from) + .and_then(|(last_chunk, chunk_identifier_generator)| { + lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator) + .map_err(EventCacheError::from) + }) { + Ok(linked_chunk) => linked_chunk, - Err(err) => { - error!("error when reloading a linked chunk from memory: {err}"); + Err(err) => { + error!("error when reloading a linked chunk from memory: {err}"); - // Clear storage for this room. - store_lock - .handle_linked_chunk_updates(&room_id, vec![Update::Clear]) - .await?; + // Clear storage for this room. + store_lock.handle_linked_chunk_updates(&room_id, vec![Update::Clear]).await?; - // Restart with an empty linked chunk. - None - } - }; - - ( - RoomEvents::with_initial_linked_chunk(linked_chunk), - Deduplicator::new_store_based(room_id.clone(), store.clone()), - ) - } else { - (RoomEvents::default(), Deduplicator::new_memory_based()) + // Restart with an empty linked chunk. + None + } }; + let events = RoomEvents::with_initial_linked_chunk(linked_chunk); + let deduplicator = Deduplicator::new_store_based(room_id.clone(), store.clone()); + Ok(Self { room: room_id, room_version, @@ -727,22 +715,6 @@ mod private { pub(in super::super) async fn load_more_events_backwards( &mut self, ) -> Result { - let Some(store) = self.store.get() else { - // No store to reload events from. Pretend the caller has to act as if a gap was - // present. Limited syncs will always clear and push a gap, in this mode. - // There's no lazy-loading. - - // Look for a gap in the in-memory chunk, iterating in reverse so as to get the - // most recent one. - if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) { - return Ok(LoadMoreEventsBackwardsOutcome::Gap { - prev_token: Some(prev_token), - }); - } - - return Ok(self.conclude_load_more_for_fully_loaded_chunk()); - }; - // If any in-memory chunk is a gap, don't load more events, and let the caller // resolve the gap. if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) { @@ -754,7 +726,7 @@ mod private { let first_chunk_identifier = self.events.chunks().next().expect("a linked chunk is never empty").identifier(); - let store = store.lock().await?; + let store = self.store.lock().await?; // The first chunk is not a gap, we can load its previous chunk. let new_first_chunk = @@ -835,13 +807,7 @@ mod private { pub(super) async fn shrink_to_last_chunk( &mut self, ) -> Result>>, EventCacheError> { - let Some(store) = self.store.get() else { - // No need to do anything if there's no storage; we'll already reset the - // timeline after a limited response. - return Ok(None); - }; - - let store_lock = store.lock().await?; + let store_lock = self.store.lock().await?; // Attempt to load the last chunk. let (last_chunk, chunk_identifier_generator) = match store_lock @@ -1014,10 +980,6 @@ mod private { &mut self, mut updates: Vec>, ) -> Result<(), EventCacheError> { - let Some(store) = self.store.get() else { - return Ok(()); - }; - if updates.is_empty() { return Ok(()); } @@ -1045,7 +1007,7 @@ mod private { // The store cross-process locking involves an actual mutex, which ensures that // storing updates happens in the expected order. - let store = store.clone(); + let store = self.store.clone(); let room_id = self.room.clone(); spawn(async move { @@ -1111,12 +1073,7 @@ mod private { } } - let Some(store) = self.store.get() else { - // No store, event is not present. - return Ok(None); - }; - - let store = store.lock().await?; + let store = self.store.lock().await?; Ok(store .find_event(&self.room, event_id) @@ -1134,12 +1091,7 @@ mod private { event_id: &EventId, filters: Option>, ) -> Result)>, EventCacheError> { - let Some(store) = self.store.get() else { - // No store, event is not present. - return Ok(None); - }; - - let store = store.lock().await?; + let store = self.store.lock().await?; // First, hit storage to get the target event and its related events. let found = store.find_event(&self.room, event_id).await?; @@ -1315,9 +1267,7 @@ mod private { &self, events: impl IntoIterator, ) -> Result<(), EventCacheError> { - let Some(store) = self.store.get() else { return Ok(()) }; - - let store = store.clone(); + let store = self.store.clone(); let room_id = self.room.clone(); let events = events.into_iter().collect::>();