From faa0e6e554f97f1b4c138399353e616d878ef71f Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 12 Feb 2025 17:13:54 +0100 Subject: [PATCH] feat(event cache): allow using the bloom filter OR the store to deduplicate events --- .../src/event_cache/deduplicator.rs | 127 ++++++++++++++---- .../matrix-sdk/src/event_cache/pagination.rs | 2 +- crates/matrix-sdk/src/event_cache/room/mod.rs | 30 ++--- 3 files changed, 120 insertions(+), 39 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 611e37ec5..0d4abd497 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -18,12 +18,109 @@ use std::{collections::BTreeSet, fmt, sync::Mutex}; use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder}; -use ruma::OwnedEventId; +use matrix_sdk_base::event_cache::store::EventCacheStoreLock; +use ruma::{OwnedEventId, OwnedRoomId}; use tracing::{debug, warn}; -use super::room::events::{Event, RoomEvents}; +use super::{ + room::events::{Event, RoomEvents}, + EventCacheError, +}; -/// `Deduplicator` is an efficient type to find duplicated events. +pub enum Deduplicator { + InMemory(BloomFilterDeduplicator), + PersistentStore(StoreDeduplicator), +} + +impl Deduplicator { + /// Create an empty deduplicator instance that uses an internal Bloom + /// filter. + /// + /// Such a deduplicator is stateful, with no initial known events, and it + /// will learn over time, by using a Bloom filter, which events are + /// duplicates or not. + /// + /// When the persistent storage is enabled by default, this constructor + /// (and the associated variant) will be removed. + pub fn new_memory_based() -> Self { + Self::InMemory(BloomFilterDeduplicator::new()) + } + + /// Create a new store-based deduplicator that will run queries against the + /// store to find whether any event is a duplicate or not. + /// + /// This deduplicator is stateless. 
+ /// + /// When the persistent storage is enabled by default, this will become the + /// default, and [`Deduplicator`] will be replaced with + /// [`StoreDeduplicator`]. + pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self { + Self::PersistentStore(StoreDeduplicator { room_id, store }) + } + + /// Find duplicates in the given collection of events, and return both + /// valid events (those with an event id) as well as the event ids of + /// duplicate events. + pub async fn filter_duplicate_events( + &self, + events: I, + room_events: &RoomEvents, + ) -> Result<(Vec, Vec), EventCacheError> + where + I: Iterator, + { + match self { + Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)), + Deduplicator::PersistentStore(dedup) => dedup.filter_duplicate_events(events).await, + } + } +} + +/// A deduplication mechanism based on the persistent storage associated to the +/// event cache. +/// +/// It will use queries to the persistent storage to figure out whether events are +/// duplicates or not, making it entirely stateless. +pub struct StoreDeduplicator { + /// The room this deduplicator applies to. + room_id: OwnedRoomId, + /// The actual event cache store implementation used to query events. + store: EventCacheStoreLock, +} + +impl StoreDeduplicator { + async fn filter_duplicate_events( + &self, + events: I, + ) -> Result<(Vec, Vec), EventCacheError> + where + I: Iterator, + { + let store = self.store.lock().await?; + + // Collect event ids as we "validate" events (i.e. check they have a valid event + // id.) 
+ let mut event_ids = Vec::new(); + let events = events + .filter_map(|event| { + if let Some(event_id) = event.event_id() { + event_ids.push(event_id); + Some(event) + } else { + None + } + }) + .collect::>(); + + // Let the store do its magic ✨ + let duplicates = store.filter_duplicated_events(&self.room_id, event_ids).await?; + + Ok((events, duplicates)) + } +} + +/// `BloomFilterDeduplicator` is an efficient type to find duplicated events, +/// using an in-memory cache. /// /// It uses a [bloom filter] to provide a memory efficient probabilistic answer /// to: “has event E been seen already?”. False positives are possible, while @@ -49,34 +146,18 @@ impl BloomFilterDeduplicator { const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01; /// Create a new `Deduplicator` with no prior knowledge of known events. - #[cfg(test)] - pub fn new() -> Self { - Self::with_initial_events(std::iter::empty()) - } - - /// Create a new `Deduplicator` filled with initial events. - /// - /// This won't detect duplicates in the initial events, only learn about - /// those events. - pub fn with_initial_events<'a>(events: impl Iterator) -> Self { - let mut bloom_filter = GrowableBloomBuilder::new() + fn new() -> Self { + let bloom_filter = GrowableBloomBuilder::new() .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS) .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE) .build(); - for e in events { - let Some(event_id) = e.event_id() else { - warn!("initial event in deduplicator had no event id"); - continue; - }; - bloom_filter.insert(event_id); - } Self { bloom_filter: Mutex::new(bloom_filter) } } /// Find duplicates in the given collection of events, and return both /// valid events (those with an event id) as well as the event ids of /// duplicate events. 
- pub fn filter_duplicate_events<'a, I>( + fn filter_duplicate_events<'a, I>( &'a self, events: I, room_events: &'a RoomEvents, @@ -184,7 +265,7 @@ impl BloomFilterDeduplicator { /// Information about the scanned collection of events. #[derive(Debug)] -pub enum Decoration { +enum Decoration { /// This event is not duplicated. Unique(I), diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index c928edb50..46dfd25e2 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -181,7 +181,7 @@ impl RoomPagination { .collect::>(); let (new_events, duplicated_event_ids, all_deduplicated) = - state.collect_valid_and_duplicated_events(sync_events.clone().into_iter()); + state.collect_valid_and_duplicated_events(sync_events.clone().into_iter()).await?; let (backpagination_outcome, sync_timeline_events_diffs) = state .with_events_mut(move |room_events| { diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index d06bf9f6e..e5a574f04 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -424,8 +424,9 @@ impl RoomEventCacheInner { return Ok(()); } - let (events, duplicated_event_ids, all_duplicates) = - state.collect_valid_and_duplicated_events(sync_timeline_events.clone().into_iter()); + let (events, duplicated_event_ids, all_duplicates) = state + .collect_valid_and_duplicated_events(sync_timeline_events.clone().into_iter()) + .await?; let sync_timeline_events_diffs = if all_duplicates { // No new events, thus no need to change the room events. 
@@ -543,7 +544,7 @@ mod private { use tracing::{error, instrument, trace}; use super::{chunk_debug_string, events::RoomEvents}; - use crate::event_cache::{deduplicator::BloomFilterDeduplicator, EventCacheError}; + use crate::event_cache::{deduplicator::Deduplicator, EventCacheError}; /// State for a single room's event cache. /// @@ -563,7 +564,7 @@ mod private { events: RoomEvents, /// The events deduplicator instance to help finding duplicates. - deduplicator: BloomFilterDeduplicator, + deduplicator: Deduplicator, /// Have we ever waited for a previous-batch-token to come from sync, in /// the context of pagination? We do this at most once per room, @@ -602,7 +603,7 @@ mod private { room: OwnedRoomId, store: Arc>, ) -> Result { - let events = if let Some(store) = store.get() { + let (events, deduplicator) = if let Some(store) = store.get() { let locked = store.lock().await?; // Try to reload a linked chunk from storage. If it fails, log the error and @@ -620,15 +621,14 @@ mod private { } }; - RoomEvents::with_initial_chunks(linked_chunk) + ( + RoomEvents::with_initial_chunks(linked_chunk), + Deduplicator::new_store_based(room.clone(), store.clone()), + ) } else { - RoomEvents::default() + (RoomEvents::default(), Deduplicator::new_memory_based()) }; - let deduplicator = BloomFilterDeduplicator::with_initial_events( - events.events().map(|(_pos, event)| event), - ); - Ok(Self { room, store, events, deduplicator, waited_for_initial_prev_token: false }) } @@ -658,19 +658,19 @@ mod private { /// possibly misplace them. And we should not be missing /// events either: the already-known events would have their own /// previous-batch token (it might already be consumed). 
- pub fn collect_valid_and_duplicated_events<'a, I>( + pub async fn collect_valid_and_duplicated_events<'a, I>( &'a mut self, events: I, - ) -> (Vec, Vec, bool) + ) -> Result<(Vec, Vec, bool), EventCacheError> where I: Iterator + 'a, { let (events, duplicated_event_ids) = - self.deduplicator.filter_duplicate_events(events, &self.events); + self.deduplicator.filter_duplicate_events(events, &self.events).await?; let all_duplicates = !events.is_empty() && events.len() == duplicated_event_ids.len(); - (events, duplicated_event_ids, all_duplicates) + Ok((events, duplicated_event_ids, all_duplicates)) } /// Removes the bundled relations from an event, if they were present.