From b6edf826b00a05ad411462e07cf6c2128286fb67 Mon Sep 17 00:00:00 2001 From: Stefan Ceriu Date: Tue, 17 Mar 2026 17:16:32 +0200 Subject: [PATCH] feat(ui): introduce a new ThreadListService `ThreadListService` is the FFI-facing wrapper around [`matrix_sdk_ui::timeline::thread_list_service::ThreadListService`]. It maintains an observable list of [`ThreadListItem`]s and exposes a pagination state publisher, making it straightforward to build reactive UIs on top of the thread list. --- .../matrix-sdk-ffi/src/timeline/threads.rs | 41 +- crates/matrix-sdk-ui/src/timeline/mod.rs | 2 + .../src/timeline/thread_list_service.rs | 464 ++++++++++++++++++ crates/matrix-sdk-ui/src/timeline/threads.rs | 29 +- crates/matrix-sdk-ui/src/timeline/traits.rs | 12 + 5 files changed, 536 insertions(+), 12 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/timeline/thread_list_service.rs diff --git a/bindings/matrix-sdk-ffi/src/timeline/threads.rs b/bindings/matrix-sdk-ffi/src/timeline/threads.rs index 746b53743..c057bdce7 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/threads.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/threads.rs @@ -102,11 +102,10 @@ impl From for SdkIncludeThreads { /// [`ThreadListItem`]s and the current pagination token. #[derive(uniffi::Record)] pub struct ThreadList { - /// The events that are thread roots in the current batch. + /// The thread-root events that belong to this page of results. pub items: Vec, - /// Token to paginate backwards in a subsequent query to - /// [`Room::list_threads`]. + /// Opaque pagination token returned by the homeserver. pub prev_batch_token: Option, } @@ -119,14 +118,48 @@ impl From for ThreadList { } } -/// An individual Thread as retrieved from through Thread List API. +/// Each `ThreadListItem` represents one thread root event in the room. The +/// fields are pre-resolved from the raw homeserver response: the sender's +/// profile is fetched eagerly and the event content is parsed into a +/// `TimelineItemContent` so that consumers can render the item without any +/// additional work. +/// +/// `ThreadListItem`s are produced page by page via `Room::load_thread_list()` +/// and are accumulated inside the `ThreadListService` as pages are fetched +/// through `ThreadListService::paginate()`. #[derive(uniffi::Record)] pub struct ThreadListItem { + /// The event ID of the thread's root message. + /// + /// Use this to open a per-thread `Timeline` or to navigate the user to + /// the thread view. root_event_id: String, + + /// The `origin_server_ts` of the thread root event. + /// + /// Suitable for display as a "last active" timestamp or for sorting + /// threads in the UI. timestamp: Timestamp, + + /// The Matrix user ID of the thread root event's sender. sender: String, + + /// The sender's profile (display name and avatar URL) at the time the + /// event was received. + /// + /// This is fetched eagerly when the item is built. It will be + /// `ProfileDetails.Unavailable` if the profile could not be retrieved. sender_profile: ProfileDetails, + + /// Whether the thread root was sent by the current user. is_own: bool, + + /// The parsed content of the thread root event, if available. + /// + /// `None` when the event could not be deserialized into a known + /// `TimelineItemContent` variant (e.g. an unsupported or redacted event + /// type). Callers should handle `None` gracefully, for example by + /// rendering a generic placeholder. content: Option, } diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 73c9ada8e..06a69701f 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -79,6 +79,7 @@ mod subscriber; mod tasks; #[cfg(test)] mod tests; +pub mod thread_list_service; pub mod threads; mod traits; mod virtual_item; @@ -99,6 +100,7 @@ pub use self::{ }, item::{TimelineItem, TimelineItemKind, TimelineUniqueId}, latest_event::{LatestEventValue, LatestEventValueLocalState}, + thread_list_service::{ThreadListPaginationState, ThreadListService}, traits::RoomExt, virtual_item::VirtualTimelineItem, }; diff --git a/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs b/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs new file mode 100644 index 000000000..b63dd8633 --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/thread_list_service.rs @@ -0,0 +1,464 @@ +// Copyright 2026 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber}; +use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream}; +use imbl::Vector; +use matrix_sdk::{Room, locks::Mutex, paginators::PaginationToken, room::ListThreadsOptions}; +use tokio::sync::Mutex as AsyncMutex; + +use crate::timeline::{threads::ThreadListItem, traits::RoomExt}; + +/// The pagination state of a [`ThreadListService`]. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ThreadListPaginationState { + /// The list is idle (not currently loading). + Idle { + /// Whether the end of the thread list has been reached (no more pages + /// to load). + end_reached: bool, + }, + /// The list is currently loading the next page. + Loading, +} + +/// An error that occurred while using a [`ThreadListService`]. +#[derive(Debug, thiserror::Error)] +pub enum ThreadListServiceError { + /// An error from the underlying Matrix SDK. + #[error(transparent)] + Sdk(#[from] matrix_sdk::Error), +} + +/// A paginated list of threads for a given room. +/// +/// `ThreadListService` provides an observable, paginated list of +/// [`ThreadListItem`]s. It exposes methods to paginate forward through the +/// thread list as well as subscribe to state changes. +/// +/// # Example +/// +/// ```no_run +/// use matrix_sdk::Room; +/// use matrix_sdk_ui::timeline::thread_list_service::{ +/// ThreadListPaginationState, ThreadListService, +/// }; +/// +/// # async { +/// # let room: Room = todo!(); +/// let service = ThreadListService::new(room); +/// +/// assert_eq!( +/// service.pagination_state(), +/// ThreadListPaginationState::Idle { end_reached: false } +/// ); +/// +/// service.paginate().await.unwrap(); +/// +/// let items = service.items(); +/// # anyhow::Ok(()) }; +/// ``` +pub struct ThreadListService { + /// The room whose threads are being listed. + room: Room, + + /// The pagination token used to fetch subsequent pages. + token: AsyncMutex, + + /// The current pagination state. + pagination_state: SharedObservable, + + /// The current list of thread items. + items: Arc>>, +} + +impl ThreadListService { + /// Creates a new [`ThreadListService`] for the given room. + pub fn new(room: Room) -> Self { + Self { + room, + token: AsyncMutex::new(PaginationToken::None), + pagination_state: SharedObservable::new(ThreadListPaginationState::Idle { + end_reached: false, + }), + items: Arc::new(Mutex::new(ObservableVector::new())), + } + } + + /// Returns the current pagination state. + pub fn pagination_state(&self) -> ThreadListPaginationState { + self.pagination_state.get() + } + + /// Subscribes to pagination state updates. + /// + /// The returned [`Subscriber`] will emit a new value every time the + /// pagination state changes. + pub fn subscribe_to_pagination_state_updates(&self) -> Subscriber { + self.pagination_state.subscribe() + } + + /// Returns the current list of thread items as a snapshot. + pub fn items(&self) -> Vec { + self.items.lock().iter().cloned().collect() + } + + /// Subscribes to updates of the thread item list. + /// + /// Returns a snapshot of the current items alongside a batched stream of + /// [`eyeball_im::VectorDiff`]s that describe subsequent changes. + pub fn subscribe_to_items_updates( + &self, + ) -> (Vector, VectorSubscriberBatchedStream) { + self.items.lock().subscribe().into_values_and_batched_stream() + } + + /// Fetches the next page of threads, appending the results to the item + /// list. + /// + /// - If the list is already loading or the end has been reached, this + /// method returns immediately with `Ok(())`. + /// - On a network/SDK error the pagination state is reset to `Idle { + /// end_reached: false }` and the error is propagated. + pub async fn paginate(&self) -> Result<(), ThreadListServiceError> { + // Guard: do nothing if we are already loading or have reached the end. + { + let mut pagination_state = self.pagination_state.write(); + + match *pagination_state { + ThreadListPaginationState::Idle { end_reached: true } + | ThreadListPaginationState::Loading => return Ok(()), + _ => {} + } + + ObservableWriteGuard::set(&mut pagination_state, ThreadListPaginationState::Loading); + } + + let mut pagination_token = self.token.lock().await; + + // Build the options for this page, using the current token if we have one. + let from = match &*pagination_token { + PaginationToken::HasMore(token) => Some(token.clone()), + _ => None, + }; + + let opts = ListThreadsOptions { from, ..Default::default() }; + + match self.room.load_thread_list(opts).await { + Ok(thread_list) => { + // Update the pagination token based on whether there are more pages. + *pagination_token = match &thread_list.prev_batch_token { + Some(token) => PaginationToken::HasMore(token.clone()), + None => PaginationToken::HitEnd, + }; + + let end_reached = thread_list.prev_batch_token.is_none(); + + // Append new items to the observable vector. + self.items.lock().append(thread_list.items.into()); + + self.pagination_state.set(ThreadListPaginationState::Idle { end_reached }); + + Ok(()) + } + Err(err) => { + self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false }); + Err(ThreadListServiceError::Sdk(err)) + } + } + } + + /// Resets the service back to its initial state. + /// + /// Clears all loaded items, discards the current pagination token, and + /// sets the pagination state to `Idle { end_reached: false }`. The next + /// call to [`Self::paginate`] will therefore start from the beginning of + /// the thread list. + pub async fn reset(&self) { + let mut pagination_token = self.token.lock().await; + *pagination_token = PaginationToken::None; + + self.items.lock().clear(); + + self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false }); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures_util::pin_mut; + use matrix_sdk::test_utils::mocks::MatrixMockServer; + use matrix_sdk_test::{async_test, event_factory::EventFactory}; + use ruma::{event_id, events::AnyTimelineEvent, room_id, serde::Raw, user_id}; + use serde_json::json; + use stream_assert::{assert_next_matches, assert_pending}; + use wiremock::ResponseTemplate; + + use super::{ThreadListPaginationState, ThreadListService}; + + #[async_test] + async fn test_initial_state() { + let server = MatrixMockServer::new().await; + let service = make_service(&server).await; + + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: false } + ); + assert!(service.items().is_empty()); + } + + #[async_test] + async fn test_pagination() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let sender_id = user_id!("@alice:b.c"); + + let f = EventFactory::new().room(room_id).sender(sender_id); + + let eid1 = event_id!("$1"); + let eid2 = event_id!("$2"); + + server + .mock_room_threads() + .ok( + vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()], + Some("next_page_token".to_owned()), + ) + .mock_once() + .mount() + .await; + + server + .mock_room_threads() + .match_from("next_page_token") + .ok(vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()], None) + .mock_once() + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + service.paginate().await.expect("first paginate failed"); + + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: false } + ); + assert_eq!(service.items().len(), 1); + assert_eq!(service.items()[0].root_event_id, eid1); + + service.paginate().await.expect("second paginate failed"); + + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: true } + ); + assert_eq!(service.items().len(), 2); + assert_eq!(service.items()[1].root_event_id, eid2); + } + + #[async_test] + async fn test_pagination_end_reached() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let sender_id = user_id!("@alice:b.c"); + let f = EventFactory::new().room(room_id).sender(sender_id); + let eid1 = event_id!("$1"); + + server + .mock_room_threads() + .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None) + .mock_once() + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + service.paginate().await.expect("paginate failed"); + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: true } + ); + assert_eq!(service.items().len(), 1); + + service.paginate().await.expect("second paginate should be a no-op"); + assert_eq!(service.items().len(), 1); + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: true } + ); + } + + /// Two concurrent calls to [`ThreadListService::paginate`] must not result + /// in two concurrent HTTP requests. The second call should detect that a + /// pagination is already in progress (state is `Loading`) and return + /// immediately without making another network request. + #[async_test] + async fn test_concurrent_pagination_is_not_possible() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let sender_id = user_id!("@alice:b.c"); + let f = EventFactory::new().room(room_id).sender(sender_id); + let eid1 = event_id!("$1"); + + // Set up a slow mock response so both `paginate()` calls overlap in + // flight. Using `expect(1)` means the test will panic during server + // teardown if the endpoint is hit more than once. + let chunk: Vec> = + vec![f.text_msg("Thread root").event_id(eid1).into_raw()]; + server + .mock_room_threads() + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "chunk": chunk, "next_batch": null })) + .set_delay(Duration::from_millis(100)), + ) + .expect(1) + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + // Run two paginations concurrently. + let (first, second) = tokio::join!(service.paginate(), service.paginate()); + + first.expect("first paginate should succeed"); + second.expect("second (concurrent) paginate should succeed as a no-op"); + + // Only one HTTP request was made, so we have exactly one item. + assert_eq!(service.items().len(), 1); + assert_eq!(service.items()[0].root_event_id, eid1); + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: true } + ); + } + + /// When the server returns an error, [`ThreadListService::paginate`] must + /// propagate the error *and* reset the pagination state back to + /// `Idle { end_reached: false }` so that the caller can retry. + #[async_test] + async fn test_pagination_error() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + + server.mock_room_threads().error500().mock_once().mount().await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + // Pagination must surface the server error. + service.paginate().await.expect_err("paginate should fail on a 500 response"); + + // The state must be reset so the caller can retry; it must *not* be + // stuck in `Loading`. + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: false } + ); + + // No items should have been added. + assert!(service.items().is_empty()); + } + + #[async_test] + async fn test_reset() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let sender_id = user_id!("@alice:b.c"); + let f = EventFactory::new().room(room_id).sender(sender_id); + let eid1 = event_id!("$1"); + + server + .mock_room_threads() + .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None) + .expect(2) + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + service.paginate().await.expect("first paginate failed"); + assert_eq!(service.items().len(), 1); + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: true } + ); + + service.reset().await; + assert!(service.items().is_empty()); + assert_eq!( + service.pagination_state(), + ThreadListPaginationState::Idle { end_reached: false } + ); + + service.paginate().await.expect("paginate after reset failed"); + assert_eq!(service.items().len(), 1); + } + + #[async_test] + async fn test_pagination_state_subscriber() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let sender_id = user_id!("@alice:b.c"); + let f = EventFactory::new().room(room_id).sender(sender_id); + let eid1 = event_id!("$1"); + + server + .mock_room_threads() + .ok( + vec![f.text_msg("Thread root").event_id(eid1).into_raw()], + Some("next_token".to_owned()), + ) + .mock_once() + .mount() + .await; + + let room = server.sync_joined_room(&client, room_id).await; + let service = ThreadListService::new(room); + + let subscriber = service.subscribe_to_pagination_state_updates(); + pin_mut!(subscriber); + + assert_pending!(subscriber); + + service.paginate().await.expect("paginate failed"); + + assert_next_matches!(subscriber, ThreadListPaginationState::Idle { end_reached: false }); + } + + /// Builds a [`ThreadListService`] and makes the room known to the client + /// by performing a sync. + async fn make_service(server: &MatrixMockServer) -> ThreadListService { + let client = server.client_builder().build().await; + let room_id = room_id!("!a:b.c"); + let room = server.sync_joined_room(&client, room_id).await; + ThreadListService::new(room) + } +} diff --git a/crates/matrix-sdk-ui/src/timeline/threads.rs b/crates/matrix-sdk-ui/src/timeline/threads.rs index 03e6e8be5..d9b83ceac 100644 --- a/crates/matrix-sdk-ui/src/timeline/threads.rs +++ b/crates/matrix-sdk-ui/src/timeline/threads.rs @@ -34,16 +34,25 @@ use crate::timeline::{ /// A structure wrapping a Thread List endpoint response i.e. /// [`ThreadListItem`]s and the current pagination token. +#[derive(Clone, Debug)] pub struct ThreadList { - /// The list items + /// The thread-root events that belong to this page of results. pub items: Vec, - /// Token to paginate backwards in a subsequent query to - /// [`super::Room::list_threads`]. + /// Opaque pagination token returned by the homeserver. pub prev_batch_token: Option, } -/// An individual Thread as retrieved from through Thread List API. +/// Each `ThreadListItem` represents one thread root event in the room. The +/// fields are pre-resolved from the raw homeserver response: the sender's +/// profile is fetched eagerly and the event content is parsed into a +/// [`TimelineItemContent`] so that consumers can render the item without any +/// additional work. +/// +/// `ThreadListItem`s are produced by [`load_thread_list`] and are accumulated +/// inside [`super::thread_list_service::ThreadListService`] as pages are +/// fetched via [`super::thread_list_service::ThreadListService::paginate`]. +#[derive(Clone, Debug)] pub struct ThreadListItem { /// The thread's root event identifier. pub root_event_id: OwnedEventId, @@ -51,16 +60,20 @@ pub struct ThreadListItem { /// The timestamp of the remote event. pub timestamp: MilliSecondsSinceUnixEpoch, - /// The sender of the remote event. + /// The Matrix user ID of the thread root event's sender. pub sender: OwnedUserId, - /// Has this event been sent by the current logged user? + /// Whether the thread root was sent by the current user. pub is_own: bool, - /// The sender's profile. + /// The sender's profile (display name and avatar URL) pub sender_profile: TimelineDetails, - /// The content of the remote event. + /// The parsed content of the thread root event, if available. + /// + /// `None` when the event could not be deserialized into a known + /// [`TimelineItemContent`] variant (e.g. an unsupported or redacted event + /// type) pub content: Option, } diff --git a/crates/matrix-sdk-ui/src/timeline/traits.rs b/crates/matrix-sdk-ui/src/timeline/traits.rs index ed71848fb..8910d019f 100644 --- a/crates/matrix-sdk-ui/src/timeline/traits.rs +++ b/crates/matrix-sdk-ui/src/timeline/traits.rs @@ -38,6 +38,7 @@ use super::{Profile, RedactError, TimelineBuilder}; use crate::timeline::{ self, Timeline, TimelineReadReceiptTracking, latest_event::LatestEventValue, + thread_list_service::ThreadListService, threads::{ThreadList, load_thread_list}, }; @@ -69,6 +70,13 @@ pub trait RoomExt { &self, opts: ListThreadsOptions, ) -> impl Future> + SendOutsideWasm; + + /// Create a [`ThreadListService`] for this room. + /// + /// The returned service provides a paginated, observable list of thread + /// roots for the room and can be used to page through threads and + /// subscribe to updates. + fn thread_list_service(&self) -> ThreadListService; } impl RoomExt for Room { @@ -93,6 +101,10 @@ impl RoomExt for Room { async fn load_thread_list(&self, opts: ListThreadsOptions) -> Result { load_thread_list(self, opts).await } + + fn thread_list_service(&self) -> ThreadListService { + ThreadListService::new(self.clone()) + } } pub(super) trait RoomDataProvider: