feat(ui): subscribe to room event cache for live thread updates

When ThreadListService is created it now immediately spawns a background
task (via the client's TaskMonitor) that subscribes to the room event
cache and listens for RoomEventCacheUpdate::UpdateTimelineEvents.

For every incoming event that carries an m.thread relation pointing to a
root we are already tracking, the service rebuilds a ThreadListItemEvent via the
existing build_latest_event helper and replaces latest_event on the matching
ThreadListItem and increments num_replies by 1
This commit is contained in:
Stefan Ceriu
2026-03-23 09:22:43 +02:00
committed by Stefan Ceriu
parent fe45ba5cc2
commit 1dd413e4bc
2 changed files with 161 additions and 14 deletions

View File

@@ -1248,7 +1248,7 @@ impl Room {
/// [`ThreadListService::subscribe_to_items_updates`] /
/// [`ThreadListService::subscribe_to_pagination_state_updates`] to observe
/// changes.
pub fn thread_list_service(&self) -> Arc<ThreadListService> {
pub async fn thread_list_service(&self) -> Arc<ThreadListService> {
Arc::new(ThreadListService::new(&self.inner))
}

View File

@@ -15,13 +15,19 @@
use std::sync::Arc;
use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber};
use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
use eyeball_im::{ObservableVector, VectorDiff, VectorSubscriberBatchedStream};
use futures_util::future::join_all;
use imbl::Vector;
use matrix_sdk::{
Result, Room, deserialized_responses::TimelineEvent, locks::Mutex, paginators::PaginationToken,
Result, Room,
deserialized_responses::TimelineEvent,
event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate},
locks::Mutex,
paginators::PaginationToken,
room::ListThreadsOptions,
task_monitor::BackgroundTaskHandle,
};
use matrix_sdk_common::serde_helpers::extract_thread_root;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId,
events::{
@@ -29,7 +35,7 @@ use ruma::{
},
};
use tokio::sync::Mutex as AsyncMutex;
use tracing::error;
use tracing::{error, trace, warn};
use crate::timeline::{
Profile, TimelineDetails, TimelineItemContent,
@@ -120,6 +126,12 @@ pub enum ThreadListServiceError {
/// [`ThreadListItem`]s. It exposes methods to paginate forward through the
/// thread list as well as subscribe to state changes.
///
/// When created, the service automatically starts a background task that
/// listens to room event cache updates (from `/sync` and other sources).
/// Whenever a new event belonging to a known thread arrives, the service
/// updates that thread's `latest_event` and `num_replies` fields in real time,
/// emitting observable diffs to all subscribers.
///
/// # Example
///
/// ```no_run
@@ -154,18 +166,68 @@ pub struct ThreadListService {
/// The current list of thread items.
items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
/// Handle to the background task listening for event cache updates.
/// Dropping this aborts the task.
_event_cache_task: BackgroundTaskHandle,
}
impl ThreadListService {
/// Creates a new [`ThreadListService`] for the given room.
///
/// This immediately spawns a background task that listens to the room's
/// event cache for live updates. The task self-bootstraps by performing
/// the async event cache subscription internally.
pub fn new(room: Room) -> Self {
let items: Arc<Mutex<ObservableVector<ThreadListItem>>> =
Arc::new(Mutex::new(ObservableVector::new()));
// Eagerly subscribe the event cache to sync responses (this is a cheap,
// synchronous, idempotent call).
if let Err(e) = room.client().event_cache().subscribe() {
warn!("ThreadListService: failed to subscribe event cache to sync: {e}");
}
let event_cache_task = room
.client()
.task_monitor()
.spawn_background_task("thread_list_service::event_cache_listener", {
let room = room.clone();
let items = items.clone();
async move {
// Obtain the room event cache and a subscriber.
let (_event_cache_drop, mut subscriber) = match async {
let (room_event_cache, drop_handles) = room.event_cache().await?;
let (_, subscriber) = room_event_cache.subscribe().await?;
matrix_sdk::event_cache::Result::Ok((drop_handles, subscriber))
}
.await
{
Ok(pair) => pair,
Err(e) => {
error!(
"ThreadListService: failed to subscribe to room event cache, \
live updates will not work: {e}"
);
return;
}
};
trace!("ThreadListService: event cache listener started");
Self::event_cache_listener_loop(&room, &mut subscriber, items).await;
}
})
.abort_on_drop();
Self {
room,
token: AsyncMutex::new(PaginationToken::None),
pagination_state: SharedObservable::new(ThreadListPaginationState::Idle {
end_reached: false,
}),
items: Arc::new(Mutex::new(ObservableVector::new())),
items,
_event_cache_task: event_cache_task,
}
}
@@ -273,8 +335,8 @@ impl ThreadListService {
let list_items = join_all(
thread_roots
.chunk
.iter()
.map(|timeline_event| self.build_thread_list_item(timeline_event.clone()))
.into_iter()
.map(|timeline_event| Self::build_thread_list_item(&self.room, timeline_event))
.collect::<Vec<_>>(),
)
.await
@@ -285,8 +347,8 @@ impl ThreadListService {
Ok(ThreadList { items: list_items, prev_batch_token: thread_roots.prev_batch_token })
}
pub(super) async fn build_thread_list_item(
&self,
async fn build_thread_list_item(
room: &Room,
timeline_event: TimelineEvent,
) -> Option<ThreadListItem> {
// Extract thread summary info before consuming the event.
@@ -302,10 +364,9 @@ impl ThreadListService {
let root_event_id = any_sync_timeline_event.event_id().to_owned();
let timestamp = any_sync_timeline_event.origin_server_ts();
let sender = any_sync_timeline_event.sender().to_owned();
let is_own = self.room.own_user_id() == sender;
let is_own = room.own_user_id() == sender;
let profile = self
.room
let profile = room
.profile_from_user_id(&sender)
.await
.map(TimelineDetails::Ready)
@@ -314,7 +375,7 @@ impl ThreadListService {
let content: Option<TimelineItemContent> = match TimelineAction::from_event(
any_sync_timeline_event,
&raw_any_sync_timeline_event,
&self.room,
room,
None,
None,
None,
@@ -346,7 +407,7 @@ impl ThreadListService {
let num_replies = thread_summary.as_ref().map(|s| s.num_replies).unwrap_or(0);
let latest_event = if let Some(ev) = bundled_latest_thread_event.map(|b| *b) {
Self::build_latest_event(&self.room, ev).await
Self::build_latest_event(room, ev).await
} else {
None
};
@@ -423,6 +484,92 @@ impl ThreadListService {
Some(ThreadListItemEvent { event_id, timestamp, sender, is_own, sender_profile, content })
}
/// The main loop of the event-cache listener task.
///
/// Listens for [`RoomEventCacheUpdate`]s and, for each new timeline event
/// that belongs to a thread we are tracking, updates the corresponding
/// [`ThreadListItem`]'s `latest_event` and `num_replies`.
async fn event_cache_listener_loop(
room: &Room,
subscriber: &mut RoomEventCacheSubscriber,
items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
) {
use tokio::sync::broadcast::error::RecvError;
loop {
let update = match subscriber.recv().await {
Ok(update) => update,
Err(RecvError::Closed) => {
error!("ThreadListService: event cache channel closed, stopping listener");
break;
}
Err(RecvError::Lagged(n)) => {
warn!("ThreadListService: lagged behind {n} event cache updates");
continue;
}
};
if let RoomEventCacheUpdate::UpdateTimelineEvents(timeline_diffs) = update {
let new_events = Self::collect_events_from_diffs(timeline_diffs.diffs);
for event in new_events {
// Check if this event has a thread relation pointing to a known root.
let Some(thread_root) = extract_thread_root(event.raw()) else { continue };
// Find the position of this thread root in our list.
let position = {
let guard = items.lock();
guard.iter().position(|item| item.root_event.event_id == thread_root)
};
if let Some(index) = position {
// Build the latest event representation from the raw event.
if let Some(latest_event) = Self::build_latest_event(room, event).await {
let mut guard = items.lock();
// Re-check the position — the vector may have changed while
// we were awaiting the profile lookup above.
if index < guard.len()
&& guard[index].root_event.event_id == thread_root
{
let mut updated = guard[index].clone();
updated.latest_event = Some(latest_event);
updated.num_replies = updated.num_replies.saturating_add(1);
guard.set(index, updated);
}
}
}
}
}
}
}
/// Extracts all events from a list of [`VectorDiff`]s.
fn collect_events_from_diffs(
diffs: Vec<VectorDiff<matrix_sdk_base::event_cache::Event>>,
) -> Vec<matrix_sdk_base::event_cache::Event> {
let mut events = Vec::new();
for diff in diffs {
match diff {
VectorDiff::Append { values } => events.extend(values),
VectorDiff::PushBack { value }
| VectorDiff::PushFront { value }
| VectorDiff::Insert { value, .. }
| VectorDiff::Set { value, .. } => events.push(value),
VectorDiff::Reset { values } => events.extend(values),
// These diffs don't carry new events.
VectorDiff::Clear
| VectorDiff::PopBack
| VectorDiff::PopFront
| VectorDiff::Remove { .. }
| VectorDiff::Truncate { .. } => {}
}
}
events
}
}
/// A structure wrapping a Thread List endpoint response i.e.