feat(base): Add EventCacheStore::filter_duplicated_events.

This patch adds and implements the
`EventCacheStore::filter_duplicated_events` method. It is implemented on
the `MemoryStore` and the `SqliteEventCacheStore`.

This method removes the unique events and returns the duplicated events.
This commit is contained in:
Ivan Enderlin
2025-02-12 15:34:11 +01:00
committed by Benjamin Bouvier
parent ed16e91aed
commit d45addee10
5 changed files with 246 additions and 5 deletions

View File

@@ -123,6 +123,9 @@ pub trait EventCacheStoreIntegrationTests {
/// Test that removing a room from storage empties all associated data.
async fn test_remove_room(&self);
/// Test that filtering duplicated events works as expected.
async fn test_filter_duplicated_events(&self);
}
fn rebuild_linked_chunk(raws: Vec<RawChunk<Event, Gap>>) -> Option<LinkedChunk<3, Event, Gap>> {
@@ -502,6 +505,79 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
let r1_linked_chunk = self.reload_linked_chunk(r1).await.unwrap();
assert!(!r1_linked_chunk.is_empty());
}
async fn test_filter_duplicated_events(&self) {
    let room_id = room_id!("!r0:matrix.org");
    let another_room_id = room_id!("!r1:matrix.org");
    let new_event = |msg: &str| make_test_event(room_id, msg);

    let event_comte = new_event("comté");
    let event_brigand = new_event("brigand du jorat");
    let event_raclette = new_event("raclette");
    let event_morbier = new_event("morbier");
    let event_gruyere = new_event("gruyère");
    let event_tome = new_event("tome");
    let event_mont_dor = new_event("mont d'or");

    // Store `comté` and `brigand du jorat` in a first chunk, a gap, then
    // `morbier` and `mont d'or` in a second chunk — all in `room_id`.
    self.handle_linked_chunk_updates(
        room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems {
                at: Position::new(CId::new(0), 0),
                items: vec![event_comte.clone(), event_brigand.clone()],
            },
            Update::NewGapChunk {
                previous: Some(CId::new(0)),
                new: CId::new(1),
                next: None,
                gap: Gap { prev_token: "brillat-savarin".to_owned() },
            },
            Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None },
            Update::PushItems {
                at: Position::new(CId::new(2), 0),
                items: vec![event_morbier.clone(), event_mont_dor.clone()],
            },
        ],
    )
    .await
    .unwrap();

    // Store `tome` in `another_room_id`, to verify that the filtering is
    // scoped to the requested room.
    self.handle_linked_chunk_updates(
        another_room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems {
                at: Position::new(CId::new(0), 0),
                items: vec![event_tome.clone()],
            },
        ],
    )
    .await
    .unwrap();

    // Query a mix of stored (`comté`, `morbier`, `mont d'or`), never-stored
    // (`raclette`, `gruyère`) and other-room (`tome`) event IDs.
    let duplicated_events = self
        .filter_duplicated_events(
            room_id,
            vec![
                event_comte.event_id().unwrap().to_owned(),
                event_raclette.event_id().unwrap().to_owned(),
                event_morbier.event_id().unwrap().to_owned(),
                event_gruyere.event_id().unwrap().to_owned(),
                event_tome.event_id().unwrap().to_owned(),
                event_mont_dor.event_id().unwrap().to_owned(),
            ],
        )
        .await
        .unwrap();

    // Only the three events stored in `room_id` are reported as duplicates.
    let expected = [
        event_comte.event_id().unwrap(),
        event_morbier.event_id().unwrap(),
        event_mont_dor.event_id().unwrap(),
    ];

    assert_eq!(duplicated_events.len(), expected.len());

    for (nth, expected_event_id) in expected.iter().enumerate() {
        assert_eq!(&duplicated_events[nth], expected_event_id);
    }
}
}
/// Macro building to allow your `EventCacheStore` implementation to run the
@@ -584,6 +660,13 @@ macro_rules! event_cache_store_integration_tests {
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_remove_room().await;
}
// Generated entry point running the `filter_duplicated_events` integration
// test against the store produced by the caller's `get_event_cache_store`.
#[async_test]
async fn test_filter_duplicated_events() {
    let event_cache_store =
        get_event_cache_store().await.unwrap().into_event_cache_store();
    event_cache_store.test_filter_duplicated_events().await;
}
}
};
}

