From 06fe8a8f32b87ab598ac85e17c42cc5214864a3e Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 22 Jan 2024 17:01:56 +0100 Subject: [PATCH] event graph: add an initial implementation of the event graph This is mostly a demonstration of how to plug this with the timeline, and how little it changes as a result. Remove read receipts --- bindings/matrix-sdk-ffi/src/room_list.rs | 3 +- crates/matrix-sdk-ui/src/event_graph.rs | 377 ++++++++++++++++++ crates/matrix-sdk-ui/src/lib.rs | 1 + .../src/room_list_service/room.rs | 3 +- crates/matrix-sdk-ui/src/timeline/builder.rs | 80 +++- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 8 +- .../matrix-sdk-ui/src/timeline/inner/state.rs | 14 +- crates/matrix-sdk-ui/src/timeline/mod.rs | 2 + .../src/timeline/sliding_sync_ext.rs | 14 +- .../matrix-sdk-ui/src/timeline/tests/basic.rs | 7 +- .../src/timeline/tests/reactions.rs | 5 +- .../src/timeline/tests/read_receipts.rs | 4 +- .../src/timeline/tests/redaction.rs | 5 +- .../matrix-sdk-ui/src/timeline/tests/virt.rs | 2 +- .../tests/integration/room_list_service.rs | 4 +- .../integration/timeline/read_receipts.rs | 2 +- labs/rrrepl/src/main.rs | 2 +- 17 files changed, 469 insertions(+), 64 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/event_graph.rs diff --git a/bindings/matrix-sdk-ffi/src/room_list.rs b/bindings/matrix-sdk-ffi/src/room_list.rs index cbd212638..39639a796 100644 --- a/bindings/matrix-sdk-ffi/src/room_list.rs +++ b/bindings/matrix-sdk-ffi/src/room_list.rs @@ -473,6 +473,7 @@ impl RoomListItem { } /// Initializes the timeline for this room using the provided parameters. + /// /// * `event_type_filter` - An optional [`TimelineEventTypeFilter`] to be /// used to filter timeline events besides the default timeline filter. If /// `None` is passed, only the default timeline filter will be used. @@ -480,7 +481,7 @@ impl RoomListItem { &self, event_type_filter: Option>, ) -> Result<(), RoomListError> { - let mut timeline_builder = self.inner.default_room_timeline_builder(); + let mut timeline_builder = self.inner.default_room_timeline_builder().await; if let Some(event_type_filter) = event_type_filter { timeline_builder = timeline_builder.event_filter(move |event, room_version_id| { // Always perform the default filter first diff --git a/crates/matrix-sdk-ui/src/event_graph.rs b/crates/matrix-sdk-ui/src/event_graph.rs new file mode 100644 index 000000000..387cea67a --- /dev/null +++ b/crates/matrix-sdk-ui/src/event_graph.rs @@ -0,0 +1,377 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The event graph is an abstraction layer, sitting between the Rust SDK and a +//! final client, that acts as a global observer of all the rooms, gathering and +//! inferring some extra useful information about each room. In particular, this +//! doesn't require subscribing to a specific room to get access to this +//! information. +//! +//! It's intended to be fast, robust and easy to maintain. +//! +//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more details about the historical reasons that led us to start writing this. +//! +//! Most of it is still a work-in-progress, as of 2024-01-22. +//! +//! The desired set of features it may eventually implement is the following: +//! +//! - [ ] compute proper unread room counts, and use backpagination to get +//! missing messages/notifications/mentions, if needs be. +//! - [ ] expose that information with a new data structure similar to the +//! `RoomInfo`, and that may update a `RoomListService`. +//! - [ ] provide read receipts for each message. +//! - [ ] backwards and forward pagination, and reconcile results with cached +//! timelines. +//! - [ ] retry decryption upon receiving new keys (from an encryption sync +//! service or from a key backup). +//! - [ ] expose the latest event for a given room. +//! - [ ] caching of events on-disk. + +#![forbid(missing_docs)] + +use std::{collections::BTreeMap, fmt::Debug, sync::Arc}; + +use async_trait::async_trait; +use matrix_sdk::{sync::RoomUpdate, Client, Room}; +use matrix_sdk_base::{ + deserialized_responses::SyncTimelineEvent, + sync::{JoinedRoom, LeftRoom, Timeline}, +}; +use ruma::{ + events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent}, + serde::Raw, + OwnedRoomId, RoomId, +}; +use tokio::{ + spawn, + sync::{ + broadcast::{error::RecvError, Receiver, Sender}, + RwLock, + }, + task::JoinHandle, +}; +use tracing::{debug, error, trace}; + +/// An error observed in the `EventGraph`. +#[derive(thiserror::Error, Debug)] +pub enum EventGraphError { + /// A room hasn't been found, when trying to create a graph view for that + /// room. + #[error("Room with id {0} not found")] + RoomNotFound(OwnedRoomId), +} + +/// A result using the [`EventGraphError`]. +pub type Result = std::result::Result; + +/// Hold handles to the tasks spawn by a [`RoomEventGraph`]. +struct RoomGraphDropHandles { + listen_updates_task: JoinHandle<()>, +} + +impl Drop for RoomGraphDropHandles { + fn drop(&mut self) { + self.listen_updates_task.abort(); + } +} + +/// An event graph, providing lots of useful functionality for clients. +/// +/// See also the module-level comment. +pub struct EventGraph { + /// Reference to the client used to navigate this graph. + client: Client, + /// Lazily-filled cache of live [`RoomEventGraph`], once per room. + by_room: BTreeMap, + /// Backend used for storage. + store: Arc, +} + +impl Debug for EventGraph { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EventGraph").finish_non_exhaustive() + } +} + +impl EventGraph { + /// Create a new [`EventGraph`] for the given client. + pub fn new(client: Client) -> Self { + let store = Arc::new(MemoryStore::new()); + Self { client, by_room: Default::default(), store } + } + + /// Return a room-specific view over the [`EventGraph`]. + /// + /// It may not be found, if the room isn't known to the client. + pub fn for_room(&mut self, room_id: &RoomId) -> Option { + match self.by_room.get(room_id) { + Some(room) => Some(room.clone()), + None => { + let room = self.client.get_room(room_id)?; + let room_event_graph = RoomEventGraph::new(room, self.store.clone()); + self.by_room.insert(room_id.to_owned(), room_event_graph.clone()); + Some(room_event_graph) + } + } + } + + /// Add an initial set of events to the event graph, reloaded from a cache. + /// + /// TODO: temporary for API compat, as the event graph should take care of + /// its own cache. + pub async fn add_initial_events( + &mut self, + room_id: &RoomId, + events: Vec, + ) -> Result<()> { + let room_graph = self + .for_room(room_id) + .ok_or_else(|| EventGraphError::RoomNotFound(room_id.to_owned()))?; + room_graph.inner.append_events(events).await?; + Ok(()) + } +} + +/// A store that can be remember information about the event graph. +/// +/// It really acts as a cache, in the sense that clearing the backing data +/// should not have any irremediable effect, other than providing a lesser user +/// experience. +#[async_trait] +pub trait EventGraphStore: Send + Sync { + /// Returns all the known events for the given room. + async fn room_events(&self, room: &RoomId) -> Result>; + + /// Adds all the events to the given room. + async fn add_room_events(&self, room: &RoomId, events: Vec) -> Result<()>; + + /// Clear all the events from the given room. + async fn clear_room_events(&self, room: &RoomId) -> Result<()>; +} + +struct MemoryStore { + /// All the events per room, in sync order. + by_room: RwLock>>, +} + +impl MemoryStore { + fn new() -> Self { + Self { by_room: Default::default() } + } +} + +#[async_trait] +impl EventGraphStore for MemoryStore { + async fn room_events(&self, room: &RoomId) -> Result> { + Ok(self.by_room.read().await.get(room).cloned().unwrap_or_default()) + } + + async fn add_room_events(&self, room: &RoomId, events: Vec) -> Result<()> { + self.by_room.write().await.entry(room.to_owned()).or_default().extend(events); + Ok(()) + } + + async fn clear_room_events(&self, room: &RoomId) -> Result<()> { + let _ = self.by_room.write().await.remove(room); + Ok(()) + } +} + +/// A subset of an event graph, for a room. +/// +/// Cloning is shallow, and thus is cheap to do. +#[derive(Clone)] +pub struct RoomEventGraph { + inner: Arc, + + _drop_handles: Arc, +} + +impl Debug for RoomEventGraph { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RoomEventGraph").finish_non_exhaustive() + } +} + +impl RoomEventGraph { + /// Create a new [`RoomEventGraph`] using the given room and store. + fn new(room: Room, store: Arc) -> Self { + let (inner, drop_handles) = RoomEventGraphInner::new(room, store); + Self { inner, _drop_handles: drop_handles } + } + + /// Subscribe to room updates for this room, after getting the initial list + /// of events. XXX: Could/should it use some kind of `Observable` + /// instead? Or not something async, like explicit handlers as our event + /// handlers? + pub async fn subscribe( + &self, + ) -> Result<(Vec, Receiver)> { + Ok(( + self.inner.store.room_events(self.inner.room.room_id()).await?, + self.inner.sender.subscribe(), + )) + } +} + +struct RoomEventGraphInner { + sender: Sender, + store: Arc, + room: Room, +} + +impl RoomEventGraphInner { + /// Creates a new graph for a room, and subscribes to room updates., so as + /// to handle new timeline events. + fn new(room: Room, store: Arc) -> (Arc, Arc) { + let sender = Sender::new(32); + + let room_graph = Arc::new(Self { room, store, sender }); + + let listen_updates_task = spawn(Self::listen_task(room_graph.clone())); + + (room_graph, Arc::new(RoomGraphDropHandles { listen_updates_task })) + } + + async fn handle_joined_room_update(&self, updates: JoinedRoom) -> Result<()> { + self.handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.account_data) + .await?; + Ok(()) + } + + async fn handle_timeline( + &self, + timeline: Timeline, + ephemeral: Vec>, + account_data: Vec>, + ) -> Result<()> { + let room_id = self.room.room_id(); + + if timeline.limited { + // Ideally we'd try to reconcile existing events against those received in the + // timeline, but we're not there yet. In the meanwhile, clear the + // items from the room. TODO: implement Smart Matching™. + trace!("limited timeline, clearing all previous events"); + self.store.clear_room_events(room_id).await?; + let _ = self.sender.send(RoomEventGraphUpdate::Clear); + } + + // Add all the events to the backend. + trace!("adding new events"); + self.store.add_room_events(room_id, timeline.events.clone()).await?; + + // Propagate events to observers. + let _ = self.sender.send(RoomEventGraphUpdate::Append { + events: timeline.events, + prev_batch: timeline.prev_batch, + ephemeral, + account_data, + }); + + Ok(()) + } + + async fn handle_left_room_update(&self, updates: LeftRoom) -> Result<()> { + self.handle_timeline(updates.timeline, Vec::new(), Vec::new()).await?; + Ok(()) + } + + async fn listen_task(this: Arc) { + // TODO for prototyping, i'm spawning a new task to get the room updates. + // Ideally we'd have something like the whole sync update, a generalisation of + // the room update. + trace!("Spawning the listen task"); + + let mut update_receiver = this.room.client().subscribe_to_room_updates(this.room.room_id()); + + loop { + match update_receiver.recv().await { + Ok(update) => { + trace!("Listen task received an update"); + + match update { + RoomUpdate::Left { updates, .. } => { + if let Err(err) = this.handle_left_room_update(updates).await { + error!("handling left room update: {err}"); + } + } + RoomUpdate::Joined { updates, .. } => { + if let Err(err) = this.handle_joined_room_update(updates).await { + error!("handling joined room update: {err}"); + } + } + RoomUpdate::Invited { .. } => { + // We don't do anything for invited rooms at this + // point. TODO should + // we? + } + } + } + + Err(RecvError::Closed) => { + // The loop terminated successfully. + debug!("Listen task closed"); + break; + } + + Err(RecvError::Lagged(_)) => { + // Since we've lagged behind updates to this room, we might be out of + // sync with the events, leading to potentially lost events. Play it + // safe here, and clear the cache. It's fine because we can retrigger + // backpagination from the last event at any time, if needs be. + debug!("Listen task lagged, clearing room"); + if let Err(err) = this.store.clear_room_events(this.room.room_id()).await { + error!("unable to clear room after room updates lag: {err}"); + } + } + } + } + } + + /// Append a set of events to the room graph and storage, notifying + /// observers. + async fn append_events(&self, events: Vec) -> Result<()> { + self.store.add_room_events(self.room.room_id(), events.clone()).await?; + + let _ = self.sender.send(RoomEventGraphUpdate::Append { + events, + prev_batch: None, + account_data: Default::default(), + ephemeral: Default::default(), + }); + + Ok(()) + } +} + +/// An update related to events happened in a room. +#[derive(Clone)] +pub enum RoomEventGraphUpdate { + /// The room has been cleared from events. + Clear, + /// The room has new events. + Append { + /// All the new events that have been added to the room. + events: Vec, + /// XXX: this is temporary, until backpagination lives in the event + /// graph. + prev_batch: Option, + /// XXX: this is temporary, until account data lives in the event graph + /// — or will it live there? + account_data: Vec>, + /// XXX: this is temporary, until read receipts are handled in the event + /// graph + ephemeral: Vec>, + }, +} diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index 0e33ac0e7..b612189b9 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -17,6 +17,7 @@ use ruma::html::HtmlSanitizerMode; mod events; pub mod encryption_sync_service; +pub mod event_graph; pub mod notification_client; pub mod room_list_service; pub mod sync_service; diff --git a/crates/matrix-sdk-ui/src/room_list_service/room.rs b/crates/matrix-sdk-ui/src/room_list_service/room.rs index c5cbe5070..a8203f1a3 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room.rs @@ -174,12 +174,13 @@ impl Room { } /// Create a new [`TimelineBuilder`] with the default configuration. - pub fn default_room_timeline_builder(&self) -> TimelineBuilder { + pub async fn default_room_timeline_builder(&self) -> TimelineBuilder { Timeline::builder(&self.inner.room) .events( self.inner.sliding_sync_room.prev_batch(), self.inner.sliding_sync_room.timeline_queue(), ) + .await .track_read_marker_and_receipts() } } diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 179527261..73845d26d 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -17,9 +17,8 @@ use std::{collections::BTreeSet, sync::Arc}; use eyeball::SharedObservable; use futures_util::{pin_mut, StreamExt}; use imbl::Vector; -use matrix_sdk::{ - deserialized_responses::SyncTimelineEvent, executor::spawn, sync::RoomUpdate, Room, -}; +use matrix_sdk::{deserialized_responses::SyncTimelineEvent, executor::spawn, Room}; +use matrix_sdk_base::sync::JoinedRoom; use ruma::{ events::{receipt::ReceiptType, AnySyncTimelineEvent}, RoomVersionId, @@ -34,6 +33,7 @@ use super::{ queue::send_queued_messages, BackPaginationStatus, Timeline, TimelineDropHandle, }; +use crate::event_graph::{EventGraph, RoomEventGraphUpdate}; /// Builder that allows creating and configuring various parts of a /// [`Timeline`]. @@ -42,8 +42,8 @@ use super::{ pub struct TimelineBuilder { room: Room, prev_token: Option, - events: Vector, settings: TimelineInnerSettings, + event_graph: EventGraph, } impl TimelineBuilder { @@ -51,19 +51,25 @@ impl TimelineBuilder { Self { room: room.clone(), prev_token: None, - events: Vector::new(), settings: TimelineInnerSettings::default(), + event_graph: EventGraph::new(room.client()), } } /// Add initial events to the timeline. - pub(crate) fn events( + /// TODO: remove this, the EventGraph should hold the events data in the + /// first place, and we'd provide an existing EventGraph to the + /// TimelineBuilder. + pub(crate) async fn events( mut self, prev_token: Option, events: Vector, ) -> Self { self.prev_token = prev_token; - self.events = events; + self.event_graph + .add_initial_events(self.room.room_id(), events.iter().cloned().collect()) + .await + .expect("room exists"); self } @@ -120,13 +126,19 @@ impl TimelineBuilder { skip(self), fields( room_id = ?self.room.room_id(), - events_length = self.events.len(), track_read_receipts = self.settings.track_read_receipts, prev_token = self.prev_token, ) )] pub async fn build(self) -> Timeline { - let Self { room, prev_token, events, settings } = self; + let Self { room, mut event_graph, prev_token, settings } = self; + + let room_event_graph = event_graph.for_room(room.room_id()).expect("room exists"); + let (events, mut event_subscriber) = room_event_graph + .subscribe() + .await + .expect("make this function fallible, or allow this for the time being?"); + let has_events = !events.is_empty(); let track_read_marker_and_receipts = settings.track_read_receipts; @@ -148,7 +160,6 @@ impl TimelineBuilder { let client = room.client(); let sync_response_notify = Arc::new(Notify::new()); - let mut room_update_rx = room.subscribe_to_updates(); let room_update_join_handle = spawn({ let sync_response_notify = sync_response_notify.clone(); let inner = inner.clone(); @@ -158,8 +169,12 @@ impl TimelineBuilder { span.follows_from(Span::current()); async move { + trace!("Spawned the event subscriber task"); + loop { - let update = match room_update_rx.recv().await { + trace!("Waiting for an event"); + + let update = match event_subscriber.recv().await { Ok(up) => up, Err(broadcast::error::RecvError::Closed) => break, Err(broadcast::error::RecvError::Lagged(_)) => { @@ -169,21 +184,41 @@ impl TimelineBuilder { } }; - trace!("Handling a room update"); - match update { - RoomUpdate::Left { updates, .. } => { - inner.handle_sync_timeline(updates.timeline).await; + RoomEventGraphUpdate::Clear => { + trace!("Clearing the timeline."); + inner.clear().await; } - RoomUpdate::Joined { updates, .. } => { - inner.handle_joined_room_update(updates).await; - } - RoomUpdate::Invited { .. } => { - warn!("Room is in invited state, can't build or update its timeline"); + + RoomEventGraphUpdate::Append { + events, + prev_batch, + account_data, + ephemeral, + } => { + trace!("Received new events"); + + // XXX this timeline and the joined room updates are synthetic, until + // we get rid of `handle_joined_room_update` by adding all functionality + // back in the event graph, and replacing it with a simple + // `handle_add_events`. + let timeline = matrix_sdk_base::sync::Timeline { + limited: false, + prev_batch, + events, + }; + let update = JoinedRoom { + unread_notifications: Default::default(), + timeline, + state: Default::default(), + account_data, + ephemeral, + }; + inner.handle_joined_room_update(update).await; + + sync_response_notify.notify_waiters(); } } - - sync_response_notify.notify_waiters(); } } .instrument(span) @@ -268,6 +303,7 @@ impl TimelineBuilder { room_update_join_handle, ignore_user_list_update_join_handle, room_key_from_backups_join_handle, + _event_graph: room_event_graph, }), }; diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index cc9519892..ec328b6a1 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -29,7 +29,6 @@ use matrix_sdk::{ sync::JoinedRoom, Error, Result, Room, }; -use matrix_sdk_base::sync::Timeline; #[cfg(test)] use ruma::events::receipt::ReceiptEventContent; #[cfg(all(test, feature = "e2e-encryption"))] @@ -407,7 +406,7 @@ impl TimelineInner

