diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index f72732509..1b7511bb9 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -1058,6 +1058,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { weak_room, own_user_id, store, + update_sender, linked_chunk_update_sender, threads, .. @@ -1071,6 +1072,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { own_user_id.clone(), weak_room.clone(), store.clone(), + update_sender.generic_update_sender().clone(), linked_chunk_update_sender.clone(), ) .await?; diff --git a/crates/matrix-sdk/src/event_cache/caches/room/updates.rs b/crates/matrix-sdk/src/event_cache/caches/room/updates.rs index ed47e227d..e7b40b4dd 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/updates.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/updates.rs @@ -138,6 +138,11 @@ impl RoomEventCacheUpdateSender { } } + /// Get the generic update sender. + pub(super) fn generic_update_sender(&self) -> &Sender { + &self.generic_sender + } + /// Create a new [`Receiver`] of [`RoomEventCacheUpdate`]. pub(super) fn new_room_receiver(&self) -> Receiver { self.room_sender.subscribe() diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index 17e4bbd18..ee70f64e2 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -31,7 +31,9 @@ use tracing::{error, trace}; pub(super) use self::state::LockedThreadEventCacheState; use self::{pagination::ThreadPagination, updates::ThreadEventCacheUpdateSender}; use super::{ - super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate, + super::Result, + EventsOrigin, TimelineVectorDiffs, + room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate}, }; use crate::room::WeakRoom; @@ -42,6 +44,9 @@ pub(super) struct ThreadEventCache { /// The (non-cloneable) details of the `RoomEventCache`. struct ThreadEventCacheInner { + /// The room ID. + room_id: OwnedRoomId, + /// The thread root ID. thread_id: OwnedEventId, @@ -72,12 +77,14 @@ impl ThreadEventCache { own_user_id: OwnedUserId, weak_room: WeakRoom, store: EventCacheStoreLock, + generic_update_sender: Sender, linked_chunk_update_sender: Sender, ) -> Result { - let update_sender = ThreadEventCacheUpdateSender::new(); + let update_sender = ThreadEventCacheUpdateSender::new(generic_update_sender); Ok(Self { inner: Arc::new(ThreadEventCacheInner { + room_id: room_id.clone(), thread_id: thread_id.clone(), weak_room, state: LockedThreadEventCacheState::new( @@ -113,15 +120,17 @@ impl ThreadEventCache { ThreadPagination::new(self.inner.clone()) } - /// Clear a thread, after a gappy sync for instance. + /// Clear a thread. pub async fn clear(&mut self) -> Result<()> { let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?; if !updates_as_vector_diffs.is_empty() { - self.inner.update_sender.send(TimelineVectorDiffs { - diffs: updates_as_vector_diffs, - origin: EventsOrigin::Cache, - }); + self.inner.update_sender.send( + TimelineVectorDiffs { diffs: updates_as_vector_diffs, origin: EventsOrigin::Cache }, + // This function is part of the `RoomEventCache` flow. The generic update is + // handled by it. + None, + ); } Ok(()) @@ -150,10 +159,12 @@ impl ThreadEventCache { } if !timeline_event_diffs.is_empty() { - self.inner.update_sender.send(TimelineVectorDiffs { - diffs: timeline_event_diffs, - origin: EventsOrigin::Sync, - }); + self.inner.update_sender.send( + TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync }, + // This function is part of the `RoomEventCache` flow. The generic update is + // handled by it. + None, + ); } Ok(()) @@ -180,10 +191,12 @@ impl ThreadEventCache { let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs(); if !timeline_event_diffs.is_empty() { - self.inner.update_sender.send(TimelineVectorDiffs { - diffs: timeline_event_diffs, - origin: EventsOrigin::Sync, - }); + self.inner.update_sender.send( + TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync }, + // This function is part of the `RoomEventCache` flow. The generic update is + // handled by it. + None, + ); } Ok(()) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs b/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs index 20b7ea3ad..761345b50 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs @@ -31,14 +31,13 @@ use super::{ }, pagination::{ BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination, + SharedPaginationStatus, }, + room::RoomEventCacheGenericUpdate, }, ThreadEventCacheInner, }; -use crate::{ - event_cache::caches::pagination::SharedPaginationStatus, - room::{IncludeRelations, RelationsOptions}, -}; +use crate::room::{IncludeRelations, RelationsOptions}; /// Intermediate type because the `ThreadEventCache` state doesn't provide all /// the feature for the moment. @@ -261,10 +260,10 @@ impl PaginatedCache for ThreadEventCacheWrapper { reached_start: bool, ) -> BackPaginationOutcome { if !timeline_event_diffs.is_empty() { - self.cache.update_sender.send(TimelineVectorDiffs { - diffs: timeline_event_diffs, - origin: EventsOrigin::Cache, - }); + self.cache.update_sender.send( + TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Cache }, + Some(RoomEventCacheGenericUpdate { room_id: self.cache.room_id.clone() }), + ); } BackPaginationOutcome { @@ -383,10 +382,13 @@ impl PaginatedCache for ThreadEventCacheWrapper { let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs(); if !timeline_event_diffs.is_empty() { - self.cache.update_sender.send(TimelineVectorDiffs { - diffs: timeline_event_diffs, - origin: EventsOrigin::Pagination, - }); + self.cache.update_sender.send( + TimelineVectorDiffs { + diffs: timeline_event_diffs, + origin: EventsOrigin::Pagination, + }, + Some(RoomEventCacheGenericUpdate { room_id: state.state.room_id.clone() }), + ); } Ok(Some(BackPaginationOutcome { reached_start, events })) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs index 05a1cefed..e2f549fa3 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/state.rs @@ -37,7 +37,7 @@ use super::{ EventLocation, TimelineVectorDiffs, event_linked_chunk::{EventLinkedChunk, sort_positions_descending}, lock, - room::RoomEventCacheLinkedChunkUpdate, + room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate}, }, ThreadEventCacheUpdateSender, }; @@ -201,9 +201,10 @@ impl<'a> lock::Reload for ThreadEventCacheStateLockWriteGuard<'a> { let diffs = self.state.thread_linked_chunk.updates_as_vector_diffs(); if !diffs.is_empty() { - self.state - .update_sender - .send(TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache }); + self.state.update_sender.send( + TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache }, + Some(RoomEventCacheGenericUpdate { room_id: self.room_id.to_owned() }), + ); } Ok(()) diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/updates.rs b/crates/matrix-sdk/src/event_cache/caches/thread/updates.rs index f042ba51a..e9b3378df 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/updates.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/updates.rs @@ -14,23 +14,32 @@ use tokio::sync::broadcast::{Receiver, Sender}; -use crate::event_cache::TimelineVectorDiffs; +use super::super::{super::RoomEventCacheGenericUpdate, TimelineVectorDiffs}; /// A small type to send updates in all channels. #[derive(Clone)] pub struct ThreadEventCacheUpdateSender { thread_sender: Sender, + generic_sender: Sender, } impl ThreadEventCacheUpdateSender { /// Create a new [`ThreadEventCacheUpdateSender`]. - pub fn new() -> Self { - Self { thread_sender: Sender::new(32) } + pub fn new(generic_sender: Sender) -> Self { + Self { thread_sender: Sender::new(32), generic_sender } } /// Send a [`TimelineVectorDiffs`]. - pub fn send(&self, thread_update: TimelineVectorDiffs) { + pub fn send( + &self, + thread_update: TimelineVectorDiffs, + generic_update: Option, + ) { let _ = self.thread_sender.send(thread_update); + + if let Some(generic_update) = generic_update { + let _ = self.generic_sender.send(generic_update); + } } /// Create a new [`Receiver`] of [`TimelineVectorDiffs`].