diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 5892a1b55..23e9c148e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -1082,6 +1082,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let RoomEventCacheState { room_id, weak_room, + own_user_id, store, linked_chunk_update_sender, threads, @@ -1093,6 +1094,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { let thread_event_cache = ThreadEventCache::new( room_id.clone(), root_event_id, + own_user_id.clone(), weak_room.clone(), store.clone(), linked_chunk_update_sender.clone(), diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index f7a80fa7a..426aea03b 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -20,7 +20,7 @@ mod state; use std::{fmt, sync::Arc}; use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock}; -use ruma::{EventId, OwnedEventId, OwnedRoomId}; +use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; pub(super) use state::LockedThreadEventCacheState; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::error; @@ -59,6 +59,7 @@ impl ThreadEventCache { pub async fn new( room_id: OwnedRoomId, thread_id: OwnedEventId, + own_user_id: OwnedUserId, weak_room: WeakRoom, store: EventCacheStoreLock, linked_chunk_update_sender: Sender, @@ -70,6 +71,7 @@ impl ThreadEventCache { state: LockedThreadEventCacheState::new( room_id, thread_id, + own_user_id, store, linked_chunk_update_sender, ) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs b/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs index aa950b99e..2ee5e6404 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs @@ -18,15 +18,18 @@ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ event_cache::{Event, Gap}, - linked_chunk::ChunkContent, + linked_chunk::{ChunkContent, LinkedChunkId}, }; use ruma::api::Direction; use tracing::trace; use super::{ - super::super::{ - EventCacheError, EventsOrigin, Result, TimelineVectorDiffs, - caches::pagination::{ + super::{ + super::{ + EventCacheError, EventsOrigin, Result, TimelineVectorDiffs, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + }, + pagination::{ BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination, }, }, @@ -230,21 +233,31 @@ impl PaginatedCache for ThreadEventCacheWrapper { let topo_ordered_events = events.iter().cloned().rev().collect::>(); let new_gap = new_token.map(|token| Gap { token }); - let deduplication = state.filter_duplicate_events(topo_ordered_events); + let DeduplicationOutcome { + all_events: events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &state.state.own_user_id, + &state.store, + LinkedChunkId::Thread(&state.state.room_id, &state.state.thread_id), + state.thread_linked_chunk(), + topo_ordered_events, + ) + .await?; - let (events, new_gap) = if deduplication.non_empty_all_duplicates { + let (events, new_gap) = if all_duplicates { // If all events are duplicates, we don't need to do anything; ignore // the new events and the new gap. (Vec::new(), None) } else { - assert!( - deduplication.in_store_duplicated_event_ids.is_empty(), - "persistent storage for threads is not implemented yet" - ); - state.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?; + state + .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) + .await?; // Keep events and the gap. - (deduplication.all_events, new_gap) + (events, new_gap) }; // Add the paginated events to the thread chunk. diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index b545059cc..af2fbfa9e 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeSet; - use eyeball_im::VectorDiff; use matrix_sdk_base::{ event_cache::{ @@ -25,14 +23,14 @@ use matrix_sdk_base::{ }, }; use matrix_sdk_common::executor::spawn; -use ruma::{OwnedEventId, OwnedRoomId}; +use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, instrument}; use super::super::{ super::{ EventCacheError, EventsOrigin, Result, - deduplicator::DeduplicationOutcome, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, persistence::{load_linked_chunk_metadata, send_updates_to_store}, }, TimelineVectorDiffs, @@ -43,11 +41,14 @@ use super::super::{ pub struct ThreadEventCacheState { /// The room owning this thread. - room_id: OwnedRoomId, + pub room_id: OwnedRoomId, /// The ID of the thread root event, which is the first event in the thread /// (and eventually the first in the linked chunk). - thread_id: OwnedEventId, + pub thread_id: OwnedEventId, + + /// The user's own user id. + pub own_user_id: OwnedUserId, /// Reference to the underlying backing store. store: EventCacheStoreLock, @@ -97,6 +98,7 @@ impl LockedThreadEventCacheState { pub async fn new( room_id: OwnedRoomId, thread_id: OwnedEventId, + own_user_id: OwnedUserId, store: EventCacheStoreLock, linked_chunk_update_sender: Sender, ) -> Result { @@ -159,6 +161,7 @@ impl LockedThreadEventCacheState { Ok(Self::new_inner(ThreadEventCacheState { room_id, thread_id, + own_user_id, store, thread_linked_chunk: EventLinkedChunk::with_initial_linked_chunk( linked_chunk, @@ -276,22 +279,28 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { } pub async fn handle_sync(&mut self, events: Vec) -> Result>> { - let deduplication = self.filter_duplicate_events(events); + let DeduplicationOutcome { + all_events: events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &self.state.own_user_id, + &self.store, + LinkedChunkId::Thread(&self.state.room_id, &self.state.thread_id), + &self.state.thread_linked_chunk, + events, + ) + .await?; - if deduplication.non_empty_all_duplicates { + if all_duplicates { // If all events are duplicates, we don't need to do anything; ignore // the new events. return Ok(Vec::new()); } // Remove the duplicated events from the thread chunk. - self.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?; - assert!( - deduplication.in_store_duplicated_event_ids.is_empty(), - "persistent storage for threads is not implemented yet" - ); - - let events = deduplication.all_events; + self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?; self.state.thread_linked_chunk.push_live_events(None, &events); @@ -423,45 +432,4 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> { ) .await } - - /// Find duplicates in a thread, until there's persistent storage for - /// those. - /// - /// TODO: when persistent storage is implemented for thread, only use - /// the regular `filter_duplicate_events` method. - pub fn filter_duplicate_events(&self, mut new_events: Vec) -> DeduplicationOutcome { - let mut new_event_ids = BTreeSet::new(); - - new_events.retain(|event| { - // Only keep events with IDs, and those for which `insert` returns `true` - // (meaning they were not in the set). - event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id)) - }); - - let in_memory_duplicated_event_ids: Vec<_> = self - .state - .thread_linked_chunk - .events() - .filter_map(|(position, event)| { - let event_id = event.event_id()?; - new_event_ids.contains(&event_id).then_some((event_id, position)) - }) - .collect(); - - // Right now, there's no persistent storage for threads. - let in_store_duplicated_event_ids = Vec::new(); - - let at_least_one_event = !new_events.is_empty(); - let all_duplicates = (in_memory_duplicated_event_ids.len() - + in_store_duplicated_event_ids.len()) - == new_events.len(); - let non_empty_all_duplicates = at_least_one_event && all_duplicates; - - DeduplicationOutcome { - all_events: new_events, - in_memory_duplicated_event_ids, - in_store_duplicated_event_ids, - non_empty_all_duplicates, - } - } }