From 1dd413e4bc92c7f5fb8a6b1fbc7032219492efca Mon Sep 17 00:00:00 2001 From: Stefan Ceriu Date: Mon, 23 Mar 2026 09:22:43 +0200 Subject: [PATCH] feat(ui): subscribe to room event cache for live thread updates When ThreadListService is created it now immediately spawns a background task (via the client's TaskMonitor) that subscribes to the room event cache and listens for RoomEventCacheUpdate::UpdateTimelineEvents. For every incoming event that carries an m.thread relation pointing to a root we are already tracking, the service rebuilds a ThreadListItemEvent via the existing build_latest_event helper and replaces latest_event on the matching ThreadListItem and increments num_replies by 1 --- bindings/matrix-sdk-ffi/src/room/mod.rs | 2 +- .../src/timeline/thread_list_service.rs | 173 ++++++++++++++++-- 2 files changed, 161 insertions(+), 14 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room/mod.rs b/bindings/matrix-sdk-ffi/src/room/mod.rs index 18ba0f036..0b9eac610 100644 --- a/bindings/matrix-sdk-ffi/src/room/mod.rs +++ b/bindings/matrix-sdk-ffi/src/room/mod.rs @@ -1248,7 +1248,7 @@ impl Room { /// [`ThreadListService::subscribe_to_items_updates`] / /// [`ThreadListService::subscribe_to_pagination_state_updates`] to observe /// changes. - pub fn thread_list_service(&self) -> Arc { + pub async fn thread_list_service(&self) -> Arc { Arc::new(ThreadListService::new(&self.inner)) } diff --git a/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs b/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs index 642854768..5da6ae744 100644 --- a/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs +++ b/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs @@ -15,13 +15,19 @@ use std::sync::Arc; use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber}; -use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream}; +use eyeball_im::{ObservableVector, VectorDiff, VectorSubscriberBatchedStream}; use futures_util::future::join_all; use imbl::Vector; use matrix_sdk::{ - Result, Room, deserialized_responses::TimelineEvent, locks::Mutex, paginators::PaginationToken, + Result, Room, + deserialized_responses::TimelineEvent, + event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate}, + locks::Mutex, + paginators::PaginationToken, room::ListThreadsOptions, + task_monitor::BackgroundTaskHandle, }; +use matrix_sdk_common::serde_helpers::extract_thread_root; use ruma::{ MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, events::{ @@ -29,7 +35,7 @@ use ruma::{ }, }; use tokio::sync::Mutex as AsyncMutex; -use tracing::error; +use tracing::{error, trace, warn}; use crate::timeline::{ Profile, TimelineDetails, TimelineItemContent, @@ -120,6 +126,12 @@ pub enum ThreadListServiceError { /// [`ThreadListItem`]s. It exposes methods to paginate forward through the /// thread list as well as subscribe to state changes. /// +/// When created, the service automatically starts a background task that +/// listens to room event cache updates (from `/sync` and other sources). +/// Whenever a new event belonging to a known thread arrives, the service +/// updates that thread's `latest_event` and `num_replies` fields in real time, +/// emitting observable diffs to all subscribers. +/// /// # Example /// /// ```no_run @@ -154,18 +166,68 @@ pub struct ThreadListService { /// The current list of thread items. items: Arc>>, + + /// Handle to the background task listening for event cache updates. + /// Dropping this aborts the task. + _event_cache_task: BackgroundTaskHandle, } impl ThreadListService { /// Creates a new [`ThreadListService`] for the given room. + /// + /// This immediately spawns a background task that listens to the room's + /// event cache for live updates. The task self-bootstraps by performing + /// the async event cache subscription internally. pub fn new(room: Room) -> Self { + let items: Arc>> = + Arc::new(Mutex::new(ObservableVector::new())); + + // Eagerly subscribe the event cache to sync responses (this is a cheap, + // synchronous, idempotent call). + if let Err(e) = room.client().event_cache().subscribe() { + warn!("ThreadListService: failed to subscribe event cache to sync: {e}"); + } + + let event_cache_task = room + .client() + .task_monitor() + .spawn_background_task("thread_list_service::event_cache_listener", { + let room = room.clone(); + let items = items.clone(); + async move { + // Obtain the room event cache and a subscriber. + let (_event_cache_drop, mut subscriber) = match async { + let (room_event_cache, drop_handles) = room.event_cache().await?; + let (_, subscriber) = room_event_cache.subscribe().await?; + matrix_sdk::event_cache::Result::Ok((drop_handles, subscriber)) + } + .await + { + Ok(pair) => pair, + Err(e) => { + error!( + "ThreadListService: failed to subscribe to room event cache, \ + live updates will not work: {e}" + ); + return; + } + }; + + trace!("ThreadListService: event cache listener started"); + + Self::event_cache_listener_loop(&room, &mut subscriber, items).await; + } + }) + .abort_on_drop(); + Self { room, token: AsyncMutex::new(PaginationToken::None), pagination_state: SharedObservable::new(ThreadListPaginationState::Idle { end_reached: false, }), - items: Arc::new(Mutex::new(ObservableVector::new())), + items, + _event_cache_task: event_cache_task, } } @@ -273,8 +335,8 @@ impl ThreadListService { let list_items = join_all( thread_roots .chunk - .iter() - .map(|timeline_event| self.build_thread_list_item(timeline_event.clone())) + .into_iter() + .map(|timeline_event| Self::build_thread_list_item(&self.room, timeline_event)) .collect::>(), ) .await @@ -285,8 +347,8 @@ impl ThreadListService { Ok(ThreadList { items: list_items, prev_batch_token: thread_roots.prev_batch_token }) } - pub(super) async fn build_thread_list_item( - &self, + async fn build_thread_list_item( + room: &Room, timeline_event: TimelineEvent, ) -> Option { // Extract thread summary info before consuming the event. @@ -302,10 +364,9 @@ impl ThreadListService { let root_event_id = any_sync_timeline_event.event_id().to_owned(); let timestamp = any_sync_timeline_event.origin_server_ts(); let sender = any_sync_timeline_event.sender().to_owned(); - let is_own = self.room.own_user_id() == sender; + let is_own = room.own_user_id() == sender; - let profile = self - .room + let profile = room .profile_from_user_id(&sender) .await .map(TimelineDetails::Ready) @@ -314,7 +375,7 @@ impl ThreadListService { let content: Option = match TimelineAction::from_event( any_sync_timeline_event, &raw_any_sync_timeline_event, - &self.room, + room, None, None, None, @@ -346,7 +407,7 @@ impl ThreadListService { let num_replies = thread_summary.as_ref().map(|s| s.num_replies).unwrap_or(0); let latest_event = if let Some(ev) = bundled_latest_thread_event.map(|b| *b) { - Self::build_latest_event(&self.room, ev).await + Self::build_latest_event(room, ev).await } else { None }; @@ -423,6 +484,92 @@ impl ThreadListService { Some(ThreadListItemEvent { event_id, timestamp, sender, is_own, sender_profile, content }) } + + /// The main loop of the event-cache listener task. + /// + /// Listens for [`RoomEventCacheUpdate`]s and, for each new timeline event + /// that belongs to a thread we are tracking, updates the corresponding + /// [`ThreadListItem`]'s `latest_event` and `num_replies`. + async fn event_cache_listener_loop( + room: &Room, + subscriber: &mut RoomEventCacheSubscriber, + items: Arc>>, + ) { + use tokio::sync::broadcast::error::RecvError; + + loop { + let update = match subscriber.recv().await { + Ok(update) => update, + Err(RecvError::Closed) => { + error!("ThreadListService: event cache channel closed, stopping listener"); + break; + } + Err(RecvError::Lagged(n)) => { + warn!("ThreadListService: lagged behind {n} event cache updates"); + continue; + } + }; + + if let RoomEventCacheUpdate::UpdateTimelineEvents(timeline_diffs) = update { + let new_events = Self::collect_events_from_diffs(timeline_diffs.diffs); + + for event in new_events { + // Check if this event has a thread relation pointing to a known root. + let Some(thread_root) = extract_thread_root(event.raw()) else { continue }; + + // Find the position of this thread root in our list. + let position = { + let guard = items.lock(); + guard.iter().position(|item| item.root_event.event_id == thread_root) + }; + + if let Some(index) = position { + // Build the latest event representation from the raw event. + if let Some(latest_event) = Self::build_latest_event(room, event).await { + let mut guard = items.lock(); + + // Re-check the position — the vector may have changed while + // we were awaiting the profile lookup above. + if index < guard.len() + && guard[index].root_event.event_id == thread_root + { + let mut updated = guard[index].clone(); + updated.latest_event = Some(latest_event); + updated.num_replies = updated.num_replies.saturating_add(1); + guard.set(index, updated); + } + } + } + } + } + } + } + + /// Extracts all events from a list of [`VectorDiff`]s. + fn collect_events_from_diffs( + diffs: Vec>, + ) -> Vec { + let mut events = Vec::new(); + + for diff in diffs { + match diff { + VectorDiff::Append { values } => events.extend(values), + VectorDiff::PushBack { value } + | VectorDiff::PushFront { value } + | VectorDiff::Insert { value, .. } + | VectorDiff::Set { value, .. } => events.push(value), + VectorDiff::Reset { values } => events.extend(values), + // These diffs don't carry new events. + VectorDiff::Clear + | VectorDiff::PopBack + | VectorDiff::PopFront + | VectorDiff::Remove { .. } + | VectorDiff::Truncate { .. } => {} + } + } + + events + } } /// A structure wrapping a Thread List endpoint response i.e.