feat(event cache): reload the linked chunk from the store, if storage is enabled

This commit is contained in:
Benjamin Bouvier
2024-12-02 12:26:44 +01:00
parent 019de4ffa0
commit 7de74e2c04
4 changed files with 63 additions and 30 deletions

View File

@@ -18,6 +18,7 @@
use std::{collections::BTreeSet, fmt, sync::Mutex};
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use tracing::warn;
use super::room::events::{Event, RoomEvents};
@@ -46,16 +47,29 @@ impl Deduplicator {
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Create a new `Deduplicator`.
/// Create a new `Deduplicator` with no prior knowledge of known events.
#[cfg(test)]
pub fn new() -> Self {
Self {
bloom_filter: Mutex::new(
GrowableBloomBuilder::new()
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
.build(),
),
Self::with_initial_events(std::iter::empty())
}
/// Create a new `Deduplicator` pre-seeded with a set of initial events.
///
/// The initial events are only *learned* (inserted into the bloom
/// filter); no duplicate detection is performed among them.
pub fn with_initial_events<'a>(events: impl Iterator<Item = &'a Event>) -> Self {
    let mut filter = GrowableBloomBuilder::new()
        .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
        .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
        .build();

    for event in events {
        match event.event_id() {
            // Only events carrying an event id can be remembered.
            Some(event_id) => {
                filter.insert(event_id);
            }
            None => {
                warn!("initial event in deduplicator had no event id");
            }
        }
    }

    Self { bloom_filter: Mutex::new(filter) }
}
/// Scan a collection of events and detect duplications.

View File

@@ -42,6 +42,7 @@ use matrix_sdk_base::{
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
events::{relation::RelationType, AnySyncEphemeralRoomEvent},
serde::Raw,
@@ -458,9 +459,12 @@ impl EventCacheInner {
return Ok(room.clone());
}
let room_state =
RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;
let room_event_cache = RoomEventCache::new(
self.client.clone(),
self.store.clone(),
room_state,
room_id.to_owned(),
self.all_events.clone(),
);

View File

@@ -52,14 +52,28 @@ impl Default for RoomEvents {
impl RoomEvents {
/// Build a new [`RoomEvents`] struct with zero events.
pub fn new() -> Self {
let mut chunks = LinkedChunk::new_with_update_history();
Self::with_initial_chunks(None)
}
/// Build a new [`RoomEvents`] struct with prior chunks knowledge.
///
/// The provided [`LinkedChunk`] must have been built with update history.
pub fn with_initial_chunks(
chunks: Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>,
) -> Self {
let mut chunks = chunks.unwrap_or_else(LinkedChunk::new_with_update_history);
let chunks_updates_as_vectordiffs = chunks
.as_vector()
// SAFETY: The `LinkedChunk` has been built with `new_with_update_history`, so
// `as_vector` must return `Some(…)`.
.expect("`LinkedChunk` must have been constructor with `new_with_update_history`");
.expect("`LinkedChunk` must have been built with `new_with_update_history`");
Self { chunks, chunks_updates_as_vectordiffs, deduplicator: Deduplicator::new() }
// Let the deduplicator know about initial events.
let deduplicator =
Deduplicator::with_initial_events(chunks.items().map(|(_pos, event)| event));
Self { chunks, chunks_updates_as_vectordiffs, deduplicator }
}
/// Clear all events.

View File

@@ -19,10 +19,8 @@ use std::{collections::BTreeMap, fmt, sync::Arc};
use events::Gap;
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, SyncTimelineEvent},
event_cache::store::EventCacheStoreLock,
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use once_cell::sync::OnceCell;
use ruma::{
events::{
relation::RelationType,
@@ -65,11 +63,11 @@ impl RoomEventCache {
/// Create a new [`RoomEventCache`] using the given room and store.
pub(super) fn new(
client: WeakClient,
store: Arc<OnceCell<EventCacheStoreLock>>,
state: RoomEventCacheState,
room_id: OwnedRoomId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(client, store, room_id, all_events_cache)) }
Self { inner: Arc::new(RoomEventCacheInner::new(client, state, room_id, all_events_cache)) }
}
/// Subscribe to room updates for this room, after getting the initial list
@@ -234,18 +232,15 @@ impl RoomEventCacheInner {
/// to handle new timeline events.
fn new(
client: WeakClient,
store: Arc<OnceCell<EventCacheStoreLock>>,
state: RoomEventCacheState,
room_id: OwnedRoomId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
let sender = Sender::new(32);
let weak_room = WeakRoom::new(client, room_id);
let room_id = weak_room.room_id().to_owned();
Self {
room_id: room_id.clone(),
state: RwLock::new(RoomEventCacheState::new(room_id, store)),
room_id: weak_room.room_id().to_owned(),
state: RwLock::new(state),
all_events: all_events_cache,
sender,
pagination_batch_token_notifier: Default::default(),
@@ -576,14 +571,20 @@ mod private {
}
impl RoomEventCacheState {
/// Create a new empty state.
pub fn new(room: OwnedRoomId, store: Arc<OnceCell<EventCacheStoreLock>>) -> Self {
Self {
room,
store,
events: RoomEvents::default(),
waited_for_initial_prev_token: false,
}
/// Create a new state, or reload it from storage if it's been enabled.
///
/// Storage is considered enabled iff the `store` `OnceCell` has been
/// initialized; otherwise the state starts with empty events.
pub async fn new(
    room: OwnedRoomId,
    store: Arc<OnceCell<EventCacheStoreLock>>,
) -> Result<Self, EventCacheError> {
    let events = if let Some(store) = store.get() {
        // Acquire the store lock before reading — NOTE(review): presumably
        // this guards against concurrent access to the backing store;
        // confirm against `EventCacheStoreLock`'s contract.
        let locked = store.lock().await?;
        // Reload the previously persisted linked chunk for this room; the
        // result may be `None` (nothing persisted yet), in which case
        // `with_initial_chunks` falls back to a fresh chunk.
        let chunks = locked.reload_linked_chunk(&room).await?;
        RoomEvents::with_initial_chunks(chunks)
    } else {
        // No storage configured: start from scratch.
        RoomEvents::default()
    };
    Ok(Self { room, store, events, waited_for_initial_prev_token: false })
}
/// Propagate changes to the underlying storage.