From 7de74e2c04e8efaa57274d3c6d4632ff8d94188e Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 2 Dec 2024 12:26:44 +0100 Subject: [PATCH] feat(event cache): reload the linked chunk from the store, if storage's enabled --- .../src/event_cache/deduplicator.rs | 30 +++++++++++---- crates/matrix-sdk/src/event_cache/mod.rs | 6 ++- .../matrix-sdk/src/event_cache/room/events.rs | 20 ++++++++-- crates/matrix-sdk/src/event_cache/room/mod.rs | 37 ++++++++++--------- 4 files changed, 63 insertions(+), 30 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 9a6e8d933..7dbd54eec 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -18,6 +18,7 @@ use std::{collections::BTreeSet, fmt, sync::Mutex}; use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder}; +use tracing::warn; use super::room::events::{Event, RoomEvents}; @@ -46,16 +47,29 @@ impl Deduplicator { const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000; const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01; - /// Create a new `Deduplicator`. + /// Create a new `Deduplicator` with no prior knowledge of known events. + #[cfg(test)] pub fn new() -> Self { - Self { - bloom_filter: Mutex::new( - GrowableBloomBuilder::new() - .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS) - .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE) - .build(), - ), + Self::with_initial_events(std::iter::empty()) + } + + /// Create a new `Deduplicator` filled with initial events. + /// + /// This won't detect duplicates in the initial events, only learn about + /// those events. 
+ pub fn with_initial_events<'a>(events: impl Iterator<Item = &'a Event>) -> Self { + let mut bloom_filter = GrowableBloomBuilder::new() + .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS) + .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE) + .build(); + for e in events { + let Some(event_id) = e.event_id() else { + warn!("initial event in deduplicator had no event id"); + continue; + }; + bloom_filter.insert(event_id); } + Self { bloom_filter: Mutex::new(bloom_filter) } } /// Scan a collection of events and detect duplications. diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 0f4b5ffc4..cc0ac6d14 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -42,6 +42,7 @@ use matrix_sdk_base::{ }; use matrix_sdk_common::executor::{spawn, JoinHandle}; use once_cell::sync::OnceCell; +use room::RoomEventCacheState; use ruma::{ events::{relation::RelationType, AnySyncEphemeralRoomEvent}, serde::Raw, @@ -458,9 +459,12 @@ impl EventCacheInner { return Ok(room.clone()); } + let room_state = + RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?; + let room_event_cache = RoomEventCache::new( self.client.clone(), - self.store.clone(), + room_state, room_id.to_owned(), self.all_events.clone(), ); diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index eec213c93..6313ac3d3 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -52,14 +52,28 @@ impl Default for RoomEvents { impl RoomEvents { /// Build a new [`RoomEvents`] struct with zero events. pub fn new() -> Self { - let mut chunks = LinkedChunk::new_with_update_history(); + Self::with_initial_chunks(None) + } + + /// Build a new [`RoomEvents`] struct with prior chunks knowledge. + /// + /// The provided [`LinkedChunk`] must have been built with update history. 
+ pub fn with_initial_chunks( + chunks: Option>, + ) -> Self { + let mut chunks = chunks.unwrap_or_else(LinkedChunk::new_with_update_history); + let chunks_updates_as_vectordiffs = chunks .as_vector() // SAFETY: The `LinkedChunk` has been built with `new_with_update_history`, so // `as_vector` must return `Some(…)`. - .expect("`LinkedChunk` must have been constructor with `new_with_update_history`"); + .expect("`LinkedChunk` must have been built with `new_with_update_history`"); - Self { chunks, chunks_updates_as_vectordiffs, deduplicator: Deduplicator::new() } + // Let the deduplicator know about initial events. + let deduplicator = + Deduplicator::with_initial_events(chunks.items().map(|(_pos, event)| event)); + + Self { chunks, chunks_updates_as_vectordiffs, deduplicator } } /// Clear all events. diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 399d78160..2e2e6e306 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -19,10 +19,8 @@ use std::{collections::BTreeMap, fmt, sync::Arc}; use events::Gap; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, SyncTimelineEvent}, - event_cache::store::EventCacheStoreLock, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; -use once_cell::sync::OnceCell; use ruma::{ events::{ relation::RelationType, @@ -65,11 +63,11 @@ impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. 
pub(super) fn new( client: WeakClient, - store: Arc>, + state: RoomEventCacheState, room_id: OwnedRoomId, all_events_cache: Arc>, ) -> Self { - Self { inner: Arc::new(RoomEventCacheInner::new(client, store, room_id, all_events_cache)) } + Self { inner: Arc::new(RoomEventCacheInner::new(client, state, room_id, all_events_cache)) } } /// Subscribe to room updates for this room, after getting the initial list @@ -234,18 +232,15 @@ impl RoomEventCacheInner { /// to handle new timeline events. fn new( client: WeakClient, - store: Arc>, + state: RoomEventCacheState, room_id: OwnedRoomId, all_events_cache: Arc>, ) -> Self { let sender = Sender::new(32); - let weak_room = WeakRoom::new(client, room_id); - let room_id = weak_room.room_id().to_owned(); - Self { - room_id: room_id.clone(), - state: RwLock::new(RoomEventCacheState::new(room_id, store)), + room_id: weak_room.room_id().to_owned(), + state: RwLock::new(state), all_events: all_events_cache, sender, pagination_batch_token_notifier: Default::default(), @@ -576,14 +571,20 @@ mod private { } impl RoomEventCacheState { - /// Create a new empty state. - pub fn new(room: OwnedRoomId, store: Arc>) -> Self { - Self { - room, - store, - events: RoomEvents::default(), - waited_for_initial_prev_token: false, - } + /// Create a new state, or reload it from storage if it's been enabled. + pub async fn new( + room: OwnedRoomId, + store: Arc>, + ) -> Result { + let events = if let Some(store) = store.get() { + let locked = store.lock().await?; + let chunks = locked.reload_linked_chunk(&room).await?; + RoomEvents::with_initial_chunks(chunks) + } else { + RoomEvents::default() + }; + + Ok(Self { room, store, events, waited_for_initial_prev_token: false }) } /// Propagate changes to the underlying storage.