feat(ui): add new Thread timeline focus mode and associated events loader (#5032)

… that allows building a timeline instance specific to a particular
thread root.

Creating a timeline in this mode will start by backpaginating root event
relations with `num_events` and automatically insert the thread root
event when reaching the end. It will include
`RelationsOfType(RelationType::Thread)` but also recurse over the
retrieved events to fetch reactions.
It will not however react to new events received over sync or that the
user sends (for now).

This patch will also help incrementally deliver the upstream client
support for creating such a timeline.

Part of #4833 (meta #4869).
This commit is contained in:
Stefan Ceriu
2025-05-14 17:14:29 +03:00
committed by GitHub
parent 36667c1298
commit 13a65c8dfe
11 changed files with 383 additions and 11 deletions

View File

@@ -65,6 +65,7 @@ impl From<FilterTimelineEventType> for TimelineEventType {
pub enum TimelineFocus {
Live,
Event { event_id: String, num_context_events: u16 },
Thread { root_event_id: String, num_events: u16 },
PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
}
@@ -85,6 +86,16 @@ impl TryFrom<TimelineFocus> for matrix_sdk_ui::timeline::TimelineFocus {
Ok(Self::Event { target: parsed_event_id, num_context_events })
}
TimelineFocus::Thread { root_event_id, num_events } => {
let parsed_root_event_id = EventId::parse(&root_event_id).map_err(|err| {
FocusEventError::InvalidEventId {
event_id: root_event_id.clone(),
err: err.to_string(),
}
})?;
Ok(Self::Thread { root_event_id: parsed_root_event_id, num_events })
}
TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
Ok(Self::PinnedEvents { max_events_to_load, max_concurrent_requests })
}

View File

@@ -67,6 +67,7 @@ use super::{
event_item::{ReactionStatus, RemoteEventOrigin},
item::TimelineUniqueId,
subscriber::TimelineSubscriber,
threaded_events_loader::ThreadedEventsLoader,
traits::{Decryptor, RoomDataProvider},
DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, PaginationError,
Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
@@ -110,6 +111,13 @@ enum TimelineFocusData<P: RoomDataProvider> {
num_context_events: u16,
},
Thread {
loader: ThreadedEventsLoader<P>,
/// Number of relations events to requests for the first request
num_events: u16,
},
PinnedEvents {
loader: PinnedEventsLoader,
},
@@ -177,6 +185,7 @@ impl Default for TimelineSettings {
pub(super) enum TimelineFocusKind {
Live,
Event,
Thread,
PinnedEvents,
}
@@ -278,6 +287,14 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
)
}
TimelineFocus::Thread { root_event_id, num_events } => (
TimelineFocusData::Thread {
loader: ThreadedEventsLoader::new(room_data_provider.clone(), root_event_id),
num_events,
},
TimelineFocusKind::Thread,
),
TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
TimelineFocusData::PinnedEvents {
loader: PinnedEventsLoader::new(
@@ -361,6 +378,24 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
Ok(has_events)
}
TimelineFocusData::Thread { loader, num_events } => {
let result = loader
.paginate_backwards((*num_events).into())
.await
.map_err(PaginationError::Paginator)?;
drop(focus_guard);
// Events are in reverse topological order.
self.replace_with_initial_remote_events(
result.events.into_iter().rev(),
RemoteEventOrigin::Pagination,
)
.await;
Ok(true)
}
TimelineFocusData::PinnedEvents { loader } => {
let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
@@ -453,12 +488,16 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
) -> Result<bool, PaginationError> {
let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
return Err(PaginationError::NotEventFocusMode)
return Err(PaginationError::NotSupported)
}
TimelineFocusData::Event { paginator, .. } => paginator
.paginate_backward(num_events.into())
.await
.map_err(PaginationError::Paginator)?,
TimelineFocusData::Thread { loader, num_events } => loader
.paginate_backwards((*num_events).into())
.await
.map_err(PaginationError::Paginator)?,
};
// Events are in reverse topological order.
@@ -481,9 +520,10 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
num_events: u16,
) -> Result<bool, PaginationError> {
let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
return Err(PaginationError::NotEventFocusMode)
}
TimelineFocusData::Live
| TimelineFocusData::PinnedEvents { .. }
| TimelineFocusData::Thread { .. } => return Err(PaginationError::NotSupported),
TimelineFocusData::Event { paginator, .. } => paginator
.paginate_forward(num_events.into())
.await

View File

@@ -150,6 +150,7 @@ impl TimelineState {
let should_add_new_items = match self.timeline_focus {
TimelineFocusKind::Live => true,
TimelineFocusKind::Event | TimelineFocusKind::PinnedEvents => false,
TimelineFocusKind::Thread => false,
};
let ctx = TimelineEventContext {

View File

@@ -276,6 +276,13 @@ impl<'a> TimelineStateTransaction<'a> {
// filtering: the event *should* be added!
true
}
TimelineFocusKind::Thread => {
// The thread timeline doesn't apply any additional
// for now. It will however do so in the future, as
// will the live one
true
}
}
}