View File

@@ -26,7 +26,7 @@ use matrix_sdk_common::{
};
use ruma::{
time::{Instant, SystemTime},
MxcUri, OwnedMxcUri, RoomId,
MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
};
use super::{
@@ -148,6 +148,35 @@ impl EventCacheStore for MemoryStore {
Ok(())
}
/// Keep only the event IDs from `events` that already exist in the store
/// for `room_id` — i.e. the duplicates — and return them.
///
/// The result follows the store's (unordered) iteration order, not the
/// order of the `events` input.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    let inner = self.inner.read().unwrap();

    let mut remaining = events;
    let mut duplicated_events = Vec::new();

    for stored_event in inner.events.unordered_events(room_id) {
        // All candidates have been matched: no need to scan further.
        if remaining.is_empty() {
            break;
        }

        // Stored events without an ID can never match a candidate.
        let Some(stored_event_id) = stored_event.event_id() else { continue };

        // A candidate present in the store is a duplicate: move it from the
        // remaining candidates into the result.
        if let Some(index) =
            remaining.iter().position(|candidate| *candidate == stored_event_id)
        {
            duplicated_events.push(remaining.remove(index));
        }
    }

    Ok(duplicated_events)
}
async fn add_media_content(
&self,
request: &MediaRequestParameters,

View File

@@ -19,7 +19,7 @@ use matrix_sdk_common::{
linked_chunk::{RawChunk, Update},
AsyncTraitDeps,
};
use ruma::{MxcUri, RoomId};
use ruma::{MxcUri, OwnedEventId, RoomId};
use super::{
media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
@@ -80,6 +80,14 @@ pub trait EventCacheStore: AsyncTraitDeps {
/// using the above [`Self::handle_linked_chunk_updates`] methods.
async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error>;
/// Given a set of event IDs, drop the ones that are unknown to the store
/// for this room, and return the duplicated ones, i.e. the event IDs that
/// are already present in the store.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error>;
/// Add a media file's content in the media store.
///
/// # Arguments
@@ -247,6 +255,14 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
self.0.clear_all_rooms_chunks().await.map_err(Into::into)
}
// Forward to the wrapped store, erasing its concrete error type into the
// common error via `Into`.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    self.0.filter_duplicated_events(room_id, events).await.map_err(Into::into)
}
async fn add_media_content(
&self,
request: &MediaRequestParameters,

View File

@@ -292,6 +292,21 @@ impl<Item, Gap> RelationalLinkedChunk<Item, Gap> {
}
}
}
/// Return an iterator that yields events of a particular room with no
/// particular order.
pub fn unordered_events<'a>(&'a self, room_id: &'a RoomId) -> impl Iterator<Item = &'a Item> {
    self.items
        .iter()
        // Restrict to the rows belonging to the requested room…
        .filter(move |item_row| item_row.room_id == room_id)
        // … and keep the actual items, skipping the gaps.
        .filter_map(|item_row| match &item_row.item {
            Either::Item(item) => Some(item),
            Either::Gap(..) => None,
        })
}
}
impl<Item, Gap> RelationalLinkedChunk<Item, Gap>
@@ -1002,4 +1017,39 @@ mod tests {
],
);
}
#[test]
fn test_unordered_events() {
    let room_id = room_id!("!r0:matrix.org");
    let other_room_id = room_id!("!r1:matrix.org");
    let mut relational_linked_chunk = RelationalLinkedChunk::<char, ()>::new();

    // Two chunks in `room_id`: chunk 0 holds `['a', 'b', 'c']`, chunk 1 holds
    // `['d', 'e', 'f']`.
    relational_linked_chunk.apply_updates(
        room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['a', 'b', 'c'] },
            Update::NewItemsChunk { previous: Some(CId::new(0)), new: CId::new(1), next: None },
            // Push into the freshly created chunk 1. (Previously this pushed
            // into chunk 0 at the already-occupied position 0 — a typo that
            // went unnoticed because `unordered_events` only iterates item
            // rows in insertion order.)
            Update::PushItems { at: Position::new(CId::new(1), 0), items: vec!['d', 'e', 'f'] },
        ],
    );

    // A chunk in another room, to check that `unordered_events` filters by
    // room.
    relational_linked_chunk.apply_updates(
        other_room_id,
        vec![
            Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
            Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['x', 'y', 'z'] },
        ],
    );

    // Only the events from `room_id` are yielded, in insertion order.
    let mut events = relational_linked_chunk.unordered_events(room_id);

    assert_eq!(*events.next().unwrap(), 'a');
    assert_eq!(*events.next().unwrap(), 'b');
    assert_eq!(*events.next().unwrap(), 'c');
    assert_eq!(*events.next().unwrap(), 'd');
    assert_eq!(*events.next().unwrap(), 'e');
    assert_eq!(*events.next().unwrap(), 'f');
    assert!(events.next().is_none());
}
}

