refactor(event cache): move the gist of deduplication into BloomFilterDeduplicator

This commit is contained in:
Benjamin Bouvier
2025-02-12 17:03:53 +01:00
parent 28cd8beb77
commit b95cf79a6d
2 changed files with 49 additions and 35 deletions

View File

@@ -18,7 +18,8 @@
use std::{collections::BTreeSet, fmt, sync::Mutex};
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use tracing::warn;
use ruma::OwnedEventId;
use tracing::{debug, warn};
use super::room::events::{Event, RoomEvents};
@@ -72,6 +73,48 @@ impl BloomFilterDeduplicator {
Self { bloom_filter: Mutex::new(bloom_filter) }
}
/// Run the deduplication scan over `events` and split the outcome in two.
///
/// Every incoming event is passed through [`Self::scan_and_learn`] together
/// with `room_events`, and is then handled according to its decoration:
///
/// * `Decoration::Unique`: the event is kept.
/// * `Decoration::Duplicated`: the event is kept as well (the new event
///   wins), and its event ID is recorded in the list of duplicates.
/// * `Decoration::Invalid`: the event has no ID; it is logged and dropped.
///
/// Returns a tuple made of the kept events, in scan order, and the event
/// IDs of those that were flagged as duplicates.
///
/// # Panics
///
/// Panics if an event decorated as `Decoration::Duplicated` has no event
/// ID. This is not expected to happen, since an event with no ID is
/// decorated as `Decoration::Invalid`.
pub fn filter_duplicate_events<'a, I>(
    &'a self,
    events: I,
    room_events: &'a RoomEvents,
) -> (Vec<Event>, Vec<OwnedEventId>)
where
    I: Iterator<Item = Event> + 'a,
{
    let mut kept_events = Vec::new();
    let mut duplicated_event_ids = Vec::new();

    for decorated_event in self.scan_and_learn(events, room_events) {
        match decorated_event {
            Decoration::Unique(event) => kept_events.push(event),

            Decoration::Duplicated(event) => {
                debug!(event_id = ?event.event_id(), "Found a duplicated event");

                // An event with no ID is decorated as `Decoration::Invalid`,
                // so a duplicated event is expected to always carry an ID.
                let event_id = event.event_id().expect("The event has no ID");
                duplicated_event_ids.push(event_id);

                // The new event is kept even though it is a duplicate.
                kept_events.push(event);
            }

            Decoration::Invalid(event) => {
                // Nothing is pushed: events without an ID are discarded.
                warn!(?event, "Found an event with no ID");
            }
        }
    }

    (kept_events, duplicated_event_ids)
}
/// Scan a collection of events and detect duplications.
///
/// This method takes a collection of events `new_events_to_scan` and
@@ -82,7 +125,7 @@ impl BloomFilterDeduplicator {
/// Each scanned event will update `Self`'s internal state.
///
/// `existing_events` represents all events of a room that already exist.
pub fn scan_and_learn<'a, I>(
fn scan_and_learn<'a, I>(
&'a self,
new_events_to_scan: I,
existing_events: &'a RoomEvents,

View File

@@ -540,13 +540,10 @@ mod private {
};
use once_cell::sync::OnceCell;
use ruma::{serde::Raw, OwnedEventId, OwnedRoomId, RoomId};
use tracing::{debug, error, instrument, trace, warn};
use tracing::{error, instrument, trace};
use super::{chunk_debug_string, events::RoomEvents};
use crate::event_cache::{
deduplicator::{BloomFilterDeduplicator, Decoration},
EventCacheError,
};
use crate::event_cache::{deduplicator::BloomFilterDeduplicator, EventCacheError};
/// State for a single room's event cache.
///
@@ -668,34 +665,8 @@ mod private {
where
I: Iterator<Item = Event> + 'a,
{
let mut duplicated_event_ids = Vec::new();
let events = self
.deduplicator
.scan_and_learn(events, &self.events)
.filter_map(|decorated_event| match decorated_event {
Decoration::Unique(event) => Some(event),
Decoration::Duplicated(event) => {
debug!(event_id = ?event.event_id(), "Found a duplicated event");
duplicated_event_ids.push(
event
.event_id()
// SAFETY: An event with no ID is decorated as
// `Decoration::Invalid`. Thus, it's
// safe to unwrap the `Option<OwnedEventId>` here.
.expect("The event has no ID"),
);
// Keep the new event!
Some(event)
}
Decoration::Invalid(event) => {
warn!(?event, "Found an event with no ID");
None
}
})
.collect::<Vec<_>>();
let (events, duplicated_event_ids) =
self.deduplicator.filter_duplicate_events(events, &self.events);
let all_duplicates = !events.is_empty() && events.len() == duplicated_event_ids.len();