View File

@@ -115,13 +115,12 @@ pub enum RedactError {
#[derive(Error, Debug)]
pub enum PaginationError {
/// The timeline isn't in the event focus mode.
#[error("The timeline isn't in the event focus mode")]
NotEventFocusMode,
/// An error occurred while paginating.
#[error("Error when paginating.")]
Paginator(#[source] PaginatorError),
#[error("Pagination type not supported in this focus mode")]
NotSupported,
}
#[derive(Debug, Error)]

View File

@@ -70,6 +70,7 @@ mod pinned_events_loader;
mod subscriber;
#[cfg(test)]
mod tests;
mod threaded_events_loader;
mod to_device;
mod traits;
mod virtual_item;
@@ -120,6 +121,9 @@ pub enum TimelineFocus {
/// Focus on a specific event, e.g. after clicking a permalink.
Event { target: OwnedEventId, num_context_events: u16 },
/// Focus on a specific thread
Thread { root_event_id: OwnedEventId, num_events: u16 },
/// Only show pinned events.
PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
}
@@ -129,6 +133,7 @@ impl TimelineFocus {
match self {
TimelineFocus::Live => "live".to_owned(),
TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
TimelineFocus::Thread { root_event_id, .. } => format!("thread:{root_event_id}"),
TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
}
}

View File

@@ -31,7 +31,7 @@ use matrix_sdk::{
crypto::OlmMachine,
deserialized_responses::{EncryptionInfo, TimelineEvent},
event_cache::paginator::{PaginableRoom, PaginatorError},
room::{EventWithContextResponse, Messages, MessagesOptions, PushContext},
room::{EventWithContextResponse, Messages, MessagesOptions, PushContext, Relations},
send_queue::RoomSendQueueUpdate,
BoxFuture,
};
@@ -428,4 +428,12 @@ impl RoomDataProvider for TestRoomDataProvider {
) -> Option<EncryptionInfo> {
self.encryption_info.get(session_id).cloned()
}
async fn relations(
&self,
_event_id: OwnedEventId,
_opts: matrix_sdk::room::RelationsOptions,
) -> Result<Relations, matrix_sdk::Error> {
unimplemented!();
}
}

View File

@@ -0,0 +1,99 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fmt::Formatter, sync::Mutex};
use matrix_sdk::{
event_cache::{
paginator::{PaginationResult, PaginatorError},
PaginationToken,
},
room::{IncludeRelations, RelationsOptions},
};
use ruma::{api::Direction, events::relation::RelationType, OwnedEventId, UInt};
use super::traits::RoomDataProvider;
pub struct ThreadedEventsLoader<P: RoomDataProvider> {
room: P,
root_event_id: OwnedEventId,
token: Mutex<PaginationToken>,
}
impl<P: RoomDataProvider> ThreadedEventsLoader<P> {
/// Create a new [`Paginator`], given a room implementation.
pub fn new(room: P, root_event_id: OwnedEventId) -> Self {
Self { room, root_event_id, token: Mutex::new(None.into()) }
}
pub async fn paginate_backwards(
&self,
num_events: UInt,
) -> Result<PaginationResult, PaginatorError> {
let token = {
let token = self.token.lock().unwrap();
match &*token {
PaginationToken::None => None,
PaginationToken::HasMore(token) => Some(token.clone()),
PaginationToken::HitEnd => {
return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
}
}
};
let options = RelationsOptions {
from: token,
dir: Direction::Backward,
limit: Some(num_events),
include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
recurse: true,
};
let mut result = self
.room
.relations(self.root_event_id.to_owned(), options)
.await
.map_err(|error| PaginatorError::SdkError(Box::new(error)))?;
let hit_end_of_timeline = result.next_batch_token.is_none();
// Update the stored tokens
{
let mut token = self.token.lock().unwrap();
*token = match result.next_batch_token {
Some(val) => PaginationToken::HasMore(val),
None => PaginationToken::HitEnd,
};
}
// Finally insert the thread root if at the end of the timeline going backwards
if hit_end_of_timeline {
let root_event =
self.room.load_event_with_relations(&self.root_event_id, None, None).await?.0;
result.chunk.push(root_event);
}
Ok(PaginationResult { events: result.chunk, hit_end_of_timeline })
}
}
#[cfg(not(tarpaulin_include))]
impl<P: RoomDataProvider> std::fmt::Debug for ThreadedEventsLoader<P> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ThreadedEventsLoader").finish()
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ use matrix_sdk::{
crypto::types::events::CryptoContextInfo,
deserialized_responses::{EncryptionInfo, TimelineEvent},
event_cache::paginator::PaginableRoom,
room::PushContext,
room::{PushContext, Relations, RelationsOptions},
AsyncTraitDeps, Result, Room, SendOutsideWasm,
};
use matrix_sdk_base::{latest_event::LatestEvent, RoomInfo};
@@ -129,6 +129,8 @@ pub(super) trait RoomDataProvider:
session_id: &str,
sender: &UserId,
) -> impl Future<Output = Option<EncryptionInfo>> + SendOutsideWasm;
async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations>;
}
impl RoomDataProvider for Room {
@@ -274,6 +276,10 @@ impl RoomDataProvider for Room {
// Pass directly on to `Room::get_encryption_info`
self.get_encryption_info(session_id, sender).await
}
async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations> {
self.relations(event_id, opts).await
}
}
// Internal helper to make most of retry_event_decryption independent of a room