View File

@@ -14,7 +14,7 @@
//! A sqlite-based backend for the [`EventCacheStore`].
use std::{borrow::Cow, fmt, path::Path, sync::Arc};
use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
@@ -33,8 +33,8 @@ use matrix_sdk_base::{
media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, trace};
@@ -622,6 +622,69 @@ impl EventCacheStore for SqliteEventCacheStore {
Ok(())
}
/// Return the subset of `events` already stored for `room_id`, i.e. the
/// duplicates, by querying the `events` table.
async fn filter_duplicated_events(
    &self,
    room_id: &RoomId,
    events: Vec<OwnedEventId>,
) -> Result<Vec<OwnedEventId>, Self::Error> {
    // Select all events that exist in the store, i.e. the duplicates.
    let room_id = room_id.to_owned();
    // Room IDs are stored encoded/hashed — presumably matching how linked
    // chunk rows are keyed (see `keys::LINKED_CHUNKS` usage elsewhere);
    // NOTE(review): event IDs appear to be stored in clear, since they are
    // matched and read back as plain strings below — confirm against the
    // schema.
    let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

    self.acquire()
        .await?
        .with_transaction(move |txn| -> Result<_> {
            // `chunk_large_query_over` splits `events` into batches so the
            // `IN (…)` list stays under SQLite's bound-variable limit, and
            // concatenates the per-batch results.
            txn.chunk_large_query_over(events, None, move |txn, events| {
                // One `?` placeholder per event ID in this batch.
                let query = format!(
                    "SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({})",
                    repeat_vars(events.len()),
                );
                let parameters = params_from_iter(
                    // parameter for `room_id = ?`
                    once(
                        hashed_room_id
                            .to_sql()
                            // SAFETY: it cannot fail since `Key::to_sql` never fails
                            .unwrap(),
                    )
                    // parameters for `event_id IN (…)`
                    .chain(events.iter().map(|event| {
                        event
                            .as_str()
                            .to_sql()
                            // SAFETY: it cannot fail since `str::to_sql` never fails
                            .unwrap()
                    })),
                );

                let mut duplicated_events = Vec::new();

                // Collect the matching rows, turning each `event_id` column
                // back into an `OwnedEventId`.
                for duplicated_event in txn
                    .prepare(&query)?
                    .query_map(parameters, |row| row.get::<_, Option<String>>(0))?
                {
                    let duplicated_event = duplicated_event?;

                    let Some(duplicated_event) = duplicated_event else {
                        // Event ID is malformed, let's skip it.
                        continue;
                    };

                    let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
                        // Normally unreachable, but the event ID has been stored even if it is
                        // malformed, let's skip it.
                        continue;
                    };

                    duplicated_events.push(duplicated_event);
                }

                Ok(duplicated_events)
            })
        })
        .await
}
async fn add_media_content(
&self,
request: &MediaRequestParameters,