diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index e307c4b86..704f5be39 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -270,7 +270,7 @@ impl Common { /// independent events. #[cfg(feature = "experimental-timeline")] pub async fn timeline(&self) -> Timeline { - Timeline::new(self).with_fully_read_tracking().await + Timeline::builder(self).track_fully_read().build().await } /// Fetch the event with the given `EventId` in this room. diff --git a/crates/matrix-sdk/src/room/timeline/builder.rs b/crates/matrix-sdk/src/room/timeline/builder.rs new file mode 100644 index 000000000..385f53c84 --- /dev/null +++ b/crates/matrix-sdk/src/room/timeline/builder.rs @@ -0,0 +1,162 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use matrix_sdk_base::{ + deserialized_responses::{EncryptionInfo, SyncTimelineEvent}, + locks::Mutex, +}; +use ruma::events::fully_read::FullyReadEventContent; +use tracing::error; + +use super::{ + inner::TimelineInner, + to_device::{handle_forwarded_room_key_event, handle_room_key_event}, + Timeline, +}; +use crate::room; + +/// Builder that allows creating and configuring various parts of a +/// [`Timeline`]. +#[must_use] +#[derive(Debug)] +pub(crate) struct TimelineBuilder { + room: room::Common, + prev_token: Option, + events: Vec, + track_fully_read: bool, +} + +impl TimelineBuilder { + pub(super) fn new(room: &room::Common) -> Self { + Self { + room: room.clone(), + prev_token: None, + events: Vec::default(), + track_fully_read: false, + } + } + + /// Add initial events to the timeline. + #[cfg(feature = "experimental-sliding-sync")] + pub(crate) fn events( + mut self, + prev_token: Option, + events: Vec, + ) -> Self { + self.prev_token = prev_token; + self.events = events; + self + } + + /// Enable tracking of the fully-read marker on the timeline. + pub(crate) fn track_fully_read(mut self) -> Self { + self.track_fully_read = true; + self + } + + /// Create a [`Timeline`] with the options set on this builder. + pub(crate) async fn build(self) -> Timeline { + let Self { room, prev_token, events, track_fully_read } = self; + let has_events = !events.is_empty(); + + let mut inner = TimelineInner::new(room); + + if has_events { + inner.add_initial_events(events).await; + } + + let inner = Arc::new(inner); + let room = inner.room(); + + let timeline_event_handle = room.add_event_handler({ + let inner = inner.clone(); + move |event, encryption_info: Option| { + let inner = inner.clone(); + async move { + inner.handle_live_event(event, encryption_info).await; + } + } + }); + + // Not using room.add_event_handler here because RoomKey events are + // to-device events that are not received in the context of a room. + #[cfg(feature = "e2e-encryption")] + let room_key_handle = room + .client + .add_event_handler(handle_room_key_event(inner.clone(), room.room_id().to_owned())); + #[cfg(feature = "e2e-encryption")] + let forwarded_room_key_handle = room.client.add_event_handler( + handle_forwarded_room_key_event(inner.clone(), room.room_id().to_owned()), + ); + + let mut event_handler_handles = vec![ + timeline_event_handle, + #[cfg(feature = "e2e-encryption")] + room_key_handle, + #[cfg(feature = "e2e-encryption")] + forwarded_room_key_handle, + ]; + + if track_fully_read { + match room.account_data_static::().await { + Ok(Some(fully_read)) => match fully_read.deserialize() { + Ok(fully_read) => { + inner.set_fully_read_event(fully_read.content.event_id).await; + } + Err(e) => { + error!("Failed to deserialize fully-read account data: {e}"); + } + }, + Err(e) => { + error!("Failed to get fully-read account data from the store: {e}"); + } + _ => {} + } + + let fully_read_handle = room.add_event_handler({ + let inner = inner.clone(); + move |event| { + let inner = inner.clone(); + async move { + inner.handle_fully_read(event).await; + } + } + }); + event_handler_handles.push(fully_read_handle); + } + + let timeline = Timeline { + inner, + start_token: Mutex::new(prev_token), + _end_token: Mutex::new(None), + event_handler_handles, + }; + + #[cfg(feature = "e2e-encryption")] + if has_events { + // The events we're injecting might be encrypted events, but we might + // have received the room key to decrypt them while nobody was listening to the + // `m.room_key` event, let's retry now. + // + // TODO: We could spawn a task here and put this into the background, though it + // might not be worth it depending on the number of events we injected. + // Some measuring needs to be done. + timeline.retry_decryption_for_all_events().await; + } + + timeline + } +} diff --git a/crates/matrix-sdk/src/room/timeline/inner.rs b/crates/matrix-sdk/src/room/timeline/inner.rs index cb5618a57..1e2a14ea9 100644 --- a/crates/matrix-sdk/src/room/timeline/inner.rs +++ b/crates/matrix-sdk/src/room/timeline/inner.rs @@ -6,11 +6,9 @@ use std::{ use async_trait::async_trait; use futures_signals::signal_vec::{MutableVec, MutableVecLockRef, SignalVec}; use indexmap::IndexSet; -#[cfg(any(test, feature = "experimental-sliding-sync"))] -use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; use matrix_sdk_base::{ crypto::OlmMachine, - deserialized_responses::{EncryptionInfo, TimelineEvent}, + deserialized_responses::{EncryptionInfo, SyncTimelineEvent, TimelineEvent}, locks::Mutex, }; use ruma::{ @@ -75,7 +73,6 @@ impl TimelineInner

{ self.items.signal_vec_cloned() } - #[cfg(any(test, feature = "experimental-sliding-sync"))] pub(super) async fn add_initial_events(&mut self, events: Vec) { if events.is_empty() { return; diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs index ae9d589b6..3115049d7 100644 --- a/crates/matrix-sdk/src/room/timeline/mod.rs +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -20,13 +20,9 @@ use std::sync::Arc; use futures_core::Stream; use futures_signals::signal_vec::{SignalVec, SignalVecExt, VecDiff}; -#[cfg(feature = "experimental-sliding-sync")] -use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; -use matrix_sdk_base::{deserialized_responses::EncryptionInfo, locks::Mutex}; +use matrix_sdk_base::locks::Mutex; use ruma::{ - assign, - events::{fully_read::FullyReadEventContent, AnyMessageLikeEventContent}, - EventId, MilliSecondsSinceUnixEpoch, TransactionId, + assign, events::AnyMessageLikeEventContent, EventId, MilliSecondsSinceUnixEpoch, TransactionId, }; use thiserror::Error; use tracing::{error, instrument, warn}; @@ -38,6 +34,7 @@ use crate::{ Result, }; +mod builder; mod event_handler; mod event_item; mod inner; @@ -48,6 +45,8 @@ mod tests; mod to_device; mod virtual_item; +pub(crate) use self::builder::TimelineBuilder; +use self::inner::{TimelineInner, TimelineInnerMetadata}; pub use self::{ event_item::{ AnyOtherFullStateEventContent, BundledReactions, EncryptedMessage, EventSendState, @@ -58,10 +57,6 @@ pub use self::{ pagination::{PaginationOptions, PaginationOutcome}, virtual_item::VirtualTimelineItem, }; -use self::{ - inner::{TimelineInner, TimelineInnerMetadata}, - to_device::{handle_forwarded_room_key_event, handle_room_key_event}, -}; /// A high-level view into a regular¹ room's contents. /// @@ -85,107 +80,14 @@ impl Drop for Timeline { } impl Timeline { - pub(super) fn new(room: &room::Common) -> Self { - Self::from_inner(Arc::new(TimelineInner::new(room.to_owned())), None) - } - - #[cfg(feature = "experimental-sliding-sync")] - pub(crate) async fn with_events( - room: &room::Common, - prev_token: Option, - events: Vec, - ) -> Self { - let mut inner = TimelineInner::new(room.to_owned()); - inner.add_initial_events(events).await; - - let timeline = Self::from_inner(Arc::new(inner), prev_token); - - // The events we're injecting might be encrypted events, but we might - // have received the room key to decrypt them while nobody was listening to the - // `m.room_key` event, let's retry now. - // - // TODO: We could spawn a task here and put this into the background, though it - // might not be worth it depending on the number of events we injected. - // Some measuring needs to be done. - #[cfg(feature = "e2e-encryption")] - timeline.retry_decryption_for_all_events().await; - - timeline - } - - fn from_inner(inner: Arc, prev_token: Option) -> Timeline { - let room = inner.room(); - - let timeline_event_handle = room.add_event_handler({ - let inner = inner.clone(); - move |event, encryption_info: Option| { - let inner = inner.clone(); - async move { - inner.handle_live_event(event, encryption_info).await; - } - } - }); - - // Not using room.add_event_handler here because RoomKey events are - // to-device events that are not received in the context of a room. - #[cfg(feature = "e2e-encryption")] - let room_key_handle = room - .client - .add_event_handler(handle_room_key_event(inner.clone(), room.room_id().to_owned())); - #[cfg(feature = "e2e-encryption")] - let forwarded_room_key_handle = room.client.add_event_handler( - handle_forwarded_room_key_event(inner.clone(), room.room_id().to_owned()), - ); - - let event_handler_handles = vec![ - timeline_event_handle, - #[cfg(feature = "e2e-encryption")] - room_key_handle, - #[cfg(feature = "e2e-encryption")] - forwarded_room_key_handle, - ]; - - Timeline { - inner, - start_token: Mutex::new(prev_token), - _end_token: Mutex::new(None), - event_handler_handles, - } + pub(crate) fn builder(room: &room::Common) -> TimelineBuilder { + TimelineBuilder::new(room) } fn room(&self) -> &room::Common { self.inner.room() } - /// Enable tracking of the fully-read marker on this `Timeline`. - pub async fn with_fully_read_tracking(mut self) -> Self { - match self.room().account_data_static::().await { - Ok(Some(fully_read)) => match fully_read.deserialize() { - Ok(fully_read) => { - self.inner.set_fully_read_event(fully_read.content.event_id).await; - } - Err(e) => { - error!("Failed to deserialize fully-read account data: {e}"); - } - }, - Err(e) => { - error!("Failed to get fully-read account data from the store: {e}"); - } - _ => {} - } - - let inner = self.inner.clone(); - let fully_read_handle = self.room().add_event_handler(move |event| { - let inner = inner.clone(); - async move { - inner.handle_fully_read(event).await; - } - }); - self.event_handler_handles.push(fully_read_handle); - - self - } - /// Clear all timeline items, and reset pagination parameters. #[cfg(feature = "experimental-sliding-sync")] pub async fn clear(&self) { @@ -311,7 +213,7 @@ impl Timeline { .await; } - #[cfg(all(feature = "experimental-sliding-sync", feature = "e2e-encryption"))] + #[cfg(feature = "e2e-encryption")] async fn retry_decryption_for_all_events(&self) { self.inner .retry_event_decryption( diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync.rs index 8503b9698..cefe73dce 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync.rs @@ -49,7 +49,7 @@ use tracing::{debug, error, instrument, trace, warn}; use url::Url; #[cfg(feature = "experimental-timeline")] -use crate::room::timeline::{EventTimelineItem, Timeline}; +use crate::room::timeline::{EventTimelineItem, Timeline, TimelineBuilder}; use crate::{config::RequestConfig, Client, Result}; /// Internal representation of errors in Sliding Sync @@ -241,16 +241,17 @@ impl SlidingSyncRoom { /// `Timeline` of this room #[cfg(feature = "experimental-timeline")] pub async fn timeline(&self) -> Option { - Some(self.timeline_no_fully_read_tracking().await?.with_fully_read_tracking().await) + Some(self.timeline_builder()?.track_fully_read().build().await) } - async fn timeline_no_fully_read_tracking(&self) -> Option { + #[cfg(feature = "experimental-timeline")] + fn timeline_builder(&self) -> Option { if let Some(room) = self.client.get_room(&self.room_id) { let current_timeline = self.timeline.lock_ref().to_vec(); let prev_batch = self.prev_batch.lock_ref().clone(); - Some(Timeline::with_events(&room, prev_batch, current_timeline).await) + Some(Timeline::builder(&room).events(prev_batch, current_timeline)) } else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) { - Some(Timeline::with_events(&invited_room, None, vec![]).await) + Some(Timeline::builder(&invited_room).events(None, vec![])) } else { error!( room_id = ?self.room_id, @@ -266,7 +267,7 @@ impl SlidingSyncRoom { /// this `SlidingSyncRoom`. #[cfg(feature = "experimental-timeline")] pub async fn latest_event(&self) -> Option { - self.timeline_no_fully_read_tracking().await?.latest_event() + self.timeline_builder()?.build().await.latest_event() } /// This rooms name as calculated by the server, if any