{ pub(super) async fn add_initial_events( &mut self, - events: Vector, + events: Vec, back_pagination_token: Option, ) { if events.is_empty() { @@ -434,11 +433,6 @@ impl TimelineInner

{ state.handle_joined_room_update(update, &self.room_data_provider, &self.settings).await; } - pub(super) async fn handle_sync_timeline(&self, timeline: Timeline) { - let mut state = self.state.write().await; - state.handle_sync_timeline(timeline, &self.room_data_provider, &self.settings).await; - } - #[cfg(test)] pub(super) async fn handle_live_event(&self, event: SyncTimelineEvent) { let mut state = self.state.write().await; diff --git a/crates/matrix-sdk-ui/src/timeline/inner/state.rs b/crates/matrix-sdk-ui/src/timeline/inner/state.rs index b0ba11ca7..bcbc382bf 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/state.rs @@ -21,7 +21,6 @@ use std::{ }; use eyeball_im::{ObservableVector, ObservableVectorTransaction, ObservableVectorTransactionEntry}; -use imbl::Vector; use indexmap::IndexMap; use matrix_sdk::{deserialized_responses::SyncTimelineEvent, sync::Timeline}; use matrix_sdk_base::{deserialized_responses::TimelineEvent, sync::JoinedRoom}; @@ -82,7 +81,7 @@ impl TimelineInnerState { #[tracing::instrument(skip_all)] pub(super) async fn add_initial_events( &mut self, - events: Vector, + events: Vec, mut back_pagination_token: Option, room_data_provider: &P, settings: &TimelineInnerSettings, @@ -111,17 +110,6 @@ impl TimelineInnerState { txn.commit(); } - pub(super) async fn handle_sync_timeline( - &mut self, - timeline: Timeline, - room_data_provider: &P, - settings: &TimelineInnerSettings, - ) { - let mut txn = self.transaction(); - txn.handle_sync_timeline(timeline, room_data_provider, settings).await; - txn.commit(); - } - #[instrument(skip_all)] pub(super) async fn handle_joined_room_update( &mut self, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index ccd0c6579..5ba127d8d 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -59,6 +59,7 @@ use tokio::sync::{mpsc::Sender, Mutex, Notify}; use tracing::{debug, error, info, instrument, trace, warn}; use self::futures::SendAttachment; +use crate::event_graph::RoomEventGraph; mod builder; mod error; @@ -796,6 +797,7 @@ struct TimelineDropHandle { room_update_join_handle: JoinHandle<()>, ignore_user_list_update_join_handle: JoinHandle<()>, room_key_from_backups_join_handle: JoinHandle<()>, + _event_graph: RoomEventGraph, } impl Drop for TimelineDropHandle { diff --git a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs index 296182248..54c978436 100644 --- a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs +++ b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs @@ -33,7 +33,13 @@ pub trait SlidingSyncRoomExt { #[async_trait] impl SlidingSyncRoomExt for SlidingSyncRoom { async fn timeline(&self) -> Option { - Some(sliding_sync_timeline_builder(self)?.track_read_marker_and_receipts().build().await) + Some( + sliding_sync_timeline_builder(self) + .await? + .track_read_marker_and_receipts() + .build() + .await, + ) } /// Get a timeline item representing the latest event in this room. @@ -46,10 +52,12 @@ impl SlidingSyncRoomExt for SlidingSyncRoom { } } -fn sliding_sync_timeline_builder(room: &SlidingSyncRoom) -> Option { +async fn sliding_sync_timeline_builder(room: &SlidingSyncRoom) -> Option { let room_id = room.room_id(); match room.client().get_room(room_id) { - Some(r) => Some(Timeline::builder(&r).events(room.prev_batch(), room.timeline_queue())), + Some(r) => { + Some(Timeline::builder(&r).events(room.prev_batch(), room.timeline_queue()).await) + } None => { error!(?room_id, "Room not found in client. Can't provide a timeline for it"); None diff --git a/crates/matrix-sdk-ui/src/timeline/tests/basic.rs b/crates/matrix-sdk-ui/src/timeline/tests/basic.rs index 911a9e4f7..c07c35e16 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/basic.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/basic.rs @@ -15,7 +15,6 @@ use assert_matches::assert_matches; use assert_matches2::assert_let; use eyeball_im::VectorDiff; -use imbl::vector; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB, CAROL}; use ruma::{ @@ -47,7 +46,7 @@ async fn initial_events() { timeline .inner .add_initial_events( - vector![ + vec![ SyncTimelineEvent::new( timeline .event_builder @@ -239,7 +238,7 @@ async fn dedup_initial() { timeline .inner .add_initial_events( - vector![ + vec![ // two events event_a.clone(), event_b.clone(), @@ -247,7 +246,7 @@ async fn dedup_initial() { event_a, event_b, // … and a new event also came in - event_c + event_c, ], None, ) diff --git a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs index 3b938dae3..59792d84b 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs @@ -18,7 +18,6 @@ use assert_matches::assert_matches; use assert_matches2::assert_let; use eyeball_im::VectorDiff; use futures_core::Stream; -use imbl::vector; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; use matrix_sdk_test::{async_test, ALICE, BOB}; use ruma::{ @@ -248,7 +247,7 @@ async fn initial_reaction_timestamp_is_stored() { timeline .inner .add_initial_events( - vector![ + vec![ SyncTimelineEvent::new(timeline.event_builder.make_sync_reaction( *ALICE, &Annotation::new(message_event_id.clone(), REACTION_KEY.to_owned()), @@ -258,7 +257,7 @@ async fn initial_reaction_timestamp_is_stored() { *ALICE, &message_event_id, RoomMessageEventContent::text_plain("A"), - )) + )), ], None, ) diff --git a/crates/matrix-sdk-ui/src/timeline/tests/read_receipts.rs b/crates/matrix-sdk-ui/src/timeline/tests/read_receipts.rs index 0c712b6a6..822d35fbe 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/read_receipts.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/read_receipts.rs @@ -40,7 +40,7 @@ fn filter_notice(ev: &AnySyncTimelineEvent, _room_version: &RoomVersionId) -> bo } #[async_test] -async fn read_receipts_updates_on_live_events() { +async fn test_read_receipts_updates_on_live_events() { let timeline = TestTimeline::new() .with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() }); let mut stream = timeline.subscribe().await; @@ -138,7 +138,7 @@ async fn read_receipts_updates_on_back_paginated_events() { } #[async_test] -async fn read_receipts_updates_on_filtered_events() { +async fn test_read_receipts_updates_on_filtered_events() { let timeline = TestTimeline::new().with_settings(TimelineInnerSettings { track_read_receipts: true, event_filter: Arc::new(filter_notice), diff --git a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs index d2313db71..a34d5cd04 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs @@ -15,7 +15,6 @@ use assert_matches::assert_matches; use assert_matches2::assert_let; use eyeball_im::VectorDiff; -use imbl::vector; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB}; use ruma::{ @@ -148,10 +147,10 @@ async fn reaction_redaction_timeline_filter() { timeline .inner .add_initial_events( - vector![SyncTimelineEvent::new( + vec![SyncTimelineEvent::new( timeline .event_builder - .make_sync_redacted_message_event(*ALICE, RedactedReactionEventContent::new()) + .make_sync_redacted_message_event(*ALICE, RedactedReactionEventContent::new()), )], None, ) diff --git a/crates/matrix-sdk-ui/src/timeline/tests/virt.rs b/crates/matrix-sdk-ui/src/timeline/tests/virt.rs index e4c0b2496..49ba1442c 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/virt.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/virt.rs @@ -94,7 +94,7 @@ async fn day_divider() { } #[async_test] -async fn update_read_marker() { +async fn test_update_read_marker() { let timeline = TestTimeline::new(); let mut stream = timeline.subscribe().await; diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 078bde7d6..2b7951963 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -2370,7 +2370,7 @@ async fn test_room_timeline() -> Result<(), Error> { }; let room = room_list.room(room_id).await?; - room.init_timeline_with_builder(room.default_room_timeline_builder()).await?; + room.init_timeline_with_builder(room.default_room_timeline_builder().await).await?; let timeline = room.timeline().unwrap(); let (previous_timeline_items, mut timeline_items_stream) = timeline.subscribe().await; @@ -2452,7 +2452,7 @@ async fn test_room_latest_event() -> Result<(), Error> { }; let room = room_list.room(room_id).await?; - room.init_timeline_with_builder(room.default_room_timeline_builder()).await?; + room.init_timeline_with_builder(room.default_room_timeline_builder().await).await?; // The latest event does not exist. assert!(room.latest_event().await.is_none()); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/read_receipts.rs b/crates/matrix-sdk-ui/tests/integration/timeline/read_receipts.rs index 2a52654e3..17a0e4d19 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/read_receipts.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/read_receipts.rs @@ -281,7 +281,7 @@ async fn read_receipts_updates() { } #[async_test] -async fn read_receipts_updates_on_filtered_events() { +async fn test_read_receipts_updates_on_filtered_events() { let room_id = room_id!("!a98sd12bjh:example.org"); let (client, server) = logged_in_client().await; let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); diff --git a/labs/rrrepl/src/main.rs b/labs/rrrepl/src/main.rs index 1f17d41b2..f60866595 100644 --- a/labs/rrrepl/src/main.rs +++ b/labs/rrrepl/src/main.rs @@ -134,7 +134,7 @@ async fn login_and_sync(server_name: String) -> anyhow::Result<()> { let room_id = { rooms.lock().unwrap()[id].as_room_id().map(ToOwned::to_owned) }; if let Some(room_id) = &room_id { let room = room_list_service.room(room_id).await?; - room.init_timeline_with_builder(room.default_room_timeline_builder()) + room.init_timeline_with_builder(room.default_room_timeline_builder().await) .await?; let timeline = room.timeline().unwrap();