feat(sdk): ThreadEventCacheUpdateSender can send RoomEventCacheGenericUpdate.

This patch updates `ThreadEventCacheUpdateSender` to send
`RoomEventCacheGenericUpdate`. This is useful for `ThreadPagination`.
This commit is contained in:
Ivan Enderlin
2026-04-17 16:02:02 +02:00
parent 63a30a2ea9
commit 1693e0f30d
6 changed files with 67 additions and 35 deletions

View File

@@ -1058,6 +1058,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
weak_room,
own_user_id,
store,
update_sender,
linked_chunk_update_sender,
threads,
..
@@ -1071,6 +1072,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
own_user_id.clone(),
weak_room.clone(),
store.clone(),
update_sender.generic_update_sender().clone(),
linked_chunk_update_sender.clone(),
)
.await?;

View File

@@ -138,6 +138,11 @@ impl RoomEventCacheUpdateSender {
}
}
/// Get the generic update sender.
pub(super) fn generic_update_sender(&self) -> &Sender<RoomEventCacheGenericUpdate> {
&self.generic_sender
}
/// Create a new [`Receiver`] of [`RoomEventCacheUpdate`].
pub(super) fn new_room_receiver(&self) -> Receiver<RoomEventCacheUpdate> {
self.room_sender.subscribe()

View File

@@ -31,7 +31,9 @@ use tracing::{error, trace};
pub(super) use self::state::LockedThreadEventCacheState;
use self::{pagination::ThreadPagination, updates::ThreadEventCacheUpdateSender};
use super::{
super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate,
super::Result,
EventsOrigin, TimelineVectorDiffs,
room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
};
use crate::room::WeakRoom;
@@ -42,6 +44,9 @@ pub(super) struct ThreadEventCache {
/// The (non-cloneable) details of the `RoomEventCache`.
struct ThreadEventCacheInner {
/// The room ID.
room_id: OwnedRoomId,
/// The thread root ID.
thread_id: OwnedEventId,
@@ -72,12 +77,14 @@ impl ThreadEventCache {
own_user_id: OwnedUserId,
weak_room: WeakRoom,
store: EventCacheStoreLock,
generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Result<Self> {
let update_sender = ThreadEventCacheUpdateSender::new();
let update_sender = ThreadEventCacheUpdateSender::new(generic_update_sender);
Ok(Self {
inner: Arc::new(ThreadEventCacheInner {
room_id: room_id.clone(),
thread_id: thread_id.clone(),
weak_room,
state: LockedThreadEventCacheState::new(
@@ -113,15 +120,17 @@ impl ThreadEventCache {
ThreadPagination::new(self.inner.clone())
}
/// Clear a thread, after a gappy sync for instance.
/// Clear a thread.
pub async fn clear(&mut self) -> Result<()> {
let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
if !updates_as_vector_diffs.is_empty() {
self.inner.update_sender.send(TimelineVectorDiffs {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});
self.inner.update_sender.send(
TimelineVectorDiffs { diffs: updates_as_vector_diffs, origin: EventsOrigin::Cache },
// This function is part of the `RoomEventCache` flow. The generic update is
// handled by it.
None,
);
}
Ok(())
@@ -150,10 +159,12 @@ impl ThreadEventCache {
}
if !timeline_event_diffs.is_empty() {
self.inner.update_sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Sync,
});
self.inner.update_sender.send(
TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync },
// This function is part of the `RoomEventCache` flow. The generic update is
// handled by it.
None,
);
}
Ok(())
@@ -180,10 +191,12 @@ impl ThreadEventCache {
let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
if !timeline_event_diffs.is_empty() {
self.inner.update_sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Sync,
});
self.inner.update_sender.send(
TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Sync },
// This function is part of the `RoomEventCache` flow. The generic update is
// handled by it.
None,
);
}
Ok(())

View File

@@ -31,14 +31,13 @@ use super::{
},
pagination::{
BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
SharedPaginationStatus,
},
room::RoomEventCacheGenericUpdate,
},
ThreadEventCacheInner,
};
use crate::{
event_cache::caches::pagination::SharedPaginationStatus,
room::{IncludeRelations, RelationsOptions},
};
use crate::room::{IncludeRelations, RelationsOptions};
/// Intermediate type because the `ThreadEventCache` state doesn't provide all
/// the feature for the moment.
@@ -261,10 +260,10 @@ impl PaginatedCache for ThreadEventCacheWrapper {
reached_start: bool,
) -> BackPaginationOutcome {
if !timeline_event_diffs.is_empty() {
self.cache.update_sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Cache,
});
self.cache.update_sender.send(
TimelineVectorDiffs { diffs: timeline_event_diffs, origin: EventsOrigin::Cache },
Some(RoomEventCacheGenericUpdate { room_id: self.cache.room_id.clone() }),
);
}
BackPaginationOutcome {
@@ -383,10 +382,13 @@ impl PaginatedCache for ThreadEventCacheWrapper {
let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
if !timeline_event_diffs.is_empty() {
self.cache.update_sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Pagination,
});
self.cache.update_sender.send(
TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Pagination,
},
Some(RoomEventCacheGenericUpdate { room_id: state.state.room_id.clone() }),
);
}
Ok(Some(BackPaginationOutcome { reached_start, events }))

View File

@@ -37,7 +37,7 @@ use super::{
EventLocation, TimelineVectorDiffs,
event_linked_chunk::{EventLinkedChunk, sort_positions_descending},
lock,
room::RoomEventCacheLinkedChunkUpdate,
room::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
},
ThreadEventCacheUpdateSender,
};
@@ -201,9 +201,10 @@ impl<'a> lock::Reload for ThreadEventCacheStateLockWriteGuard<'a> {
let diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
self.state
.update_sender
.send(TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache });
self.state.update_sender.send(
TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache },
Some(RoomEventCacheGenericUpdate { room_id: self.room_id.to_owned() }),
);
}
Ok(())

View File

@@ -14,23 +14,32 @@
use tokio::sync::broadcast::{Receiver, Sender};
use crate::event_cache::TimelineVectorDiffs;
use super::super::{super::RoomEventCacheGenericUpdate, TimelineVectorDiffs};
/// A small type to send updates in all channels.
#[derive(Clone)]
pub struct ThreadEventCacheUpdateSender {
thread_sender: Sender<TimelineVectorDiffs>,
generic_sender: Sender<RoomEventCacheGenericUpdate>,
}
impl ThreadEventCacheUpdateSender {
/// Create a new [`ThreadEventCacheUpdateSender`].
pub fn new() -> Self {
Self { thread_sender: Sender::new(32) }
pub fn new(generic_sender: Sender<RoomEventCacheGenericUpdate>) -> Self {
Self { thread_sender: Sender::new(32), generic_sender }
}
/// Send a [`TimelineVectorDiffs`].
pub fn send(&self, thread_update: TimelineVectorDiffs) {
pub fn send(
&self,
thread_update: TimelineVectorDiffs,
generic_update: Option<RoomEventCacheGenericUpdate>,
) {
let _ = self.thread_sender.send(thread_update);
if let Some(generic_update) = generic_update {
let _ = self.generic_sender.send(generic_update);
}
}
/// Create a new [`Receiver`] of [`TimelineVectorDiffs`].