View File

@@ -62,6 +62,7 @@ mod reactions;
mod read_receipts;
mod replies;
mod subscribe;
mod thread;
pub(crate) mod sliding_sync;

View File

@@ -0,0 +1,195 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::StreamExt as _;
use matrix_sdk::test_utils::mocks::{MatrixMockServer, RoomRelationsResponseTemplate};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use matrix_sdk_ui::{timeline::TimelineFocus, Timeline};
use ruma::{event_id, events::AnyTimelineEvent, owned_event_id, room_id, serde::Raw, user_id};
use stream_assert::assert_pending;
#[async_test]
async fn test_new_thread() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
let factory = EventFactory::new().room(room_id).sender(sender_id);
let thread_root_event_id = owned_event_id!("$root");
server
.mock_room_event()
.match_event_id()
.ok(factory
.text_msg("Thread root")
.sender(sender_id)
.event_id(&thread_root_event_id)
.into())
.mock_once()
.mount()
.await;
server
.mock_room_relations()
.match_target_event(thread_root_event_id.clone())
.ok(RoomRelationsResponseTemplate::default().events(Vec::<Raw<AnyTimelineEvent>>::new()))
.mock_once()
.mount()
.await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = Timeline::builder(&room)
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id, num_events: 1 })
.build()
.await
.unwrap();
let (items, mut timeline_stream) = timeline.subscribe().await;
assert_eq!(items.len(), 1 + 1); // a date divider + the thread root
assert!(items[0].is_date_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "Thread root");
assert_pending!(timeline_stream);
}
#[async_test]
async fn test_thread_backpagination() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
let factory = EventFactory::new().room(room_id).sender(sender_id);
let thread_root_event_id = owned_event_id!("$root");
server
.mock_room_event()
.match_event_id()
.ok(factory
.text_msg("Thread root")
.sender(sender_id)
.event_id(&thread_root_event_id)
.into())
.mock_once()
.mount()
.await;
let batch1 = vec![
factory.text_msg("Threaded event 4").event_id(event_id!("$3")).into_raw_sync().cast(),
factory.text_msg("Threaded event 3").event_id(event_id!("$4")).into_raw_sync().cast(),
];
let batch2 = vec![
factory.text_msg("Threaded event 2").event_id(event_id!("$2")).into_raw_sync().cast(),
factory.text_msg("Threaded event 1").event_id(event_id!("$1")).into_raw_sync().cast(),
];
server
.mock_room_relations()
.match_target_event(thread_root_event_id.clone())
.ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
.mock_once()
.mount()
.await;
server
.mock_room_relations()
.match_target_event(thread_root_event_id.clone())
.match_from("next_batch")
.ok(RoomRelationsResponseTemplate::default().events(batch2))
.mock_once()
.mount()
.await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = Timeline::builder(&room)
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id, num_events: 1 })
.build()
.await
.unwrap();
let (items, mut timeline_stream) = timeline.subscribe().await;
assert_pending!(timeline_stream);
assert_eq!(items.len(), 2 + 1); // A date divider + the 2 events
assert!(items[0].is_date_divider());
assert_eq!(
items[1].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 3"
);
assert_eq!(
items[2].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 4"
);
let hit_start = timeline.paginate_backwards(100).await.unwrap();
assert!(hit_start);
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
// Remove date separator and insert a new one plus the remaining threaded
// events and the thread root
assert_eq!(timeline_updates.len(), 5);
println!("Stefan: {:?}", timeline_updates);
// Check the timeline diffs
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[0]);
assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$2"));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$1"));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$root"));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
assert_let!(VectorDiff::Remove { index: 4 } = &timeline_updates[4]);
// Check the final items
let items = timeline.items().await;
assert!(items[0].is_date_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "Thread root");
assert_eq!(
items[2].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 1"
);
assert_eq!(
items[3].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 2"
);
assert_eq!(
items[4].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 3"
);
assert_eq!(
items[5].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 4"
);
}