feat(sdk): Use the in-memory and in-store deduplicator.

This patch replaces
`ThreadEventCacheStateLockWriteGuard::filter_duplicate_events` by
`deduplicator::filter_duplicate_events`. First off, this function is
shared between multiple event caches. Second, it deduplicates events
regarding in-memory and in-store state.

The calls to `ThreadEventCacheStateLockWriteGuard::remove_events` have
been updated to pass the duplicated in-store events.
This commit is contained in:
Ivan Enderlin
2026-03-19 09:13:06 +01:00
parent 0d765751eb
commit fd78d0c3fa
4 changed files with 54 additions and 69 deletions

View File

@@ -1082,6 +1082,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
let RoomEventCacheState {
room_id,
weak_room,
own_user_id,
store,
linked_chunk_update_sender,
threads,
@@ -1093,6 +1094,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
let thread_event_cache = ThreadEventCache::new(
room_id.clone(),
root_event_id,
own_user_id.clone(),
weak_room.clone(),
store.clone(),
linked_chunk_update_sender.clone(),

View File

@@ -20,7 +20,7 @@ mod state;
use std::{fmt, sync::Arc};
use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock};
use ruma::{EventId, OwnedEventId, OwnedRoomId};
use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId};
pub(super) use state::LockedThreadEventCacheState;
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::error;
@@ -59,6 +59,7 @@ impl ThreadEventCache {
pub async fn new(
room_id: OwnedRoomId,
thread_id: OwnedEventId,
own_user_id: OwnedUserId,
weak_room: WeakRoom,
store: EventCacheStoreLock,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
@@ -70,6 +71,7 @@ impl ThreadEventCache {
state: LockedThreadEventCacheState::new(
room_id,
thread_id,
own_user_id,
store,
linked_chunk_update_sender,
)

View File

@@ -18,15 +18,18 @@ use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{Event, Gap},
linked_chunk::ChunkContent,
linked_chunk::{ChunkContent, LinkedChunkId},
};
use ruma::api::Direction;
use tracing::trace;
use super::{
super::super::{
EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
caches::pagination::{
super::{
super::{
EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
},
pagination::{
BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
},
},
@@ -230,21 +233,31 @@ impl PaginatedCache for ThreadEventCacheWrapper {
let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
let new_gap = new_token.map(|token| Gap { token });
let deduplication = state.filter_duplicate_events(topo_ordered_events);
let DeduplicationOutcome {
all_events: events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates: all_duplicates,
} = filter_duplicate_events(
&state.state.own_user_id,
&state.store,
LinkedChunkId::Thread(&state.state.room_id, &state.state.thread_id),
state.thread_linked_chunk(),
topo_ordered_events,
)
.await?;
let (events, new_gap) = if deduplication.non_empty_all_duplicates {
let (events, new_gap) = if all_duplicates {
// If all events are duplicates, we don't need to do anything; ignore
// the new events and the new gap.
(Vec::new(), None)
} else {
assert!(
deduplication.in_store_duplicated_event_ids.is_empty(),
"persistent storage for threads is not implemented yet"
);
state.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?;
state
.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
.await?;
// Keep events and the gap.
(deduplication.all_events, new_gap)
(events, new_gap)
};
// Add the paginated events to the thread chunk.

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{
@@ -25,14 +23,14 @@ use matrix_sdk_base::{
},
};
use matrix_sdk_common::executor::spawn;
use ruma::{OwnedEventId, OwnedRoomId};
use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
use tokio::sync::broadcast::Sender;
use tracing::{debug, error, instrument};
use super::super::{
super::{
EventCacheError, EventsOrigin, Result,
deduplicator::DeduplicationOutcome,
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
persistence::{load_linked_chunk_metadata, send_updates_to_store},
},
TimelineVectorDiffs,
@@ -43,11 +41,14 @@ use super::super::{
pub struct ThreadEventCacheState {
/// The room owning this thread.
room_id: OwnedRoomId,
pub room_id: OwnedRoomId,
/// The ID of the thread root event, which is the first event in the thread
/// (and eventually the first in the linked chunk).
thread_id: OwnedEventId,
pub thread_id: OwnedEventId,
/// The user's own user id.
pub own_user_id: OwnedUserId,
/// Reference to the underlying backing store.
store: EventCacheStoreLock,
@@ -97,6 +98,7 @@ impl LockedThreadEventCacheState {
pub async fn new(
room_id: OwnedRoomId,
thread_id: OwnedEventId,
own_user_id: OwnedUserId,
store: EventCacheStoreLock,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Result<Self> {
@@ -159,6 +161,7 @@ impl LockedThreadEventCacheState {
Ok(Self::new_inner(ThreadEventCacheState {
room_id,
thread_id,
own_user_id,
store,
thread_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
linked_chunk,
@@ -276,22 +279,28 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
}
pub async fn handle_sync(&mut self, events: Vec<Event>) -> Result<Vec<VectorDiff<Event>>> {
let deduplication = self.filter_duplicate_events(events);
let DeduplicationOutcome {
all_events: events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates: all_duplicates,
} = filter_duplicate_events(
&self.state.own_user_id,
&self.store,
LinkedChunkId::Thread(&self.state.room_id, &self.state.thread_id),
&self.state.thread_linked_chunk,
events,
)
.await?;
if deduplication.non_empty_all_duplicates {
if all_duplicates {
// If all events are duplicates, we don't need to do anything; ignore
// the new events.
return Ok(Vec::new());
}
// Remove the duplicated events from the thread chunk.
self.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?;
assert!(
deduplication.in_store_duplicated_event_ids.is_empty(),
"persistent storage for threads is not implemented yet"
);
let events = deduplication.all_events;
self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;
self.state.thread_linked_chunk.push_live_events(None, &events);
@@ -423,45 +432,4 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
)
.await
}
/// Find duplicates in a thread, until there's persistent storage for
/// those.
///
/// TODO: when persistent storage is implemented for thread, only use
/// the regular `filter_duplicate_events` method.
pub fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
let mut new_event_ids = BTreeSet::new();
new_events.retain(|event| {
// Only keep events with IDs, and those for which `insert` returns `true`
// (meaning they were not in the set).
event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
});
let in_memory_duplicated_event_ids: Vec<_> = self
.state
.thread_linked_chunk
.events()
.filter_map(|(position, event)| {
let event_id = event.event_id()?;
new_event_ids.contains(&event_id).then_some((event_id, position))
})
.collect();
// Right now, there's no persistent storage for threads.
let in_store_duplicated_event_ids = Vec::new();
let at_least_one_event = !new_events.is_empty();
let all_duplicates = (in_memory_duplicated_event_ids.len()
+ in_store_duplicated_event_ids.len())
== new_events.len();
let non_empty_all_duplicates = at_least_one_event && all_duplicates;
DeduplicationOutcome {
all_events: new_events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates,
}
}
}