feat(timeline): use the event-focused caches in the timeline

This commit is contained in:
Benjamin Bouvier
2026-02-12 08:18:10 +01:00
parent 4ec7d99d71
commit 188d3a2ead
12 changed files with 305 additions and 449 deletions

View File

@@ -25,11 +25,11 @@ use super::{
};
use crate::{
timeline::{
TimelineReadReceiptTracking,
PaginationError, TimelineReadReceiptTracking,
controller::spawn_crypto_tasks,
tasks::{
pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
thread_updates_task,
event_focused_task, pinned_events_task, room_event_cache_updates_task,
room_send_queue_update_task, thread_updates_task,
},
},
unable_to_decrypt_hook::UtdHookManager,
@@ -198,6 +198,26 @@ impl TimelineBuilder {
None
};
let event_focused_join_handle =
if let TimelineFocus::Event { target, thread_mode, .. } = &focus {
let cache = room_event_cache
.get_event_focused_cache(target.clone(), (*thread_mode).into())
.await?
.ok_or(Error::PaginationError(PaginationError::MissingCache))?;
let (_initial_events, recv) = cache.subscribe().await;
Some(spawn(event_focused_task(
target.clone(),
(*thread_mode).into(),
room_event_cache.clone(),
controller.clone(),
recv,
)))
} else {
None
};
let room_update_join_handle = spawn({
let span = info_span!(
parent: Span::none(),
@@ -281,6 +301,7 @@ impl TimelineBuilder {
room_update_join_handle,
thread_update_join_handle,
pinned_events_join_handle,
event_focused_join_handle,
local_echo_listener_handle,
_event_cache_drop_handle: event_cache_drop,
}),

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::BTreeSet, fmt, sync::Arc};
use std::{
collections::BTreeSet,
fmt,
sync::{Arc, OnceLock},
};
use as_variant::as_variant;
use eyeball_im::{VectorDiff, VectorSubscriberStream};
@@ -22,10 +26,10 @@ use imbl::Vector;
#[cfg(test)]
use matrix_sdk::Result;
use matrix_sdk::{
config::RequestConfig,
deserialized_responses::TimelineEvent,
event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
paginators::{PaginationResult, PaginationToken, Paginator},
event_cache::{
DecryptionRetryRequest, EventFocusThreadMode, RoomEventCache, RoomPaginationStatus,
},
send_queue::{
LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
},
@@ -41,13 +45,13 @@ use ruma::{
poll::unstable_start::UnstablePollStartEventContent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
relation::{Annotation, RelationType},
relation::Annotation,
room::message::{MessageType, Relation},
},
room_version_rules::RoomVersionRules,
serde::Raw,
};
use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};
pub(super) use self::{
@@ -90,8 +94,6 @@ mod state_transaction;
pub(super) use aggregations::*;
pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
use matrix_sdk_common::serde_helpers::extract_thread_root;
/// Data associated to the current timeline focus.
///
@@ -99,7 +101,7 @@ use matrix_sdk_common::serde_helpers::extract_thread_root;
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
pub(in crate::timeline) enum TimelineFocusKind {
/// The timeline receives live events from the sync.
Live {
/// Whether to hide in-thread events from the timeline.
@@ -109,8 +111,19 @@ pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
/// The timeline is focused on a single event, and it can expand in one
/// direction or another.
Event {
/// The paginator instance.
paginator: OnceCell<AnyPaginator<P>>,
/// The focused event ID.
focused_event_id: OwnedEventId,
/// If the focused event is part or the root of a thread, what's the
/// thread root?
///
/// This is determined once when initializing the event-focused cache,
/// and then it won't change for the duration of this timeline.
thread_root: OnceLock<OwnedEventId>,
/// The thread mode to use for this event-focused timeline, which is
/// part of the key for the memoized event-focused cache.
thread_mode: TimelineEventFocusThreadMode,
},
/// A live timeline for a thread.
@@ -122,82 +135,7 @@ pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
PinnedEvents,
}
#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
Unthreaded {
/// The actual event paginator.
paginator: Paginator<P>,
/// Whether to hide in-thread events from the timeline.
hide_threaded_events: bool,
},
Threaded(ThreadedEventsLoader<P>),
}
impl<P: RoomDataProvider> AnyPaginator<P> {
/// Runs a backward pagination (requesting `num_events` to the server), from
/// the current state of the object.
///
/// Will return immediately if we have already hit the start of the
/// timeline.
///
/// May return an error if it's already paginating, or if the call to
/// the homeserver endpoints failed.
pub async fn paginate_backwards(
&self,
num_events: u16,
) -> Result<PaginationResult, PaginatorError> {
match self {
Self::Unthreaded { paginator, .. } => {
paginator.paginate_backward(num_events.into()).await
}
Self::Threaded(threaded_paginator) => {
threaded_paginator.paginate_backwards(num_events.into()).await
}
}
}
/// Runs a forward pagination (requesting `num_events` to the server), from
/// the current state of the object.
///
/// Will return immediately if we have already hit the end of the timeline.
///
/// May return an error if it's already paginating, or if the call to
/// the homeserver endpoints failed.
pub async fn paginate_forwards(
&self,
num_events: u16,
) -> Result<PaginationResult, PaginatorError> {
match self {
Self::Unthreaded { paginator, .. } => {
paginator.paginate_forward(num_events.into()).await
}
Self::Threaded(threaded_paginator) => {
threaded_paginator.paginate_forwards(num_events.into()).await
}
}
}
/// Whether to hide in-thread events from the timeline.
pub fn hide_threaded_events(&self) -> bool {
match self {
Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
Self::Threaded(_) => false,
}
}
/// Returns the root event id of the thread, if the paginator is
/// [`AnyPaginator::Threaded`].
pub fn thread_root(&self) -> Option<&EventId> {
match self {
Self::Unthreaded { .. } => None,
Self::Threaded(thread_events_loader) => {
Some(thread_events_loader.thread_root_event_id())
}
}
}
}
impl<P: RoomDataProvider> TimelineFocusKind<P> {
impl TimelineFocusKind {
/// Returns the [`ReceiptThread`] that should be used for the current
/// timeline focus.
///
@@ -218,8 +156,11 @@ impl<P: RoomDataProvider> TimelineFocusKind<P> {
fn hide_threaded_events(&self) -> bool {
match self {
TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
TimelineFocusKind::Event { paginator } => {
paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
TimelineFocusKind::Event { thread_mode, .. } => {
matches!(
thread_mode,
TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
)
}
TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
}
@@ -234,9 +175,7 @@ impl<P: RoomDataProvider> TimelineFocusKind<P> {
/// If the focus is a thread, returns its root event ID.
fn thread_root(&self) -> Option<&EventId> {
match self {
TimelineFocusKind::Event { paginator, .. } => {
paginator.get().and_then(|paginator| paginator.thread_root())
}
TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
}
@@ -249,7 +188,7 @@ pub(super) struct TimelineController<P: RoomDataProvider = Room> {
state: Arc<RwLock<TimelineState<P>>>,
/// Focus data.
focus: Arc<TimelineFocusKind<P>>,
focus: Arc<TimelineFocusKind>,
/// A [`RoomDataProvider`] implementation, providing data.
///
@@ -394,7 +333,14 @@ impl<P: RoomDataProvider> TimelineController<P> {
TimelineFocusKind::Live { hide_threaded_events }
}
TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
TimelineFocus::Event { target, thread_mode, .. } => {
TimelineFocusKind::Event {
focused_event_id: target,
// This will be initialized in `Self::init_focus`.
thread_root: OnceLock::new(),
thread_mode,
}
}
TimelineFocus::Thread { root_event_id, .. } => {
TimelineFocusKind::Thread { root_event_id }
@@ -451,132 +397,45 @@ impl<P: RoomDataProvider> TimelineController<P> {
}
TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
// NOTE: this is sync'd with code in the ctor.
unreachable!();
};
let event_paginator = Paginator::new(self.room_data_provider.clone());
let load_events_with_context = || async {
// Start a /context request to load the focused event and surrounding events.
event_paginator
.start_from(event_id, (*num_context_events).into())
.await
.map(|r| r.events)
.map_err(PaginationError::Paginator)
};
let events = if *num_context_events == 0 {
// If no context is requested, try to load the event from the cache first and
// include common relations such as reactions and edits.
let request_config = Some(RequestConfig::default().retry_limit(3));
let relations_filter =
Some(vec![RelationType::Annotation, RelationType::Replacement]);
// Load the event from the cache or, failing that, the server.
match self
.room_data_provider
.load_event_with_relations(event_id, request_config, relations_filter)
.await
{
Ok((event, related_events)) => {
let mut events = vec![event];
events.extend(related_events);
events
}
Err(err) => {
error!("error when loading focussed event: {err}");
// Fall back to load the focused event using /context.
load_events_with_context().await?
}
// Use the event-focused cache from the event cache layer.
let event_cache_thread_mode = match thread_mode {
TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
TimelineEventFocusThreadMode::Automatic { .. } => {
EventFocusThreadMode::Automatic
}
} else {
// Start a /context request to load the focussed event and surrounding events.
load_events_with_context().await?
};
// Find the target event, and see if it's part of a thread.
let extracted_thread_root = events
.iter()
.find(
|event| {
if let Some(id) = event.event_id() { id == *event_id } else { false }
},
let cache = room_event_cache
.get_or_create_event_focused_cache(
event_id.clone(),
*num_context_events,
event_cache_thread_mode,
)
.and_then(|event| extract_thread_root(event.raw()));
.await
.map_err(PaginationError::EventCache)?;
// Determine the timeline's threading behavior.
let (thread_root_event_id, hide_threaded_events) = match thread_mode {
TimelineEventFocusThreadMode::ForceThread => {
// If the event is part of a thread, use its thread root. Otherwise,
// assume the event itself is the thread root.
(extracted_thread_root.or_else(|| Some(event_id.clone())), false)
}
TimelineEventFocusThreadMode::Automatic { hide_threaded_events } => {
(extracted_thread_root, *hide_threaded_events)
}
};
let _ = paginator.set(match thread_root_event_id {
Some(root_id) => {
let mut tokens = event_paginator.tokens();
// Look if the thread root event is part of the /context response. This
// allows us to spare some backwards pagination with
// /relations.
let includes_root_event = events.iter().any(|event| {
if let Some(id) = event.event_id() { id == root_id } else { false }
});
if includes_root_event {
// If we have the root event, there's no need to do back-paginations
// with /relations, since we are at the start of the thread.
tokens.previous = PaginationToken::HitEnd;
}
AnyPaginator::Threaded(ThreadedEventsLoader::new(
self.room_data_provider.clone(),
root_id,
tokens,
))
}
None => AnyPaginator::Unthreaded {
paginator: event_paginator,
hide_threaded_events,
},
});
let (events, _receiver) = cache.subscribe().await;
let has_events = !events.is_empty();
match paginator.get().expect("Paginator was not instantiated") {
AnyPaginator::Unthreaded { .. } => {
self.replace_with_initial_remote_events(
events,
RemoteEventOrigin::Pagination,
)
.await;
// Ask the cache for the thread root, if it managed to extract one or decided
// that the target event was the thread root.
match &*self.focus {
TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
if let Some(thread_root) = cache.thread_root().await {
focus_thread_root.get_or_init(|| thread_root);
}
}
AnyPaginator::Threaded(threaded_events_loader) => {
// We filter only events that are part of the thread (including the root),
// since /context will return adjacent events without filters.
let thread_root = threaded_events_loader.thread_root_event_id();
let events_in_thread = events.into_iter().filter(|event| {
extract_thread_root(event.raw())
.is_some_and(|event_thread_root| event_thread_root == thread_root)
|| event.event_id().as_deref() == Some(thread_root)
});
self.replace_with_initial_remote_events(
events_in_thread,
RemoteEventOrigin::Pagination,
)
.await;
TimelineFocusKind::Live { .. }
| TimelineFocusKind::Thread { .. }
| TimelineFocusKind::PinnedEvents => {
panic!("unexpected focus for an event-focused timeline")
}
}
self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
.await;
Ok(has_events)
}
@@ -682,71 +541,6 @@ impl<P: RoomDataProvider> TimelineController<P> {
needs
}
/// Run a backwards pagination (in focused mode) and append the results to
/// the timeline.
///
/// Returns whether we hit the start of the timeline.
pub(super) async fn focused_paginate_backwards(
&self,
num_events: u16,
) -> Result<bool, PaginationError> {
let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
TimelineFocusKind::Live { .. }
| TimelineFocusKind::PinnedEvents
| TimelineFocusKind::Thread { .. } => {
return Err(PaginationError::NotSupported);
}
TimelineFocusKind::Event { paginator, .. } => paginator
.get()
.expect("Paginator was not instantiated")
.paginate_backwards(num_events)
.await
.map_err(PaginationError::Paginator)?,
};
// Events are in reverse topological order.
// We can push front each event individually.
self.handle_remote_events_with_diffs(
events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
RemoteEventOrigin::Pagination,
)
.await;
Ok(hit_end_of_timeline)
}
/// Run a forwards pagination (in focused mode) and append the results to
/// the timeline.
///
/// Returns whether we hit the end of the timeline.
pub(super) async fn focused_paginate_forwards(
&self,
num_events: u16,
) -> Result<bool, PaginationError> {
let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
TimelineFocusKind::Live { .. }
| TimelineFocusKind::PinnedEvents
| TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
TimelineFocusKind::Event { paginator, .. } => paginator
.get()
.expect("Paginator was not instantiated")
.paginate_forwards(num_events)
.await
.map_err(PaginationError::Paginator)?,
};
// Events are in topological order.
// We can append all events with no transformation.
self.handle_remote_events_with_diffs(
vec![VectorDiff::Append { values: events.into() }],
RemoteEventOrigin::Pagination,
)
.await;
Ok(hit_end_of_timeline)
}
/// Is this timeline receiving events from sync (aka has a live focus)?
pub(super) fn is_live(&self) -> bool {
matches!(&*self.focus, TimelineFocusKind::Live { .. })
@@ -1776,8 +1570,9 @@ impl TimelineController {
let filter_out_thread_events = match self.focus() {
TimelineFocusKind::Thread { .. } => false,
TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
TimelineFocusKind::Event { paginator } => {
paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
TimelineFocusKind::Event { .. } => {
// For event-focused timelines, filtering is handled in the event cache layer.
false
}
_ => true,
};
@@ -1847,7 +1642,7 @@ impl TimelineController {
impl<P: RoomDataProvider> TimelineController<P> {
/// Returns the timeline focus of the [`TimelineController`].
pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
pub(super) fn focus(&self) -> &TimelineFocusKind {
&self.focus
}
}

View File

@@ -45,12 +45,15 @@ pub(in crate::timeline) struct TimelineState<P: RoomDataProvider> {
pub meta: TimelineMetadata,
/// The kind of focus of this timeline.
pub(super) focus: Arc<TimelineFocusKind<P>>,
pub(super) focus: Arc<TimelineFocusKind>,
/// Phantom data for the room data provider.
_phantom: std::marker::PhantomData<P>,
}
impl<P: RoomDataProvider> TimelineState<P> {
pub(super) fn new(
focus: Arc<TimelineFocusKind<P>>,
focus: Arc<TimelineFocusKind>,
own_user_id: OwnedUserId,
room_version_rules: RoomVersionRules,
internal_id_prefix: Option<String>,
@@ -67,6 +70,7 @@ impl<P: RoomDataProvider> TimelineState<P> {
is_room_encrypted,
),
focus,
_phantom: std::marker::PhantomData,
}
}
@@ -252,6 +256,6 @@ impl<P: RoomDataProvider> TimelineState<P> {
}
pub(super) fn transaction(&mut self) -> TimelineStateTransaction<'_, P> {
TimelineStateTransaction::new(&mut self.items, &mut self.meta, &*self.focus)
TimelineStateTransaction::new(&mut self.items, &mut self.meta, &self.focus)
}
}

View File

@@ -66,7 +66,10 @@ pub(in crate::timeline) struct TimelineStateTransaction<'a, P: RoomDataProvider>
previous_meta: &'a mut TimelineMetadata,
/// The kind of focus of this timeline.
pub focus: &'a TimelineFocusKind<P>,
pub focus: &'a TimelineFocusKind,
/// Phantom data for type parameter.
_phantom: std::marker::PhantomData<P>,
}
impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
@@ -74,7 +77,7 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
pub(super) fn new(
items: &'a mut ObservableItems,
meta: &'a mut TimelineMetadata,
focus: &'a TimelineFocusKind<P>,
focus: &'a TimelineFocusKind,
) -> Self {
let previous_meta = meta;
let meta = previous_meta.clone();
@@ -86,6 +89,7 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
previous_meta,
meta,
focus,
_phantom: std::marker::PhantomData,
}
}
@@ -485,14 +489,9 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
true
}
TimelineFocusKind::Event { paginator } => {
// If the timeline's filtering out in-thread events, don't add items for
// threaded events.
let hide_threaded_events =
paginator.get().is_some_and(|paginator| paginator.hide_threaded_events());
if thread_root.is_some() && hide_threaded_events {
return false;
}
TimelineFocusKind::Event { .. } => {
// For event-focused timelines, thread filtering is now handled in the
// event cache layer. We accept all events from pagination.
// Retrieve the origin of the event.
let origin = match position {

View File

@@ -113,6 +113,14 @@ pub enum PaginationError {
#[error("Error when paginating.")]
Paginator(#[source] PaginatorError),
/// An error occurred in the event cache.
#[error("Error in event cache.")]
EventCache(#[source] EventCacheError),
/// The focused event doesn't have an attached cache.
#[error("Missing cache for focused event")]
MissingCache,
#[error("Pagination type not supported in this focus mode")]
NotSupported,
}

View File

@@ -29,7 +29,7 @@ use matrix_sdk::{
Result,
attachment::{AttachmentInfo, Thumbnail},
deserialized_responses::TimelineEvent,
event_cache::{EventCacheDropHandles, RoomEventCache},
event_cache::{EventCacheDropHandles, EventFocusThreadMode, RoomEventCache},
executor::JoinHandle,
room::{
Receipts, Room,
@@ -151,20 +151,25 @@ pub enum TimelineFocus {
/// Options for controlling the behaviour of [`TimelineFocus::Event`]
/// for threaded events.
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TimelineEventFocusThreadMode {
/// Force the timeline into threaded mode. When the focused event is part of
/// a thread, the timeline will be focused on that thread's root. Otherwise,
/// the timeline will treat the target event itself as the thread root.
/// Threaded events will never be hidden.
/// Force the timeline into threaded mode.
///
/// When the focused event is part of a thread, the timeline will be focused
/// on that thread's root. Otherwise, the timeline will treat the target
/// event itself as the thread root. Threaded events will never be
/// hidden.
ForceThread,
/// Automatically determine if the target event is
/// part of a thread or not. If the event is part of a thread, the timeline
/// Automatically determine if the target event is part of a thread or not.
///
/// If the event is part of a thread, the timeline
/// will be filtered to on-thread events.
Automatic {
/// When the target event is not part of a thread, whether to
/// hide in-thread replies from the live timeline. Has no effect
/// when the target event is part of a thread.
/// hide in-thread replies from the live timeline.
///
/// Has no effect when the target event is part of a thread.
///
/// This should be set to true when the client can create
/// [`TimelineFocus::Thread`]-focused timelines from the thread roots
@@ -173,6 +178,15 @@ pub enum TimelineEventFocusThreadMode {
},
}
impl From<TimelineEventFocusThreadMode> for EventFocusThreadMode {
fn from(val: TimelineEventFocusThreadMode) -> Self {
match val {
TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
TimelineEventFocusThreadMode::Automatic { .. } => EventFocusThreadMode::Automatic,
}
}
}
impl TimelineFocus {
pub(super) fn debug_string(&self) -> String {
match self {
@@ -868,6 +882,7 @@ impl Timeline {
struct TimelineDropHandle {
room_update_join_handle: JoinHandle<()>,
pinned_events_join_handle: Option<JoinHandle<()>>,
event_focused_join_handle: Option<JoinHandle<()>>,
thread_update_join_handle: Option<JoinHandle<()>>,
local_echo_listener_handle: JoinHandle<()>,
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
@@ -884,6 +899,10 @@ impl Drop for TimelineDropHandle {
handle.abort();
}
if let Some(handle) = self.event_focused_join_handle.take() {
handle.abort();
}
self.local_echo_listener_handle.abort();
self.room_update_join_handle.abort();
}

View File

@@ -20,7 +20,10 @@ use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
use tracing::{instrument, warn};
use super::Error;
use crate::timeline::{PaginationError::NotSupported, controller::TimelineFocusKind};
use crate::timeline::{
PaginationError::{self, NotSupported},
controller::TimelineFocusKind,
};
impl super::Timeline {
/// Add more events to the start of the timeline.
@@ -50,13 +53,21 @@ impl super::Timeline {
Ok(self.live_paginate_backwards(num_events).await?)
}
TimelineFocusKind::Event { .. } => {
Ok(self.controller.focused_paginate_backwards(num_events).await?)
}
TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
.event_cache
.get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
.await?
.ok_or(PaginationError::MissingCache)?
.paginate_backwards(num_events)
.await?
.hit_end_of_timeline),
TimelineFocusKind::Thread { root_event_id } => Ok(self
.event_cache
.paginate_thread_backwards(root_event_id.to_owned(), num_events)
.await?),
TimelineFocusKind::PinnedEvents => Err(Error::PaginationError(NotSupported)),
}
}
@@ -66,10 +77,21 @@ impl super::Timeline {
/// Returns whether we hit the end of the timeline.
#[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
if self.controller.is_live() {
Ok(true)
} else {
Ok(self.controller.focused_paginate_forwards(num_events).await?)
match self.controller.focus() {
TimelineFocusKind::Live { .. } => Ok(true),
TimelineFocusKind::Event { focused_event_id, thread_mode, .. } => Ok(self
.event_cache
.get_event_focused_cache(focused_event_id.clone(), (*thread_mode).into())
.await?
.ok_or(PaginationError::MissingCache)?
.paginate_forwards(num_events)
.await?
.hit_end_of_timeline),
TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => {
Err(Error::PaginationError(NotSupported))
}
}
}

View File

@@ -18,8 +18,8 @@ use std::collections::BTreeSet;
use matrix_sdk::{
event_cache::{
EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
TimelineVectorDiffs,
EventFocusThreadMode, EventsOrigin, RoomEventCache, RoomEventCacheSubscriber,
RoomEventCacheUpdate, TimelineVectorDiffs,
},
send_queue::RoomSendQueueUpdate,
};
@@ -49,7 +49,7 @@ pub(in crate::timeline) async fn pinned_events_task(
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
warn!(num_skipped, "Lagged behind pinned-event cache updates, resetting timeline");
// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
@@ -84,6 +84,70 @@ pub(in crate::timeline) async fn pinned_events_task(
}
}
/// Long-lived task, in the event focus mode, that updates the timeline after
/// any changes to the underlying timeline.
#[instrument(
skip_all,
fields(
room_id = %timeline_controller.room().room_id(),
focused_event_id = %focused_event,
?thread_mode
)
)]
pub(in crate::timeline) async fn event_focused_task(
focused_event: OwnedEventId,
thread_mode: EventFocusThreadMode,
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
mut event_focused_events_recv: Receiver<TimelineVectorDiffs>,
) {
loop {
trace!("Waiting for an event.");
let update = match event_focused_events_recv.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind focused-event cache updates, resetting timeline");
// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
// after clearing it.
let cache = match room_event_cache
.get_event_focused_cache(focused_event.clone(), thread_mode)
.await
{
Ok(Some(cache)) => cache,
Ok(None) => {
error!("Focused event timeline doesn't have an attached cache");
break;
}
Err(err) => {
error!(%err, "Failed to get the focused cache for the focused event");
break;
}
};
let (initial_events, _) = cache.subscribe().await;
timeline_controller
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
.await;
continue;
}
};
trace!("Received new timeline events diffs");
let origin = match update.origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};
timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
}
}
/// For a thread-focused timeline, a long-lived task that will listen to the
/// underlying thread updates.
pub(in crate::timeline) async fn thread_updates_task(

View File

@@ -27,8 +27,6 @@ use futures_core::Stream;
use imbl::vector;
use indexmap::IndexMap;
use matrix_sdk::{
BoxFuture,
config::RequestConfig,
deserialized_responses::TimelineEvent,
paginators::{PaginableRoom, PaginatorError, thread::PaginableThread},
room::{EventWithContextResponse, Messages, MessagesOptions, Relations},
@@ -43,7 +41,7 @@ use ruma::{
AnyMessageLikeEventContent, AnyTimelineEvent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
relation::{Annotation, RelationType},
relation::Annotation,
},
room_version_rules::RoomVersionRules,
serde::Raw,
@@ -382,13 +380,4 @@ impl RoomDataProvider for TestRoomDataProvider {
async fn load_event<'a>(&'a self, _event_id: &'a EventId) -> matrix_sdk::Result<TimelineEvent> {
unimplemented!();
}
fn load_event_with_relations<'a>(
&'a self,
_event_id: &'a EventId,
_request_config: Option<RequestConfig>,
_related_event_filters: Option<Vec<RelationType>>,
) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
unimplemented!();
}
}

View File

@@ -17,8 +17,7 @@ use std::future::Future;
use eyeball::Subscriber;
use indexmap::IndexMap;
use matrix_sdk::{
BoxFuture, Result, Room, SendOutsideWasm,
config::RequestConfig,
Result, Room, SendOutsideWasm,
deserialized_responses::TimelineEvent,
paginators::{PaginableRoom, thread::PaginableThread},
};
@@ -29,7 +28,6 @@ use ruma::{
AnyMessageLikeEventContent,
fully_read::FullyReadEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
relation::RelationType,
},
room_version_rules::RoomVersionRules,
};
@@ -138,19 +136,6 @@ pub(super) trait RoomDataProvider:
&'a self,
event_id: &'a EventId,
) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm + 'a;
/// Load a single room event using the cache or network and any events
/// related to it, if they are cached.
///
/// You can control which types of related events are retrieved using
/// `related_event_filters`. A `None` value will retrieve any type of
/// related event.
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
related_event_filters: Option<Vec<RelationType>>,
) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
}
impl RoomDataProvider for Room {
@@ -259,17 +244,4 @@ impl RoomDataProvider for Room {
async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result<TimelineEvent> {
self.load_or_fetch_event(event_id, None).await
}
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
related_event_filters: Option<Vec<RelationType>>,
) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
Box::pin(self.load_or_fetch_event_with_relations(
event_id,
related_event_filters,
request_config,
))
}
}

View File

@@ -16,10 +16,11 @@
use std::time::Duration;
use assert_matches2::{assert_let, assert_matches};
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::{
assert_let_timeout,
config::{SyncSettings, SyncToken},
test_utils::{
logged_in_client_with_server,
@@ -30,10 +31,7 @@ use matrix_sdk_test::{
ALICE, BOB, JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
mocks::mock_encryption_state,
};
use matrix_sdk_ui::timeline::{
TimelineBuilder, TimelineEventFocusThreadMode, TimelineFocus, TimelineItemKind,
VirtualTimelineItem,
};
use matrix_sdk_ui::timeline::{TimelineBuilder, TimelineEventFocusThreadMode, TimelineFocus};
use ruma::{event_id, events::room::message::RoomMessageEventContent, room_id};
use stream_assert::assert_pending;
use tokio::time::sleep;
@@ -136,27 +134,20 @@ async fn test_new_focused() {
server.reset().await;
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 4);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushFront { value: message } = &timeline_updates[0]);
assert_eq!(
message.as_event().unwrap().content().as_message().unwrap().body(),
"And even though I tried, it all fell apart"
);
assert_let!(VectorDiff::PushFront { value: message } = &timeline_updates[1]);
assert_let!(VectorDiff::Insert { index: 1, value: message } = &timeline_updates[0]);
assert_eq!(
message.as_event().unwrap().content().as_message().unwrap().body(),
"I kept everything inside"
);
// Date divider post processing.
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[2]);
assert!(item.is_date_divider());
assert_let!(VectorDiff::Remove { index } = &timeline_updates[3]);
assert_eq!(*index, 3);
assert_let!(VectorDiff::Insert { index: 2, value: message } = &timeline_updates[1]);
assert_eq!(
message.as_event().unwrap().content().as_message().unwrap().body(),
"And even though I tried, it all fell apart"
);
// Now trigger a forward pagination.
mock_messages(
@@ -176,7 +167,7 @@ async fn test_new_focused() {
server.reset().await;
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushBack { value: message } = &timeline_updates[0]);
@@ -508,16 +499,10 @@ async fn test_focused_timeline_handles_threaded_event() {
assert!(!start_of_timeline);
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 3);
// The new item loaded is added at the start
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[0]);
assert_eq!(timeline_updates.len(), 1);
// The new item loaded is inserted at the start, just after the date divider.
assert_let!(VectorDiff::Insert { index: 1, value: item } = &timeline_updates[0]);
assert_eq!(item.as_event().unwrap().content().as_message().unwrap().body(), "Prev");
// So is the new date divider
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[1]);
assert_matches!(item.kind(), TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(_)));
// The previous date divider is removed
assert_let!(VectorDiff::Remove { index } = &timeline_updates[2]);
assert_eq!(*index, 2);
// We paginate back until the start of the timeline, which will trigger an
// /event request for the initial item.
@@ -548,16 +533,11 @@ async fn test_focused_timeline_handles_threaded_event() {
assert!(start_of_timeline);
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 3);
// Same as before, the previous event is added at the front
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[0]);
assert_eq!(timeline_updates.len(), 1);
// Same as before, the previous event is inserted at the front, after the date
// divider.
assert_let!(VectorDiff::Insert { index: 1, value: item } = &timeline_updates[0]);
assert_eq!(item.as_event().unwrap().content().as_message().unwrap().body(), "Root");
// Then the new date divider
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[1]);
assert_matches!(item.kind(), TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(_)));
// And the old date divider is removed
assert_let!(VectorDiff::Remove { index } = &timeline_updates[2]);
assert_eq!(*index, 2);
// Then we paginate forwards
server
@@ -628,13 +608,11 @@ async fn test_focused_timeline_handles_thread_root_event_when_forcing_threaded_m
let thread_root = event_id!("$root:example.org");
let thread_root_event = f.text_msg("Hey").event_id(thread_root).into_event();
// Mock the initial /event and /relations requests to fetch the focussed event
// and its common relations.
server.mock_room_event().match_event_id().ok(thread_root_event).mock_once().mount().await;
// Mock the initial /context request to fetch the focussed event.
server
.mock_room_relations()
.match_target_event(thread_root.to_owned())
.ok(RoomRelationsResponseTemplate::default())
.mock_room_event_context()
.match_event_id()
.ok(RoomContextResponseTemplate::new(thread_root_event).end("next_token_1"))
.mock_once()
.mount()
.await;
@@ -671,6 +649,7 @@ async fn test_focused_timeline_handles_thread_root_event_when_forcing_threaded_m
let prev_event_id = event_id!("$prev:example.org");
server
.mock_room_relations()
.match_from("next_token_1")
.ok(RoomRelationsResponseTemplate {
chunk: vec![
f.text_msg("Next1")
@@ -742,13 +721,11 @@ async fn test_focused_timeline_handles_other_thread_event_when_forcing_threaded_
.event_id(threaded_event_id)
.into_event();
// Mock the initial /event and /relations requests to fetch the focussed event
// and its common relations.
server.mock_room_event().match_event_id().ok(threaded_event).mock_once().mount().await;
// Mock the initial /context request to fetch the focussed event.
server
.mock_room_relations()
.match_target_event(threaded_event_id.to_owned())
.ok(RoomRelationsResponseTemplate::default())
.mock_room_event_context()
.match_event_id()
.ok(RoomContextResponseTemplate::new(threaded_event).start("prev_token").end("next_token"))
.mock_once()
.mount()
.await;
@@ -780,6 +757,7 @@ async fn test_focused_timeline_handles_other_thread_event_when_forcing_threaded_
// an /event request for the thread root.
server
.mock_room_relations()
.match_from("prev_token")
.ok(RoomRelationsResponseTemplate::default())
.mock_once()
.mount()
@@ -790,22 +768,17 @@ async fn test_focused_timeline_handles_other_thread_event_when_forcing_threaded_
timeline.paginate_backwards(10).await.expect("Could not paginate backwards");
assert!(end_of_timeline);
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_eq!(timeline_updates.len(), 3);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 1);
// The new item loaded is added at the start.
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[0]);
assert_let!(VectorDiff::Insert { index: 1, value: item } = &timeline_updates[0]);
assert_eq!(item.as_event().unwrap().content().as_message().unwrap().body(), "Hey");
// So is the new date divider.
assert_let!(VectorDiff::PushFront { value: item } = &timeline_updates[1]);
assert_matches!(item.kind(), TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(_)));
// The previous date divider is removed
assert_let!(VectorDiff::Remove { index } = &timeline_updates[2]);
assert_eq!(*index, 2);
// We paginate forwards once and hit the end of the thread.
let next_event_id = event_id!("$prev:example.org");
server
.mock_room_relations()
.match_from("next_token")
.ok(RoomRelationsResponseTemplate {
chunk: vec![
f.text_msg("Next")
@@ -826,7 +799,7 @@ async fn test_focused_timeline_handles_other_thread_event_when_forcing_threaded_
timeline.paginate_forwards(10).await.expect("Could not paginate forwards");
assert!(end_of_timeline);
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]);
assert_eq!(item.as_event().unwrap().content().as_message().unwrap().body(), "Next");

View File

@@ -20,9 +20,7 @@ use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::{
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
test_utils::mocks::{
MatrixMockServer, RoomContextResponseTemplate, RoomRelationsResponseTemplate,
},
test_utils::mocks::{MatrixMockServer, RoomContextResponseTemplate},
};
use matrix_sdk_test::{ALICE, BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use matrix_sdk_ui::timeline::{
@@ -34,15 +32,13 @@ use matrix_sdk_ui::timeline::{
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, event_id,
events::{
AnyTimelineEvent, MessageLikeEventType, TimelineEventType,
MessageLikeEventType, TimelineEventType,
room::{
encryption::RoomEncryptionEventContent,
message::{RedactedRoomMessageEventContent, RoomMessageEventContent},
},
},
owned_event_id, room_id,
serde::Raw,
user_id,
owned_event_id, room_id, user_id,
};
use sliding_sync::assert_timeline_stream;
use stream_assert::assert_pending;
@@ -95,22 +91,20 @@ async fn test_timeline_is_threaded() {
// An event-focused timeline, focused on a non-thread event, isn't threaded when
// no context is requested.
let f = EventFactory::new();
let event_id = event_id!("$target");
let event_id = event_id!("$target1");
let event =
f.text_msg("hello world").event_id(event_id).room(room_id).sender(&ALICE).into_event();
server.mock_room_event().match_event_id().ok(event).mock_once().mount().await;
server
.mock_room_relations()
.match_target_event(event_id.to_owned())
.ok(RoomRelationsResponseTemplate::default()
.events(Vec::<Raw<AnyTimelineEvent>>::new()))
.mock_room_event_context()
.match_event_id()
.ok(RoomContextResponseTemplate::new(event))
.mock_once()
.mount()
.await;
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$target"),
target: owned_event_id!("$target1"),
num_context_events: 0,
thread_mode: TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true },
})
@@ -125,7 +119,7 @@ async fn test_timeline_is_threaded() {
// when no context is requested \o/
let f = EventFactory::new();
let thread_root = event_id!("$thread_root");
let event_id = event_id!("$thetarget");
let event_id = event_id!("$target2");
let event = f
.text_msg("hey to you too")
.event_id(event_id)
@@ -134,19 +128,17 @@ async fn test_timeline_is_threaded() {
.sender(&ALICE)
.into_event();
server.mock_room_event().match_event_id().ok(event).mock_once().mount().await;
server
.mock_room_relations()
.match_target_event(event_id.to_owned())
.ok(RoomRelationsResponseTemplate::default()
.events(Vec::<Raw<AnyTimelineEvent>>::new()))
.mock_room_event_context()
.match_event_id()
.ok(RoomContextResponseTemplate::new(event))
.mock_once()
.mount()
.await;
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$thetarget"),
target: owned_event_id!("$target2"),
num_context_events: 0,
thread_mode: TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true },
})
@@ -160,7 +152,7 @@ async fn test_timeline_is_threaded() {
// An event-focused timeline, focused on a thread root, is also threaded
// when no context is requested \o/
let f = EventFactory::new();
let event_id = event_id!("$atarget");
let event_id = event_id!("$target3");
let event = f
.text_msg("hey to you too")
.event_id(event_id)
@@ -168,19 +160,17 @@ async fn test_timeline_is_threaded() {
.sender(&ALICE)
.into_event();
server.mock_room_event().match_event_id().ok(event).mock_once().mount().await;
server
.mock_room_relations()
.match_target_event(event_id.to_owned())
.ok(RoomRelationsResponseTemplate::default()
.events(Vec::<Raw<AnyTimelineEvent>>::new()))
.mock_room_event_context()
.match_event_id()
.ok(RoomContextResponseTemplate::new(event))
.mock_once()
.mount()
.await;
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$atarget"),
target: owned_event_id!("$target3"),
num_context_events: 0,
thread_mode: TimelineEventFocusThreadMode::ForceThread,
})
@@ -195,7 +185,7 @@ async fn test_timeline_is_threaded() {
let f = EventFactory::new();
let event = f
.text_msg("hello world")
.event_id(event_id!("$target"))
.event_id(event_id!("$target4"))
.room(room_id)
.sender(&ALICE)
.into_event();
@@ -208,7 +198,7 @@ async fn test_timeline_is_threaded() {
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$target"),
target: owned_event_id!("$target4"),
num_context_events: 2,
thread_mode: TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true },
})
@@ -224,7 +214,7 @@ async fn test_timeline_is_threaded() {
let thread_root = event_id!("$thread_root");
let event = f
.text_msg("hey to you too")
.event_id(event_id!("$target"))
.event_id(event_id!("$target5"))
.in_thread(thread_root, thread_root)
.room(room_id)
.sender(&ALICE)
@@ -239,7 +229,7 @@ async fn test_timeline_is_threaded() {
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$target"),
target: owned_event_id!("$target5"),
num_context_events: 2,
thread_mode: TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true },
})
@@ -254,7 +244,7 @@ async fn test_timeline_is_threaded() {
let f = EventFactory::new();
let event = f
.text_msg("hey to you too")
.event_id(event_id!("$target"))
.event_id(event_id!("$target6"))
.room(room_id)
.sender(&ALICE)
.into_event();
@@ -268,7 +258,7 @@ async fn test_timeline_is_threaded() {
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Event {
target: owned_event_id!("$target"),
target: owned_event_id!("$target6"),
num_context_events: 2,
thread_mode: TimelineEventFocusThreadMode::ForceThread,
})