diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index c694933be..d863641ae 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -123,6 +123,9 @@ pub trait EventCacheStoreIntegrationTests { /// Test that removing a room from storage empties all associated data. async fn test_remove_room(&self); + + /// Test that filtering duplicated events works as expected. + async fn test_filter_duplicated_events(&self); } fn rebuild_linked_chunk(raws: Vec>) -> Option> { @@ -502,6 +505,79 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { let r1_linked_chunk = self.reload_linked_chunk(r1).await.unwrap(); assert!(!r1_linked_chunk.is_empty()); } + + async fn test_filter_duplicated_events(&self) { + let room_id = room_id!("!r0:matrix.org"); + let another_room_id = room_id!("!r1:matrix.org"); + let event = |msg: &str| make_test_event(room_id, msg); + + let event_comte = event("comté"); + let event_brigand = event("brigand du jorat"); + let event_raclette = event("raclette"); + let event_morbier = event("morbier"); + let event_gruyere = event("gruyère"); + let event_tome = event("tome"); + let event_mont_dor = event("mont d'or"); + + self.handle_linked_chunk_updates( + room_id, + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { + at: Position::new(CId::new(0), 0), + items: vec![event_comte.clone(), event_brigand.clone()], + }, + Update::NewGapChunk { + previous: Some(CId::new(0)), + new: CId::new(1), + next: None, + gap: Gap { prev_token: "brillat-savarin".to_owned() }, + }, + Update::NewItemsChunk { previous: Some(CId::new(1)), new: CId::new(2), next: None }, + Update::PushItems { + at: Position::new(CId::new(2), 0), + items: vec![event_morbier.clone(), event_mont_dor.clone()], + }, + ], + ) + .await + .unwrap(); + + // Add other events in another room, to ensure filtering take the `room_id` into + // account. + self.handle_linked_chunk_updates( + another_room_id, + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { + at: Position::new(CId::new(0), 0), + items: vec![event_tome.clone()], + }, + ], + ) + .await + .unwrap(); + + let duplicated_events = self + .filter_duplicated_events( + room_id, + vec![ + event_comte.event_id().unwrap().to_owned(), + event_raclette.event_id().unwrap().to_owned(), + event_morbier.event_id().unwrap().to_owned(), + event_gruyere.event_id().unwrap().to_owned(), + event_tome.event_id().unwrap().to_owned(), + event_mont_dor.event_id().unwrap().to_owned(), + ], + ) + .await + .unwrap(); + + assert_eq!(duplicated_events.len(), 3); + assert_eq!(duplicated_events[0], event_comte.event_id().unwrap()); + assert_eq!(duplicated_events[1], event_morbier.event_id().unwrap()); + assert_eq!(duplicated_events[2], event_mont_dor.event_id().unwrap()); + } } /// Macro building to allow your `EventCacheStore` implementation to run the @@ -584,6 +660,13 @@ macro_rules! event_cache_store_integration_tests { get_event_cache_store().await.unwrap().into_event_cache_store(); event_cache_store.test_remove_room().await; } + + #[async_test] + async fn test_filter_duplicated_events() { + let event_cache_store = + get_event_cache_store().await.unwrap().into_event_cache_store(); + event_cache_store.test_filter_duplicated_events().await; + } } }; } diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index 69dce88a7..27f1ec0b4 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -26,7 +26,7 @@ use matrix_sdk_common::{ }; use ruma::{ time::{Instant, SystemTime}, - MxcUri, OwnedMxcUri, RoomId, + MxcUri, OwnedEventId, OwnedMxcUri, RoomId, }; use super::{ @@ -148,6 +148,35 @@ impl EventCacheStore for MemoryStore { Ok(()) } + async fn filter_duplicated_events( + &self, + room_id: &RoomId, + mut events: Vec, + ) -> Result, Self::Error> { + // Collect all duplicated events. + let inner = self.inner.read().unwrap(); + + let mut duplicated_events = Vec::new(); + + for event in inner.events.unordered_events(room_id) { + // If `events` is empty, we can short-circuit. + if events.is_empty() { + break; + } + + if let Some(event_id_a) = event.event_id() { + // This event exists in the store event! + if let Some(position) = + events.iter().position(|event_id_b| &event_id_a == event_id_b) + { + duplicated_events.push(events.remove(position)); + } + } + } + + Ok(duplicated_events) + } + async fn add_media_content( &self, request: &MediaRequestParameters, diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index b9fdeddec..a7c54c29a 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -19,7 +19,7 @@ use matrix_sdk_common::{ linked_chunk::{RawChunk, Update}, AsyncTraitDeps, }; -use ruma::{MxcUri, RoomId}; +use ruma::{MxcUri, OwnedEventId, RoomId}; use super::{ media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy}, @@ -80,6 +80,14 @@ pub trait EventCacheStore: AsyncTraitDeps { /// using the above [`Self::handle_linked_chunk_updates`] methods. async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error>; + /// Given a set of event ID, remove the unique events and return the + /// duplicated events. + async fn filter_duplicated_events( + &self, + room_id: &RoomId, + events: Vec, + ) -> Result, Self::Error>; + /// Add a media file's content in the media store. /// /// # Arguments @@ -247,6 +255,14 @@ impl EventCacheStore for EraseEventCacheStoreError { self.0.clear_all_rooms_chunks().await.map_err(Into::into) } + async fn filter_duplicated_events( + &self, + room_id: &RoomId, + events: Vec, + ) -> Result, Self::Error> { + self.0.filter_duplicated_events(room_id, events).await.map_err(Into::into) + } + async fn add_media_content( &self, request: &MediaRequestParameters, diff --git a/crates/matrix-sdk-common/src/linked_chunk/relational.rs b/crates/matrix-sdk-common/src/linked_chunk/relational.rs index 833c0f724..f1d923c9b 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/relational.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/relational.rs @@ -292,6 +292,21 @@ impl RelationalLinkedChunk { } } } + + /// Return an iterator that yields events of a particular room with no + /// particular order. + pub fn unordered_events<'a>(&'a self, room_id: &'a RoomId) -> impl Iterator { + self.items.iter().filter_map(move |item_row| { + if item_row.room_id == room_id { + match &item_row.item { + Either::Item(item) => Some(item), + Either::Gap(..) => None, + } + } else { + None + } + }) + } } impl RelationalLinkedChunk @@ -1002,4 +1017,39 @@ mod tests { ], ); } + + #[test] + fn test_unordered_events() { + let room_id = room_id!("!r0:matrix.org"); + let other_room_id = room_id!("!r1:matrix.org"); + let mut relational_linked_chunk = RelationalLinkedChunk::::new(); + + relational_linked_chunk.apply_updates( + room_id, + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['a', 'b', 'c'] }, + Update::NewItemsChunk { previous: Some(CId::new(0)), new: CId::new(1), next: None }, + Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['d', 'e', 'f'] }, + ], + ); + + relational_linked_chunk.apply_updates( + other_room_id, + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { at: Position::new(CId::new(0), 0), items: vec!['x', 'y', 'z'] }, + ], + ); + + let mut events = relational_linked_chunk.unordered_events(room_id); + + assert_eq!(*events.next().unwrap(), 'a'); + assert_eq!(*events.next().unwrap(), 'b'); + assert_eq!(*events.next().unwrap(), 'c'); + assert_eq!(*events.next().unwrap(), 'd'); + assert_eq!(*events.next().unwrap(), 'e'); + assert_eq!(*events.next().unwrap(), 'f'); + assert!(events.next().is_none()); + } } diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index c02ee8a94..83a69f2a0 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -14,7 +14,7 @@ //! A sqlite-based backend for the [`EventCacheStore`]. -use std::{borrow::Cow, fmt, path::Path, sync::Arc}; +use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc}; use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; @@ -33,8 +33,8 @@ use matrix_sdk_base::{ media::{MediaRequestParameters, UniqueKey}, }; use matrix_sdk_store_encryption::StoreCipher; -use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId}; -use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior}; +use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId}; +use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior}; use tokio::fs; use tracing::{debug, trace}; @@ -622,6 +622,69 @@ impl EventCacheStore for SqliteEventCacheStore { Ok(()) } + async fn filter_duplicated_events( + &self, + room_id: &RoomId, + events: Vec, + ) -> Result, Self::Error> { + // Select all events that exist in the store, i.e. the duplicates. + let room_id = room_id.to_owned(); + let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id); + + self.acquire() + .await? + .with_transaction(move |txn| -> Result<_> { + txn.chunk_large_query_over(events, None, move |txn, events| { + let query = format!( + "SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({})", + repeat_vars(events.len()), + ); + let parameters = params_from_iter( + // parameter for `room_id = ?` + once( + hashed_room_id + .to_sql() + // SAFETY: it cannot fail since `Key::to_sql` never fails + .unwrap(), + ) + // parameters for `event_id IN (…)` + .chain(events.iter().map(|event| { + event + .as_str() + .to_sql() + // SAFETY: it cannot fail since `str::to_sql` never fails + .unwrap() + })), + ); + + let mut duplicated_events = Vec::new(); + + for duplicated_event in txn + .prepare(&query)? + .query_map(parameters, |row| row.get::<_, Option>(0))? + { + let duplicated_event = duplicated_event?; + + let Some(duplicated_event) = duplicated_event else { + // Event ID is malformed, let's skip it. + continue; + }; + + let Ok(duplicated_event) = EventId::parse(duplicated_event) else { + // Normally unreachable, but the event ID has been stored even if it is + // malformed, let's skip it. + continue; + }; + + duplicated_events.push(duplicated_event); + } + + Ok(duplicated_events) + }) + }) + .await + } + async fn add_media_content( &self, request: &MediaRequestParameters,