diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8346f1d74..ed0e45565 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,7 +185,7 @@ jobs: - name: Test run: | cargo nextest run --workspace \ - --exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test + --exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test --features testing - name: Test documentation run: | diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 33abd9d9e..427fc4ad7 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -52,7 +52,7 @@ jobs: - name: Run tarpaulin run: | - cargo tarpaulin --out Xml -e sliding-sync-integration-test + cargo tarpaulin --out Xml -e sliding-sync-integration-test --features testing - name: Upload to codecov.io uses: codecov/codecov-action@v3 diff --git a/Cargo.lock b/Cargo.lock index 880e2b025..1abf6d29c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1519,6 +1519,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "eyeball-im-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "988b102aa389565187ccb138e51339c72258deb8af8e459180a582af40ca323b" +dependencies = [ + "eyeball-im", + "futures-core", + "imbl", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.8" @@ -2532,6 +2544,7 @@ dependencies = [ "event-listener", "eyeball", "eyeball-im", + "eyeball-im-util", "eyre", "futures-core", "futures-executor", @@ -2924,16 +2937,21 @@ name = "matrix-sdk-ui" version = "0.6.0" dependencies = [ "anyhow", + "assert-json-diff", "assert_matches", + "async-stream", "async-trait", "chrono", "ctor", + "eyeball", "eyeball-im", + "eyeball-im-util", "futures-core", "futures-util", "imbl", "indexmap", "matrix-sdk", + "matrix-sdk-base", "matrix-sdk-test", "mime", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 41c101734..18fe2c123 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ ctor = "0.2.0" dashmap = "5.2.0" eyeball = "0.7.0" eyeball-im = "0.2.0" +eyeball-im-util = "0.1.0" futures-core = "0.3.28" futures-executor = "0.3.21" futures-util = { version = "0.3.26", default-features = false, features = ["alloc"] } diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 4e8ba8864..1da566026 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -592,7 +592,7 @@ impl SlidingSyncList { &self, observer: Box, ) -> Arc { - let mut room_list_stream = self.inner.room_list_stream(); + let (_, mut room_list_stream) = self.inner.room_list_stream(); Arc::new(TaskHandle::new(RUNTIME.spawn(async move { loop { @@ -695,12 +695,16 @@ impl SlidingSync { } pub fn get_room(&self, room_id: String) -> Result>, ClientError> { - Ok(self.inner.get_room(<&RoomId>::try_from(room_id.as_str())?).map(|inner| { - Arc::new(SlidingSyncRoom { - inner, - sliding_sync: self.inner.clone(), - client: self.client.clone(), - timeline: Default::default(), + let room_id = <&RoomId>::try_from(room_id.as_str())?; + + Ok(RUNTIME.block_on(async move { + self.inner.get_room(room_id).await.map(|inner| { + Arc::new(SlidingSyncRoom { + inner, + sliding_sync: self.inner.clone(), + client: self.client.clone(), + timeline: Default::default(), + }) }) })) } @@ -713,21 +717,24 @@ impl SlidingSync { .into_iter() .map(OwnedRoomId::try_from) .collect::, IdParseError>>()?; - Ok(self - .inner - .get_rooms(actual_ids.into_iter()) - .into_iter() - .map(|o| { - o.map(|inner| { - Arc::new(SlidingSyncRoom { - inner, - sliding_sync: self.inner.clone(), - client: self.client.clone(), - timeline: Default::default(), + + Ok(RUNTIME.block_on(async move { + self.inner + .get_rooms(actual_ids.into_iter()) + .await + .into_iter() + .map(|o| { + o.map(|inner| { + Arc::new(SlidingSyncRoom { + inner, + sliding_sync: self.inner.clone(), + client: self.client.clone(), + timeline: Default::default(), + }) }) }) - }) - .collect()) + .collect() + })) } pub fn add_list(&self, list_builder: Arc) -> Arc { diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index 048162cfa..d59898240 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -4,24 +4,31 @@ version = "0.6.0" edition = "2021" [features] -default = ["e2e-encryption", "native-tls"] +default = ["e2e-encryption", "native-tls", "experimental-room-list"] e2e-encryption = ["matrix-sdk/e2e-encryption"] native-tls = ["matrix-sdk/native-tls"] rustls-tls = ["matrix-sdk/rustls-tls"] +experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"] experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"] +testing = ["matrix-sdk/testing"] + [dependencies] +async-stream = { workspace = true, optional = true } async-trait = { workspace = true } chrono = "0.4.23" +eyeball = { workspace = true } eyeball-im = { workspace = true } +eyeball-im-util = { workspace = true, optional = true } futures-core = { workspace = true } futures-util = { workspace = true } imbl = { version = "2.0.0", features = ["serde"] } indexmap = "1.9.1" matrix-sdk = { version = "0.6.2", path = "../matrix-sdk", default-features = false } +matrix-sdk-base = { version = "0.6.1", path = "../matrix-sdk-base" } mime = "0.3.16" once_cell = { workspace = true } pin-project-lite = "0.2.9" @@ -34,9 +41,14 @@ tracing = { workspace = true, features = ["attributes"] } [dev-dependencies] anyhow = { workspace = true } +assert-json-diff = "2.0" assert_matches = { workspace = true } ctor = { workspace = true } matrix-sdk-test = { version = "0.6.0", path = "../../testing/matrix-sdk-test" } stream_assert = "0.1.0" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } wiremock = "0.5.13" + +[[test]] +name = "integration" +required-features = ["testing"] diff --git a/crates/matrix-sdk-ui/src/lib.rs b/crates/matrix-sdk-ui/src/lib.rs index 6635ce881..a08debc48 100644 --- a/crates/matrix-sdk-ui/src/lib.rs +++ b/crates/matrix-sdk-ui/src/lib.rs @@ -13,6 +13,11 @@ // limitations under the License. mod events; + +#[cfg(feature = "experimental-room-list")] +pub mod room_list; pub mod timeline; +#[cfg(feature = "experimental-room-list")] +pub use self::room_list::RoomList; pub use self::timeline::Timeline; diff --git a/crates/matrix-sdk-ui/src/room_list/mod.rs b/crates/matrix-sdk-ui/src/room_list/mod.rs new file mode 100644 index 000000000..fadf3b986 --- /dev/null +++ b/crates/matrix-sdk-ui/src/room_list/mod.rs @@ -0,0 +1,688 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for that specific language governing permissions and +// limitations under the License. + +//! `RoomList` API. +//! +//! The `RoomList` is a UI API dedicated to present a list of Matrix rooms to +//! the user. The syncing is handled by +//! [`SlidingSync`][matrix_sdk::SlidingSync]. The idea is to expose a simple API +//! to handle most of the client app use cases, like: Showing and updating a +//! list of rooms, filtering a list of rooms, handling particular updates of a +//! range of rooms (the ones the client app is showing to the view, i.e. the +//! rooms present in the viewport) etc. +//! +//! As such, the `RoomList` works as an opinionated state machine. The states +//! are defined by [`State`]. Actions are attached to the each state transition. +//! Apart from that, one can apply [`Input`]s on the state machine, like +//! notifying that the client app viewport of the room list has changed (if the +//! user of the client app has scrolled in the room list for example) etc. +//! +//! The API is purposely small. Sliding Sync is versatile. `RoomList` is _one_ +//! specific usage of Sliding Sync. +//! +//! # Basic principle +//! +//! `RoomList` works with 2 Sliding Sync List: +//! +//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the main +//! list. Its goal is to load all the user' rooms. It starts with a +//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small +//! set of rooms) to load the first rooms quickly, and then updates to a +//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the +//! background”: it will sync the existing rooms and will fetch new rooms, by +//! a certain batch size. +//! * `visible_rooms` (referred by the constant [`VISIBLE_ROOMS_LIST_NAME`]) is +//! the “reactive” list. It's goal is to react to the client app user actions. +//! If the user scrolls in the room list, the `visible_rooms` will be +//! configured to sync for the particular range of rooms the user is actually +//! seeing (the rooms in the current viewport). `visible_rooms` has a +//! different configuration than `all_rooms` as it loads more timeline events: +//! it means that the room will already have a “history”, a timeline, ready to +//! be presented when the user enters the room. +//! +//! This behavior has proven to be empirically satisfying to provide a fast and +//! fluid user experience for a Matrix client. +//! +//! [`RoomList::entries`] provides a way to get a stream of room list entry. +//! This stream can be filtered, and the filter can be changed over time. +//! +//! [`RoomList::state_stream`] provides a way to get a stream of the state +//! machine's state, which can be pretty helpful for the client app. + +use std::{future::ready, sync::Arc}; + +use async_stream::stream; +use async_trait::async_trait; +use eyeball::shared::Observable; +use eyeball_im::VectorDiff; +use futures_util::{pin_mut, Stream, StreamExt}; +use imbl::Vector; +pub use matrix_sdk::RoomListEntry; +use matrix_sdk::{ + sliding_sync::Ranges, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList, + SlidingSyncMode, SlidingSyncRoom, +}; +use once_cell::sync::Lazy; +use ruma::{OwnedRoomId, RoomId}; +use thiserror::Error; + +use crate::{timeline::EventTimelineItem, Timeline}; + +pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms"; +pub const VISIBLE_ROOMS_LIST_NAME: &str = "visible_rooms"; + +/// The [`RoomList`] type. See the module's documentation to learn more. +#[derive(Debug)] +pub struct RoomList { + sliding_sync: SlidingSync, + state: Observable, +} + +impl RoomList { + /// Create a new `RoomList`. + /// + /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list + /// already pre-configured. + pub async fn new(client: Client) -> Result { + let sliding_sync = client + .sliding_sync("room-list") + .map_err(Error::SlidingSync)? + .enable_caching() + .map_err(Error::SlidingSync)? + .add_cached_list( + SlidingSyncList::builder(ALL_ROOMS_LIST_NAME) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=19)) + .timeline_limit(1), + ) + .await + .map_err(Error::SlidingSync)? + .build() + .await + .map_err(Error::SlidingSync)?; + + Ok(Self { sliding_sync, state: Observable::new(State::Init) }) + } + + /// Start to sync the room list. + /// + /// It's the main method of this entire API. Calling `sync` allows to + /// receive updates on the room list: new rooms, rooms updates etc. Those + /// updates can be read with [`Self::entries`]. This method returns a + /// [`Stream`] where produced items only hold an empty value in case of a + /// sync success, otherwise an error. + /// + /// The `RoomList`' state machine is run by this method. + /// + /// Stopping the [`Stream`] (i.e. stop polling it) and calling + /// [`Self::sync`] again will resume from the previous state of the state + /// machine. + pub fn sync(&self) -> impl Stream> + '_ { + stream! { + let sync = self.sliding_sync.sync(); + pin_mut!(sync); + + // This is a state machine implementation. + // Things happen in this order: + // + // 1. The next state is calculated, + // 2. The actions associated to the next state are run, + // 3. The next state is stored, + // 4. A sync is done. + // + // So the sync is done after the machine _has entered_ into a new state. + loop { + { + let next_state = self.state.read().next(&self.sliding_sync).await?; + + Observable::set(&self.state, next_state); + } + + match sync.next().await { + Some(Ok(_update_summary)) => { + yield Ok(()); + } + + Some(Err(error)) => { + let next_state = State::Terminated { from: Box::new(self.state.get()) }; + + Observable::set(&self.state, next_state); + + yield Err(Error::SlidingSync(error)); + + break; + } + + None => { + let next_state = State::Terminated { from: Box::new(self.state.get()) }; + + Observable::set(&self.state, next_state); + + break; + } + } + } + } + } + + /// Get the current state of the state machine. + pub fn state(&self) -> State { + self.state.get() + } + + /// Get a [`Stream`] of [`State`]s. + pub fn state_stream(&self) -> impl Stream { + Observable::subscribe(&self.state) + } + + /// Get all previous room list entries, in addition to a [`Stream`] to room + /// list entry's updates. + pub async fn entries( + &self, + ) -> Result<(Vector, impl Stream>), Error> { + self.sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| ready(list.room_list_stream())) + .await + .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string())) + } + + /// Similar to [`Self::entries`] except that it's possible to provide a + /// filter that will filter out room list entries. + pub async fn entries_filtered( + &self, + filter: F, + ) -> Result<(Vector, impl Stream>), Error> + where + F: Fn(&RoomListEntry) -> bool + Send + Sync + 'static, + { + self.sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| ready(list.room_list_filtered_stream(filter))) + .await + .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string())) + } + + /// Pass an [`Input`] onto the state machine. + pub async fn apply_input(&self, input: Input) -> Result<(), Error> { + use Input::*; + + match input { + Viewport(ranges) => { + self.update_viewport(ranges).await?; + } + } + + Ok(()) + } + + async fn update_viewport(&self, ranges: Ranges) -> Result<(), Error> { + self.sliding_sync + .on_list(VISIBLE_ROOMS_LIST_NAME, |list| { + list.set_sync_mode(SlidingSyncMode::new_selective().add_ranges(ranges.clone())); + + ready(()) + }) + .await + .ok_or_else(|| Error::InputHasNotBeenApplied(Input::Viewport(ranges)))?; + + Ok(()) + } + + /// Get a [`Room`] if it exists. + pub async fn room(&self, room_id: &RoomId) -> Result { + match self.sliding_sync.get_room(room_id).await { + Some(room) => Room::new(room).await, + None => Err(Error::RoomNotFound(room_id.to_owned())), + } + } + + #[cfg(any(test, feature = "testing"))] + pub fn sliding_sync(&self) -> &SlidingSync { + &self.sliding_sync + } +} + +/// A room in the room list. +/// +/// It's cheap to clone this type. +#[derive(Clone, Debug)] +pub struct Room { + inner: Arc, +} + +#[derive(Debug)] +struct RoomInner { + /// The Sliding Sync room. + sliding_sync_room: SlidingSyncRoom, + + /// The underlying client room. + room: matrix_sdk::room::Room, + + /// The timeline of the room. + timeline: Timeline, + + /// The “sneaky” timeline of the room, i.e. this timeline doesn't track the + /// read marker nor the receipts. + sneaky_timeline: Timeline, +} + +impl Room { + /// Create a new `Room`. + async fn new(sliding_sync_room: SlidingSyncRoom) -> Result { + let room = sliding_sync_room + .client() + .get_room(sliding_sync_room.room_id()) + .ok_or_else(|| Error::RoomNotFound(sliding_sync_room.room_id().to_owned()))?; + + let timeline = Timeline::builder(&room) + .events(sliding_sync_room.prev_batch(), sliding_sync_room.timeline_queue()) + .track_read_marker_and_receipts() + .build() + .await; + + let sneaky_timeline = Timeline::builder(&room) + .events(sliding_sync_room.prev_batch(), sliding_sync_room.timeline_queue()) + .build() + .await; + + Ok(Self { + inner: Arc::new(RoomInner { sliding_sync_room, room, timeline, sneaky_timeline }), + }) + } + + /// Get the best possible name for the room. + /// + /// If the sliding sync room has received a name from the server, then use + /// it, otherwise, let's calculate a name. + pub async fn name(&self) -> Option { + Some(match self.inner.sliding_sync_room.name() { + Some(name) => name, + None => self.inner.room.display_name().await.ok()?.to_string(), + }) + } + + /// Get the timeline of the room. + pub fn timeline(&self) -> &Timeline { + &self.inner.timeline + } + + /// Get the latest event of the timeline. + /// + /// It's different from `Self::timeline().latest_event()` as it won't track + /// the read marker and receipts. + pub async fn latest_event(&self) -> Option { + self.inner.sneaky_timeline.latest_event().await + } +} + +/// [`RoomList`]'s errors. +#[derive(Debug, Error)] +pub enum Error { + /// Error from [`matrix_sdk::SlidingSync`]. + #[error("SlidingSync failed")] + SlidingSync(SlidingSyncError), + + /// An operation has been requested on an unknown list. + #[error("Unknown list `{0}`")] + UnknownList(String), + + /// An input was asked to be applied but it wasn't possible to apply it. + #[error("The input has been not applied")] + InputHasNotBeenApplied(Input), + + /// The requested room doesn't exist. + #[error("Room `{0}` not found")] + RoomNotFound(OwnedRoomId), +} + +/// The state of the [`RoomList`]' state machine. +#[derive(Clone, Debug, PartialEq)] +pub enum State { + /// That's the first initial state. + Init, + + /// At this state, the first rooms start to be synced. + FirstRooms, + + /// At this state, all rooms start to be synced. + AllRooms, + + /// This state is the cruising speed, i.e. the “normal” state, where nothing + /// fancy happens: all rooms are syncing, and life is great. + Enjoy, + + /// At this state, the sync has been stopped (because it was requested, or + /// because it has errored too many times previously). + Terminated { from: Box }, +} + +impl State { + /// Transition to the next state, and execute the associated transition's + /// [`Actions`]. + async fn next(&self, sliding_sync: &SlidingSync) -> Result { + use State::*; + + let (next_state, actions) = match self { + Init => (FirstRooms, Actions::none()), + FirstRooms => (AllRooms, Actions::first_rooms_are_loaded()), + AllRooms => (Enjoy, Actions::none()), + Enjoy => (Enjoy, Actions::none()), + // If the state was `Terminated` but the next state is calculated again, it means the + // sync has been restarted. In this case, let's jump back on the previous state that led + // to the termination. No action is required in this scenario. + Terminated { from: previous_state } => { + match previous_state.as_ref() { + state @ Init | state @ FirstRooms => { + // Do nothing. + (state.to_owned(), Actions::none()) + } + + state @ AllRooms | state @ Enjoy => { + // Refresh the lists. + (state.to_owned(), Actions::refresh_lists()) + } + + Terminated { .. } => { + // Having `Terminated { from: Terminated { … } }` is not allowed. + unreachable!("It's impossible to reach `Terminated` from `Terminated`"); + } + } + } + }; + + for action in actions.iter() { + action.run(sliding_sync).await?; + } + + Ok(next_state) + } +} + +/// A trait to define what an `Action` is. +#[async_trait] +trait Action { + async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error>; +} + +struct AddVisibleRoomsList; + +#[async_trait] +impl Action for AddVisibleRoomsList { + async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error> { + sliding_sync + .add_list( + SlidingSyncList::builder(VISIBLE_ROOMS_LIST_NAME) + .sync_mode(SlidingSyncMode::new_selective()) + .timeline_limit(20), + ) + .await + .map_err(Error::SlidingSync)?; + + Ok(()) + } +} + +struct SetAllRoomsListToGrowingSyncMode; + +#[async_trait] +impl Action for SetAllRoomsListToGrowingSyncMode { + async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error> { + sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| { + list.set_sync_mode(SlidingSyncMode::new_growing(50)); + + ready(()) + }) + .await + .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string()))?; + + Ok(()) + } +} + +/// Type alias to represent one action. +type OneAction = Box; + +/// Type alias to represent many actions. +type ManyActions = Vec; + +/// A type to represent multiple actions. +/// +/// It contains helper methods to create pre-configured set of actions. +struct Actions { + actions: &'static Lazy, +} + +macro_rules! actions { + ( + $( + $action_group_name:ident => [ + $( $action_name:ident ),* $(,)? + ] + ),* + $(,)? + ) => { + $( + fn $action_group_name () -> Self { + static ACTIONS: Lazy = Lazy::new(|| { + vec![ + $( Box::new( $action_name ) ),* + ] + }); + + Self { actions: &ACTIONS } + } + )* + }; +} + +impl Actions { + actions! { + none => [], + first_rooms_are_loaded => [SetAllRoomsListToGrowingSyncMode, AddVisibleRoomsList], + refresh_lists => [SetAllRoomsListToGrowingSyncMode], + } + + fn iter(&self) -> &[OneAction] { + self.actions.as_slice() + } +} + +/// An input for the [`RoomList`]' state machine. +/// +/// An input is something that has happened or is happening or is requested by +/// the client app using this [`RoomList`]. +#[derive(Debug)] +pub enum Input { + /// The client app's viewport of the room list has changed. + /// + /// Use this input when the user of the client app is scrolling inside the + /// room list, and the viewport has changed. The viewport is defined as the + /// range of visible rooms in the room list. + Viewport(Ranges), +} + +#[cfg(test)] +mod tests { + use matrix_sdk::{config::RequestConfig, Session}; + use matrix_sdk_test::async_test; + use ruma::{api::MatrixVersion, device_id, user_id}; + use wiremock::MockServer; + + use super::*; + + async fn new_client() -> (Client, MockServer) { + let session = Session { + access_token: "1234".to_owned(), + refresh_token: None, + user_id: user_id!("@example:localhost").to_owned(), + device_id: device_id!("DEVICEID").to_owned(), + }; + + let server = MockServer::start().await; + let client = Client::builder() + .homeserver_url(server.uri()) + .server_versions([MatrixVersion::V1_0]) + .request_config(RequestConfig::new().disable_retry()) + .build() + .await + .unwrap(); + client.restore_session(session).await.unwrap(); + + (client, server) + } + + async fn new_room_list() -> Result { + let (client, _) = new_client().await; + + RoomList::new(client).await + } + + #[async_test] + async fn test_all_rooms_are_declared() -> Result<(), Error> { + let room_list = new_room_list().await?; + let sliding_sync = room_list.sliding_sync(); + + // List is present, in Selective mode. + assert_eq!( + sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!( + list.sync_mode(), + SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19] + ))) + .await, + Some(true) + ); + + Ok(()) + } + + #[async_test] + async fn test_states() -> Result<(), Error> { + let room_list = new_room_list().await?; + let sliding_sync = room_list.sliding_sync(); + + // First state. + let state = State::Init; + + // Hypothetical termination. + { + let state = + State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + assert_eq!(state, State::Init); + } + + // Next state. + let state = state.next(sliding_sync).await?; + assert_eq!(state, State::FirstRooms); + + // Hypothetical termination. + { + let state = + State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + assert_eq!(state, State::FirstRooms); + } + + // Next state. + let state = state.next(sliding_sync).await?; + assert_eq!(state, State::AllRooms); + + // Hypothetical termination. + { + let state = + State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + assert_eq!(state, State::AllRooms); + } + + // Next state. + let state = state.next(sliding_sync).await?; + assert_eq!(state, State::Enjoy); + + // Hypothetical termination. + { + let state = + State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + assert_eq!(state, State::Enjoy); + } + + // Next state. + let state = state.next(sliding_sync).await?; + assert_eq!(state, State::Enjoy); + + // Hypothetical termination. + { + let state = + State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?; + assert_eq!(state, State::Enjoy); + } + + Ok(()) + } + + #[async_test] + async fn test_action_add_visible_rooms_list() -> Result<(), Error> { + let room_list = new_room_list().await?; + let sliding_sync = room_list.sliding_sync(); + + // List is absent. + assert_eq!(sliding_sync.on_list(VISIBLE_ROOMS_LIST_NAME, |_list| ready(())).await, None); + + // Run the action! + AddVisibleRoomsList.run(sliding_sync).await?; + + // List is present! + assert_eq!( + sliding_sync + .on_list(VISIBLE_ROOMS_LIST_NAME, |list| ready(matches!( + list.sync_mode(), + SlidingSyncMode::Selective { ranges } if ranges.is_empty() + ))) + .await, + Some(true) + ); + + Ok(()) + } + + #[async_test] + async fn test_action_set_all_rooms_list_to_growing_sync_mode() -> Result<(), Error> { + let room_list = new_room_list().await?; + let sliding_sync = room_list.sliding_sync(); + + // List is present, in Selective mode. + assert_eq!( + sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!( + list.sync_mode(), + SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19] + ))) + .await, + Some(true) + ); + + // Run the action! + SetAllRoomsListToGrowingSyncMode.run(sliding_sync).await.unwrap(); + + // List is still present, in Growing mode. + assert_eq!( + sliding_sync + .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!( + list.sync_mode(), + SlidingSyncMode::Growing { batch_size: 50, .. } + ))) + .await, + Some(true) + ); + + Ok(()) + } +} diff --git a/crates/matrix-sdk-ui/tests/integration/main.rs b/crates/matrix-sdk-ui/tests/integration/main.rs index 0a6eac8cc..a507258ba 100644 --- a/crates/matrix-sdk-ui/tests/integration/main.rs +++ b/crates/matrix-sdk-ui/tests/integration/main.rs @@ -21,6 +21,8 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; +#[cfg(feature = "experimental-room-list")] +mod room_list; mod timeline; #[cfg(all(test, not(target_arch = "wasm32")))] diff --git a/crates/matrix-sdk-ui/tests/integration/room_list.rs b/crates/matrix-sdk-ui/tests/integration/room_list.rs new file mode 100644 index 000000000..1da379f18 --- /dev/null +++ b/crates/matrix-sdk-ui/tests/integration/room_list.rs @@ -0,0 +1,1378 @@ +use assert_matches::assert_matches; +use eyeball_im::VectorDiff; +use futures_util::{pin_mut, FutureExt, StreamExt}; +use imbl::vector; +use matrix_sdk_test::async_test; +use matrix_sdk_ui::{ + room_list::{ + Error, Input, RoomListEntry, State, ALL_ROOMS_LIST_NAME as ALL_ROOMS, + VISIBLE_ROOMS_LIST_NAME as VISIBLE_ROOMS, + }, + timeline::{TimelineItem, VirtualTimelineItem}, + RoomList, +}; +use ruma::{event_id, room_id}; +use serde_json::json; +use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate}; + +use crate::{ + logged_in_client, + timeline::sliding_sync::{assert_timeline_stream, timeline_event}, +}; + +async fn new_room_list() -> Result<(MockServer, RoomList), Error> { + let (client, server) = logged_in_client().await; + let room_list = RoomList::new(client).await?; + + Ok((server, room_list)) +} + +#[derive(Copy, Clone)] +struct SlidingSyncMatcher; + +impl Match for SlidingSyncMatcher { + fn matches(&self, request: &Request) -> bool { + request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" + && request.method == Method::Post + } +} + +macro_rules! sync_then_assert_request_and_fake_response { + ( + [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + $( states = $pre_state:pat => $post_state:pat, )? + assert request = { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + sync_then_assert_request_and_fake_response! { + [$server, $room_list, $room_list_sync_stream] + sync matches Some(Ok(_)), + $( states = $pre_state => $post_state, )? + assert request = { $( $request_json )* }, + respond with = $( ( code $code ) )? { $( $response_json )* }, + } + }; + + ( + [$server:ident, $room_list:ident, $room_list_sync_stream:ident] + sync matches $sync_result:pat, + $( states = $pre_state:pat => $post_state:pat, )? + assert request = { $( $request_json:tt )* }, + respond with = $( ( code $code:expr ) )? { $( $response_json:tt )* } + $(,)? + ) => { + { + let _code = 200; + $( let _code = $code; )? + + let _mock_guard = Mock::given(SlidingSyncMatcher) + .respond_with(ResponseTemplate::new(_code).set_body_json( + json!({ $( $response_json )* }) + )) + .mount_as_scoped(&$server) + .await; + + $( + use State::*; + + assert_matches!($room_list.state(), $pre_state, "pre state"); + )? + + let next = $room_list_sync_stream.next().await; + + assert_matches!(next, $sync_result, "sync's result"); + + for request in $server.received_requests().await.expect("Request recording has been disabled").iter().rev() { + if SlidingSyncMatcher.matches(request) { + let json_value = serde_json::from_slice::(&request.body).unwrap(); + + if let Err(error) = assert_json_diff::assert_json_matches_no_panic( + &json_value, + &json!({ $( $request_json )* }), + assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive), + ) { + dbg!(json_value); + panic!("{}", error); + } + + break; + } + } + + $( assert_matches!($room_list.state(), $post_state, "post state"); )? + + next + } + }; +} + +macro_rules! entries { + ( @_ [ E $( , $( $rest:tt )* )? ] [ $( $accumulator:tt )* ] ) => { + entries!( @_ [ $( $( $rest )* )? ] [ $( $accumulator )* RoomListEntry::Empty, ] ) + }; + + ( @_ [ F( $room_id:literal ) $( , $( $rest:tt )* )? ] [ $( $accumulator:tt )* ] ) => { + entries!( @_ [ $( $( $rest )* )? ] [ $( $accumulator )* RoomListEntry::Filled(room_id!( $room_id ).to_owned()), ] ) + }; + + ( @_ [ I( $room_id:literal ) $( , $( $rest:tt )* )? ] [ $( $accumulator:tt )* ] ) => { + entries!( @_ [ $( $( $rest )* )? ] [ $( $accumulator )* RoomListEntry::Invalidated(room_id!( $room_id ).to_owned()), ] ) + }; + + ( @_ [] [ $( $accumulator:tt )* ] ) => { + vector![ $( $accumulator )* ] + }; + + ( $( $all:tt )* ) => { + entries!( @_ [ $( $all )* ] [] ) + }; +} + +macro_rules! assert_entries_stream { + // `append [$entries]` + ( @_ [ $stream:ident ] [ append [ $( $entries:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + assert_entries_stream!( + @_ + [ $stream ] + [ $( $rest )* ] + [ + $( $accumulator )* + assert_matches!( + $stream.next().now_or_never(), + Some(Some(VectorDiff::Append { values })) => { + assert_eq!(values, entries!( $( $entries )+ )); + } + ); + ] + ) + }; + + // `set [$nth] [$entry]` + ( @_ [ $stream:ident ] [ set [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + assert_entries_stream!( + @_ + [ $stream ] + [ $( $rest )* ] + [ + $( $accumulator )* + assert_matches!( + $stream.next().now_or_never(), + Some(Some(VectorDiff::Set { index: $index, value })) => { + assert_eq!(value, entries!( $( $entry )+ )[0]); + } + ); + ] + ) + }; + + // `remove [$nth]` + ( @_ [ $stream:ident ] [ remove [ $index:literal ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + assert_entries_stream!( + @_ + [ $stream ] + [ $( $rest )* ] + [ + $( $accumulator )* + assert_eq!( + $stream.next().now_or_never(), + Some(Some(VectorDiff::Remove { index: $index })), + ); + ] + ) + }; + + // `insert [$nth] [ $entry ]` + ( @_ [ $stream:ident ] [ insert [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + assert_entries_stream!( + @_ + [ $stream ] + [ $( $rest )* ] + [ + $( $accumulator )* + assert_matches!( + $stream.next().now_or_never(), + Some(Some(VectorDiff::Insert { index: $index, value })) => { + assert_eq!(value, entries!( $( $entry )+ )[0]); + } + ); + ] + ) + }; + + // `pending` + ( @_ [ $stream:ident ] [ pending ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + assert_entries_stream!( + @_ + [ $stream ] + [ $( $rest )* ] + [ + $( $accumulator )* + assert_eq!($stream.next().now_or_never(), None); + ] + ) + }; + + ( @_ [ $stream:ident ] [] [ $( $accumulator:tt )* ] ) => { + $( $accumulator )* + }; + + ( [ $stream:ident ] $( $all:tt )* ) => { + assert_entries_stream!( @_ [ $stream ] [ $( $all )* ] [] ) + }; +} + +#[async_test] +async fn test_sync_from_init_to_enjoy() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Init => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [ + [0, 19], + ], + "required_state": [ + ["m.room.encryption", ""], + ], + "sort": ["by_recency", "by_name"], + "timeline_limit": 1, + }, + }, + "extensions": { + "e2ee": { + "enabled": true, + }, + "to_device": { + "enabled": true, + }, + }, + }, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 200, + "ops": [ + // let's ignore them for now + ], + }, + }, + "rooms": { + // let's ignore them for now + }, + "extensions": {}, + }, + }; + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = FirstRooms => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [ + [0, 49], + ], + "timeline_limit": 1, + }, + VISIBLE_ROOMS: { + "ranges": [], + "required_state": [ + ["m.room.encryption", ""], + ], + "sort": ["by_recency", "by_name"], + "timeline_limit": 20, + } + } + }, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 200, + "ops": [ + // let's ignore them for now + ], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [], + }, + }, + "rooms": { + // let's ignore them for now + }, + }, + }; + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = AllRooms => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 99]], + }, + VISIBLE_ROOMS: { + "ranges": [], + }, + }, + }, + respond with = { + "pos": "2", + "lists": { + ALL_ROOMS: { + "count": 200, + "ops": [ + // let's ignore them for now + ], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [], + }, + }, + "rooms": { + // let's ignore them for now + }, + }, + }; + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Enjoy => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 149]], + }, + VISIBLE_ROOMS: { + "ranges": [], + }, + }, + }, + respond with = { + "pos": "2", + "lists": { + ALL_ROOMS: { + "count": 200, + "ops": [ + // let's ignore them for now + ], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [], + }, + }, + "rooms": { + // let's ignore them for now + }, + }, + }; + + Ok(()) +} +#[async_test] + +async fn test_sync_resumes_from_previous_state() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + // Start a sync, and drop it at the end of the block. + { + let sync = room_list.sync(); + pin_mut!(sync); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Init => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 19]], + }, + }, + }, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [] + }, + }, + "rooms": {}, + }, + }; + } + + // Start a sync, and drop it at the end of the block. + { + let sync = room_list.sync(); + pin_mut!(sync); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = FirstRooms => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 9]], + }, + VISIBLE_ROOMS: { + "ranges": [], + }, + }, + }, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [], + }, + }, + "rooms": {}, + }, + }; + } + + // Start a sync, and drop it at the end of the block. + { + let sync = room_list.sync(); + pin_mut!(sync); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = AllRooms => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 9]], + }, + VISIBLE_ROOMS: { + "ranges": [], + }, + }, + }, + respond with = { + "pos": "2", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [], + }, + }, + "rooms": {}, + }, + }; + } + + Ok(()) +} + +#[async_test] +async fn test_sync_resumes_from_terminated() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + // Simulate an error from the `Init` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + sync matches Some(Err(_)), + states = Init => Terminated { .. }, + assert request = { + "lists": { + ALL_ROOMS: { + // The default range, in selective sync-mode. + "ranges": [[0, 19]], + }, + }, + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN", + }, + }; + + // Ensure sync is terminated. + assert!(sync.next().await.is_none()); + + // Start a new sync. + let sync = room_list.sync(); + pin_mut!(sync); + + // Do a regular sync from the `Terminated` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Terminated { .. } => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + // Still the default range, in selective sync-mode. + "ranges": [[0, 19]], + }, + }, + }, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 110, + }, + }, + "rooms": {}, + }, + }; + + // Simulate an error from the `FirstRooms` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + sync matches Some(Err(_)), + states = FirstRooms => Terminated { .. }, + assert request = { + "lists": { + ALL_ROOMS: { + // In `FirstRooms`, the sync-mode has changed to growing, with + // its initial range. + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + // Hello new list. + "ranges": [], + }, + }, + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN", + }, + }; + + // Ensure sync is terminated. + assert!(sync.next().await.is_none()); + + // Start a new sync. + let sync = room_list.sync(); + pin_mut!(sync); + + // Update the viewport, just to be sure it's not reset later. + room_list.apply_input(Input::Viewport(vec![5..=10])).await?; + + // Do a regular sync from the `Terminated` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Terminated { .. } => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + // In `AllRooms`, the sync-mode is still growing, but the range + // hasn't been modified due to previous error. + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + // We have set a viewport, which reflects here. + "ranges": [[5, 10]], + }, + }, + }, + respond with = { + "pos": "2", + "lists": { + ALL_ROOMS: { + "count": 110, + }, + }, + "rooms": {}, + }, + }; + + // Simulate an error from the `AllRooms` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + sync matches Some(Err(_)), + states = AllRooms => Terminated { .. }, + assert request = { + "lists": { + ALL_ROOMS: { + // In `AllRooms`, the sync-mode is still growing, and the range + // has made progress. + "ranges": [[0, 99]], + }, + VISIBLE_ROOMS: { + // Despites the error, the range is kept. + "ranges": [[5, 10]], + }, + }, + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN", + }, + }; + + // Ensure sync is terminated. + assert!(sync.next().await.is_none()); + + // Start a new sync. + let sync = room_list.sync(); + pin_mut!(sync); + + // Do a regular sync from the `Terminated` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Terminated { .. } => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + // Due to the error, the range is reset to its initial value. + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + // Despites the error, the range is kept. + "ranges": [[5, 10]], + }, + }, + }, + respond with = { + "pos": "3", + "lists": { + ALL_ROOMS: { + "count": 110, + }, + }, + "rooms": {}, + }, + }; + + // Do a regular sync from the `Enjoy` state to update the `ALL_ROOMS` list + // again. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Enjoy => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + // No error. The range is making progress. + "ranges": [[0, 99]], + }, + VISIBLE_ROOMS: { + // No error. The range is still here. + "ranges": [[5, 10]], + }, + }, + }, + respond with = { + "pos": "4", + "lists": { + ALL_ROOMS: { + "count": 110, + }, + }, + "rooms": {}, + }, + }; + + // Simulate an error from the `Enjoy` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + sync matches Some(Err(_)), + states = Enjoy => Terminated { .. }, + assert request = { + "lists": { + ALL_ROOMS: { + // Range is making progress and is even reaching the maximum + // number of rooms. + "ranges": [[0, 109]], + }, + VISIBLE_ROOMS: { + // The range is still here. + "ranges": [[5, 10]], + }, + }, + }, + respond with = (code 400) { + "error": "foo", + "errcode": "M_UNKNOWN", + }, + }; + + // Ensure sync is terminated. + assert!(sync.next().await.is_none()); + + // Start a new sync. + let sync = room_list.sync(); + pin_mut!(sync); + + // Do a regular sync from the `Terminated` state. + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Terminated { .. } => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + // An error was received at the previous sync iteration. + // The list is still in growing sync-mode, but its range has + // been reset. + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + // The range is still here. + "ranges": [[5, 10]], + }, + }, + }, + respond with = { + "pos": "5", + "lists": { + ALL_ROOMS: { + "count": 110, + }, + }, + "rooms": {}, + }, + }; + + Ok(()) +} + +#[async_test] +async fn test_entries_stream() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + let (previous_entries, entries_stream) = room_list.entries().await?; + pin_mut!(entries_stream); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Init => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 19]], + }, + }, + }, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [ + { + "op": "SYNC", + "range": [0, 2], + "room_ids": [ + "!r0:bar.org", + "!r1:bar.org", + "!r2:bar.org", + ], + }, + ], + }, + }, + "rooms": { + "!r0:bar.org": { + "name": "Room #0", + "initial": true, + "timeline": [], + }, + "!r1:bar.org": { + "name": "Room #1", + "initial": true, + "timeline": [], + }, + "!r2:bar.org": { + "name": "Room #2", + "initial": true, + "timeline": [], + } + }, + }, + }; + + assert!(previous_entries.is_empty()); + assert_entries_stream! { + [entries_stream] + append [ E, E, E, E, E, E, E, E, E, E ]; + set[0] [ F("!r0:bar.org") ]; + set[1] [ F("!r1:bar.org") ]; + set[2] [ F("!r2:bar.org") ]; + pending; + } + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = FirstRooms => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [ + [0, 9], + ], + }, + VISIBLE_ROOMS: { + "ranges": [], + } + } + }, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 9, + "ops": [ + { + "op": "DELETE", + "index": 1, + }, + { + "op": "DELETE", + "index": 0, + }, + { + "op": "INSERT", + "index": 0, + "room_id": "!r3:bar.org" + }, + ], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [ + // let's ignore them for now + ], + }, + }, + "rooms": { + "!r3:bar.org": { + "name": "Room #3", + "initial": true, + "timeline": [], + }, + }, + }, + }; + + assert_entries_stream! { + [entries_stream] + remove[1]; + remove[0]; + insert[0] [ F("!r3:bar.org") ]; + pending; + } + + Ok(()) +} + +#[async_test] +async fn test_entries_stream_with_updated_filter() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + let (previous_entries, entries_stream) = room_list.entries().await?; + pin_mut!(entries_stream); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Init => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 19]], + }, + }, + }, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [ + { + "op": "SYNC", + "range": [0, 0], + "room_ids": [ + "!r0:bar.org", + ], + }, + ], + }, + }, + "rooms": { + "!r0:bar.org": { + "name": "Room #0", + "initial": true, + "timeline": [], + }, + }, + }, + }; + + assert!(previous_entries.is_empty()); + assert_entries_stream! { + [entries_stream] + append [ E, E, E, E, E, E, E, E, E, E ]; + set[0] [ F("!r0:bar.org") ]; + pending; + }; + + let (previous_entries, entries_stream) = room_list + .entries_filtered(|room_list_entry| { + matches!( + room_list_entry.as_room_id(), + Some(room_id) if room_id.server_name() == "bar.org" + ) + }) + .await?; + pin_mut!(entries_stream); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = FirstRooms => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [ + [0, 9], + ], + }, + VISIBLE_ROOMS: { + "ranges": [], + } + } + }, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 10, + "ops": [ + { + "op": "SYNC", + "range": [1, 4], + "room_ids": [ + "!r1:bar.org", + "!r2:qux.org", + "!r3:qux.org", + "!r4:bar.org", + ], + }, + ], + }, + VISIBLE_ROOMS: { + "count": 0, + "ops": [ + // let's ignore them for now + ], + }, + }, + "rooms": { + "!r1:bar.org": { + "name": "Room #1", + "initial": true, + "timeline": [], + }, + "!r2:qux.org": { + "name": "Room #2", + "initial": true, + "timeline": [], + }, + "!r3:qux.org": { + "name": "Room #3", + "initial": true, + "timeline": [], + }, + "!r4:bar.org": { + "name": "Room #4", + "initial": true, + "timeline": [], + }, + }, + }, + }; + + assert_eq!(previous_entries, entries![F("!r0:bar.org")]); + assert_entries_stream! { + [entries_stream] + insert[1] [ F("!r1:bar.org") ]; + insert[2] [ F("!r4:bar.org") ]; + pending; + }; + + Ok(()) +} + +#[async_test] +async fn test_room() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + let room_id_0 = room_id!("!r0:bar.org"); + let room_id_1 = room_id!("!r1:bar.org"); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 2, + "ops": [ + { + "op": "SYNC", + "range": [0, 1], + "room_ids": [ + room_id_0, + room_id_1, + ], + }, + ], + }, + }, + "rooms": { + room_id_0: { + "name": "Room #0", + "initial": true, + }, + room_id_1: { + "initial": true, + }, + }, + }, + }; + + // Room has received a name from sliding sync. + let room0 = room_list.room(room_id_0).await?; + assert_eq!(room0.name().await, Some("Room #0".to_string())); + + // Room has not received a name from sliding sync, then it's calculated. + let room1 = room_list.room(room_id_1).await?; + assert_eq!(room1.name().await, Some("Empty Room".to_string())); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "1", + "lists": { + ALL_ROOMS: { + "count": 2, + "ops": [ + { + "op": "SYNC", + "range": [1, 1], + "room_ids": [ + room_id_1, + ], + }, + ], + }, + }, + "rooms": { + room_id_1: { + "name": "Room #1", + }, + }, + }, + }; + + // Room has _now_ received a name from sliding sync! + assert_eq!(room1.name().await, Some("Room #1".to_string())); + + Ok(()) +} + +#[async_test] +async fn test_room_not_found() -> Result<(), Error> { + let (_server, room_list) = new_room_list().await?; + + let room_id = room_id!("!foo:bar.org"); + + assert_matches!( + room_list.room(room_id).await, + Err(Error::RoomNotFound(error_room_id)) => { + assert_eq!(error_room_id, room_id.to_owned()); + } + ); + + Ok(()) +} + +#[async_test] +async fn test_room_timeline() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + let room_id = room_id!("!r0:bar.org"); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 2, + "ops": [ + { + "op": "SYNC", + "range": [0, 0], + "room_ids": [room_id], + }, + ], + }, + }, + "rooms": { + room_id: { + "name": "Room #0", + "initial": true, + "timeline": [ + timeline_event!("$x0:bar.org" at 0 sec), + ], + }, + }, + }, + }; + + let room = room_list.room(room_id).await?; + let timeline = room.timeline(); + + let (previous_timeline_items, mut timeline_items_stream) = timeline.subscribe().await; + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": {}, + "rooms": { + room_id: { + "timeline": [ + timeline_event!("$x1:bar.org" at 1 sec), + timeline_event!("$x2:bar.org" at 2 sec), + ], + }, + }, + }, + }; + + // Previous timeline items. + assert_matches!( + previous_timeline_items[0].as_ref(), + TimelineItem::Virtual(VirtualTimelineItem::DayDivider(_)) + ); + assert_matches!( + previous_timeline_items[1].as_ref(), + TimelineItem::Event(item) => { + assert_eq!(item.event_id().unwrap().as_str(), "$x0:bar.org"); + } + ); + + // Timeline items stream. + assert_timeline_stream! { + [timeline_items_stream] + update[1] "$x0:bar.org"; + append "$x1:bar.org"; + update[2] "$x1:bar.org"; + append "$x2:bar.org"; + }; + + Ok(()) +} + +#[async_test] +async fn test_room_latest_event() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + let room_id = room_id!("!r0:bar.org"); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": { + ALL_ROOMS: { + "count": 2, + "ops": [ + { + "op": "SYNC", + "range": [0, 0], + "room_ids": [room_id], + }, + ], + }, + }, + "rooms": { + room_id: { + "name": "Room #0", + "initial": true, + }, + }, + }, + }; + + let room = room_list.room(room_id).await?; + + // The latest event does not exist. + assert!(room.latest_event().await.is_none()); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": {}, + "rooms": { + room_id: { + "timeline": [ + timeline_event!("$x0:bar.org" at 0 sec), + ], + }, + }, + }, + }; + + // The latest event exists. + assert_matches!( + room.latest_event().await, + Some(event) => { + assert_eq!(event.event_id(), Some(event_id!("$x0:bar.org"))); + } + ); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + assert request = {}, + respond with = { + "pos": "0", + "lists": {}, + "rooms": { + room_id: { + "timeline": [ + timeline_event!("$x1:bar.org" at 1 sec), + ], + }, + }, + }, + }; + + // The latest event has been updated. + assert_matches!( + room.latest_event().await, + Some(event) => { + assert_eq!(event.event_id(), Some(event_id!("$x1:bar.org"))); + } + ); + + Ok(()) +} + +#[async_test] +async fn test_input_viewport() -> Result<(), Error> { + let (server, room_list) = new_room_list().await?; + + let sync = room_list.sync(); + pin_mut!(sync); + + // The input cannot be applied because the `VISIBLE_ROOMS_LIST_NAME` list isn't + // present. + assert_matches!( + room_list.apply_input(Input::Viewport(vec![10..=15])).await, + Err(Error::InputHasNotBeenApplied(_)) + ); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = Init => FirstRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 19]], + }, + }, + }, + respond with = { + "pos": "0", + "lists": {}, + "rooms": {}, + }, + }; + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = FirstRooms => AllRooms, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + "ranges": [], + "timeline_limit": 20, + } + } + }, + respond with = { + "pos": "1", + "lists": {}, + "rooms": {}, + }, + }; + + assert!(room_list.apply_input(Input::Viewport(vec![10..=15, 20..=25])).await.is_ok()); + + sync_then_assert_request_and_fake_response! { + [server, room_list, sync] + states = AllRooms => Enjoy, + assert request = { + "lists": { + ALL_ROOMS: { + "ranges": [[0, 49]], + }, + VISIBLE_ROOMS: { + "ranges": [[10, 15], [20, 25]], + "timeline_limit": 20, + } + } + }, + respond with = { + "pos": "1", + "lists": {}, + "rooms": {}, + }, + }; + + Ok(()) +} diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index f15baf86a..189f58779 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -36,7 +36,7 @@ mod echo; mod pagination; mod read_receipts; #[cfg(feature = "experimental-sliding-sync")] -mod sliding_sync; +pub(crate) mod sliding_sync; use crate::{logged_in_client, mock_sync}; diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs index 7d656869e..f2cff0301 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs @@ -42,7 +42,7 @@ macro_rules! receive_response { .mount_as_scoped(&$server) .await; - let next = $sliding_sync_stream.next().await.context("`stream` trip")??; + let next = $sliding_sync_stream.next().await.context("`sync` trip")??; next } @@ -64,6 +64,8 @@ macro_rules! timeline_event { } } +pub(crate) use timeline_event; + macro_rules! assert_timeline_stream { // `--- day divider ---` ( @_ [ $stream:ident ] [ --- day divider --- ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { @@ -77,7 +79,12 @@ macro_rules! assert_timeline_stream { assert_matches!( $stream.next().now_or_never(), Some(Some(VectorDiff::PushBack { value })) => { - assert_matches!(value.as_ref(), TimelineItem::Virtual(VirtualTimelineItem::DayDivider(_))); + assert_matches!( + value.as_ref(), + TimelineItem::Virtual( + VirtualTimelineItem::DayDivider(_) + ) + ); } ); } @@ -162,6 +169,8 @@ macro_rules! assert_timeline_stream { }; } +pub(crate) use assert_timeline_stream; + async fn new_sliding_sync(lists: Vec) -> Result<(MockServer, SlidingSync)> { let (client, server) = logged_in_client().await; @@ -201,7 +210,7 @@ async fn create_one_room( assert!(update.rooms.contains(&room_id.to_owned())); - let room = sliding_sync.get_room(room_id).context("`get_room`")?; + let room = sliding_sync.get_room(room_id).await.context("`get_room`")?; assert_eq!(room.name(), Some(room_name.clone())); Ok(()) @@ -213,6 +222,7 @@ async fn timeline( ) -> Result<(Vector>, impl Stream>>)> { Ok(sliding_sync .get_room(room_id) + .await .unwrap() .timeline() .await diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 8d71e3e2a..56f52f286 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -47,7 +47,7 @@ appservice = ["ruma/appservice-api-s"] image-proc = ["dep:image"] image-rayon = ["image-proc", "image?/jpeg_rayon"] -experimental-sliding-sync = ["matrix-sdk-base/experimental-sliding-sync", "reqwest/gzip"] +experimental-sliding-sync = ["matrix-sdk-base/experimental-sliding-sync", "reqwest/gzip", "dep:eyeball-im-util"] docsrs = [ "e2e-encryption", @@ -68,6 +68,7 @@ dashmap = { workspace = true } event-listener = "2.5.2" eyeball = { workspace = true } eyeball-im = { workspace = true } +eyeball-im-util = { workspace = true, optional = true } eyre = { version = "0.6.8", optional = true } futures-core = { workspace = true } futures-util = { workspace = true } diff --git a/crates/matrix-sdk/src/sliding_sync/README.md b/crates/matrix-sdk/src/sliding_sync/README.md index e5d5f2837..cf5b11af3 100644 --- a/crates/matrix-sdk/src/sliding_sync/README.md +++ b/crates/matrix-sdk/src/sliding_sync/README.md @@ -411,6 +411,7 @@ use ruma::{assign, api::client::sync::sync_events::v4, events::StateEventType}; use tracing::{warn, error, info, debug}; use futures_util::{pin_mut, StreamExt}; use url::Url; +use std::future::ready; # async { # let homeserver = Url::parse("http://example.com")?; # let client = Client::new(homeserver).await?; @@ -447,9 +448,9 @@ let sliding_sync = sliding_sync_builder // subscribe to the list APIs for updates -let (list_state_stream, list_count_stream, list_stream) = sliding_sync.on_list(&active_list_name, |list| { - (list.state_stream(), list.maximum_number_of_rooms_stream(), list.room_list_stream()) -}).unwrap(); +let (list_state_stream, list_count_stream, (_, list_stream)) = sliding_sync.on_list(&active_list_name, |list| { + ready((list.state_stream(), list.maximum_number_of_rooms_stream(), list.room_list_stream())) +}).await.unwrap(); tokio::spawn(async move { pin_mut!(list_state_stream); diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 90742c4ba..735561e53 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -7,7 +7,7 @@ use ruma::{ }, OwnedRoomId, }; -use tokio::sync::broadcast::channel; +use tokio::sync::{broadcast::channel, RwLock as AsyncRwLock}; use url::Url; use super::{ @@ -249,8 +249,8 @@ impl SlidingSyncBuilder { .await?; } - let rooms = StdRwLock::new(self.rooms); - let lists = StdRwLock::new(lists); + let rooms = AsyncRwLock::new(self.rooms); + let lists = AsyncRwLock::new(lists); Ok(SlidingSync::new(SlidingSyncInner { _id: Some(self.id), diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 2cb42ab68..59564bb4c 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -76,13 +76,13 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu // Write every `SlidingSyncList` that's configured for caching into the store. let frozen_lists = { - let rooms_lock = sliding_sync.inner.rooms.read().unwrap(); + let rooms_lock = sliding_sync.inner.rooms.read().await; sliding_sync .inner .lists .read() - .unwrap() + .await .iter() .filter_map(|(list_name, list)| { matches!(list.cache_policy(), SlidingSyncListCachePolicy::Enabled).then(|| { @@ -231,157 +231,152 @@ mod tests { } #[allow(clippy::await_holding_lock)] - #[test] - fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> { - block_on(async { - let client = logged_in_client(Some("https://foo.bar".to_owned())).await; + #[tokio::test] + async fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> { + let client = logged_in_client(Some("https://foo.bar".to_owned())).await; - let store = client.store(); + let store = client.store(); - // Store entries don't exist. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes()) + // Store entries don't exist. + assert!(store + .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes()) + .await? + .is_none()); + + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes() + ) + .await? + .is_none()); + + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() + ) + .await? + .is_none()); + + // Create a new `SlidingSync` instance, and store it. + let storage_key = { + let sync_id = "test-sync-id"; + let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); + let sliding_sync = client + .sliding_sync(sync_id)? + .enable_caching()? + .add_cached_list(SlidingSyncList::builder("list_foo")) .await? - .is_none()); + .add_list(SlidingSyncList::builder("list_bar")) + .build() + .await?; - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes() - ) + // Modify both lists, so we can check expected caching behavior later. + { + let lists = sliding_sync.inner.lists.write().await; + + let list_foo = lists.get("list_foo").unwrap(); + list_foo.set_maximum_number_of_rooms(Some(42)); + + let list_bar = lists.get("list_bar").unwrap(); + list_bar.set_maximum_number_of_rooms(Some(1337)); + } + + assert!(sliding_sync.cache_to_storage().await.is_ok()); + storage_key + }; + + // Store entries now exist for the sliding sync object and list_foo. + assert!(store + .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) + .await? + .is_some()); + + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() + ) + .await? + .is_some()); + + // But not for list_bar. + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() + ) + .await? + .is_none()); + + // Create a new `SlidingSync`, and it should be read from the cache. + let storage_key = { + let sync_id = "test-sync-id"; + let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); + let max_number_of_room_stream = Arc::new(RwLock::new(None)); + let cloned_stream = max_number_of_room_stream.clone(); + let sliding_sync = client + .sliding_sync(sync_id)? + .enable_caching()? + .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| { + // In the `once_built()` handler, nothing has been read from the cache yet. + assert_eq!(list.maximum_number_of_rooms(), None); + + let mut stream = cloned_stream.write().unwrap(); + *stream = Some(list.maximum_number_of_rooms_stream()); + list + })) .await? - .is_none()); + .add_list(SlidingSyncList::builder("list_bar")) + .build() + .await?; - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() - ) - .await? - .is_none()); + // Check the list' state. + { + let lists = sliding_sync.inner.lists.write().await; - // Create a new `SlidingSync` instance, and store it. - let storage_key = { - let sync_id = "test-sync-id"; - let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); - let sliding_sync = client - .sliding_sync(sync_id)? - .enable_caching()? - .add_cached_list(SlidingSyncList::builder("list_foo")) - .await? - .add_list(SlidingSyncList::builder("list_bar")) - .build() - .await?; + // This one was cached. + let list_foo = lists.get("list_foo").unwrap(); + assert_eq!(list_foo.maximum_number_of_rooms(), Some(42)); - // Modify both lists, so we can check expected caching behavior later. - { - let lists = sliding_sync.inner.lists.write().unwrap(); + // This one wasn't. + let list_bar = lists.get("list_bar").unwrap(); + assert_eq!(list_bar.maximum_number_of_rooms(), None); + } - let list_foo = lists.get("list_foo").unwrap(); - list_foo.set_maximum_number_of_rooms(Some(42)); + // The maximum number of rooms reloaded from the cache should have been + // published. + { + let mut stream = + max_number_of_room_stream.write().unwrap().take().expect("stream must be set"); + let initial_max_number_of_rooms = + stream.next().await.expect("stream must have emitted something"); + assert_eq!(initial_max_number_of_rooms, Some(42)); + } - let list_bar = lists.get("list_bar").unwrap(); - list_bar.set_maximum_number_of_rooms(Some(1337)); - } + // Clean the cache. + let lists = sliding_sync.inner.lists.read().await; + clean_storage(&client, &storage_key, &lists).await; + storage_key + }; - assert!(sliding_sync.cache_to_storage().await.is_ok()); - storage_key - }; + // Store entries don't exist. + assert!(store + .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) + .await? + .is_none()); - // Store entries now exist for the sliding sync object and list_foo. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) - .await? - .is_some()); + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() + ) + .await? + .is_none()); - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() - ) - .await? - .is_some()); + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() + ) + .await? + .is_none()); - // But not for list_bar. - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() - ) - .await? - .is_none()); - - // Create a new `SlidingSync`, and it should be read from the cache. - let storage_key = { - let sync_id = "test-sync-id"; - let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap()); - let max_number_of_room_stream = Arc::new(RwLock::new(None)); - let cloned_stream = max_number_of_room_stream.clone(); - let sliding_sync = client - .sliding_sync(sync_id)? - .enable_caching()? - .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| { - // In the `once_built()` handler, nothing has been read from the cache yet. - assert_eq!(list.maximum_number_of_rooms(), None); - - let mut stream = cloned_stream.write().unwrap(); - *stream = Some(list.maximum_number_of_rooms_stream()); - list - })) - .await? - .add_list(SlidingSyncList::builder("list_bar")) - .build() - .await?; - - // Check the list' state. - { - let lists = sliding_sync.inner.lists.write().unwrap(); - - // This one was cached. - let list_foo = lists.get("list_foo").unwrap(); - assert_eq!(list_foo.maximum_number_of_rooms(), Some(42)); - - // This one wasn't. - let list_bar = lists.get("list_bar").unwrap(); - assert_eq!(list_bar.maximum_number_of_rooms(), None); - } - - // The maximum number of rooms reloaded from the cache should have been - // published. - { - let mut stream = max_number_of_room_stream - .write() - .unwrap() - .take() - .expect("stream must be set"); - let initial_max_number_of_rooms = - stream.next().await.expect("stream must have emitted something"); - assert_eq!(initial_max_number_of_rooms, Some(42)); - } - - // Clean the cache. - clean_storage(&client, &storage_key, &sliding_sync.inner.lists.read().unwrap()) - .await; - storage_key - }; - - // Store entries don't exist. - assert!(store - .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes()) - .await? - .is_none()); - - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes() - ) - .await? - .is_none()); - - assert!(store - .get_custom_value( - format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes() - ) - .await? - .is_none()); - - Ok(()) - }) + Ok(()) } } diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 1dbdfb7e1..488c8c34d 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -194,6 +194,9 @@ impl SlidingSyncListBuilder { ) -> SlidingSyncList { let list = SlidingSyncList { inner: Arc::new(SlidingSyncListInner { + #[cfg(any(test, feature = "testing"))] + sync_mode: StdRwLock::new(self.sync_mode.clone()), + // From the builder sort: self.sort, required_state: self.required_state, diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index 8aad9b4f7..1643a619f 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -14,8 +14,10 @@ use std::{ pub use builder::*; use eyeball::unique::Observable; use eyeball_im::{ObservableVector, VectorDiff}; +use eyeball_im_util::{FilteredVectorSubscriber, VectorExt}; pub(super) use frozen::FrozenSlidingSyncList; use futures_core::Stream; +use imbl::Vector; pub(super) use request_generator::*; pub use room_list_entry::RoomListEntry; use ruma::{ @@ -42,9 +44,15 @@ pub(crate) enum SlidingSyncListCachePolicy { } /// The type used to express natural bounds (including but not limited to: -/// ranges, timeline limit) in the sliding sync SDK. +/// ranges, timeline limit) in the Sliding Sync. pub type Bound = u32; +/// One range of rooms in a response from Sliding Sync. +pub type Range = RangeInclusive; + +/// Many ranges of rooms. +pub type Ranges = Vec; + /// Holding a specific filtered list within the concept of sliding sync. /// /// It is OK to clone this type as much as you need: cloning it is cheap. @@ -53,6 +61,8 @@ pub struct SlidingSyncList { inner: Arc, } +type BoxedRoomListEntryFilter = Box bool + Sync + Send>; + impl SlidingSyncList { /// Create a new [`SlidingSyncListBuilder`] with the given name. pub fn builder(name: impl Into) -> SlidingSyncListBuilder { @@ -75,7 +85,10 @@ impl SlidingSyncList { /// means that the state is not reset **purposely**. The ranges and the /// state will be updated when the next request will be sent and a /// response will be received. The maximum number of rooms won't change. - pub fn set_sync_mode(&self, sync_mode: impl Into) { + pub fn set_sync_mode(&self, sync_mode: M) + where + M: Into, + { self.inner.set_sync_mode(sync_mode.into()); // When the sync mode is changed, the sync loop must skip over any work in its @@ -126,8 +139,35 @@ impl SlidingSyncList { /// /// There's no guarantee of ordering between items emitted by this stream /// and those emitted by other streams exposed on this structure. - pub fn room_list_stream(&self) -> impl Stream> { - ObservableVector::subscribe(&self.inner.room_list.read().unwrap()) + /// + /// The first part of the returned tuple is the actual room list entries, + /// and the second part is the `Stream` to receive updates on the room list + /// entries. + pub fn room_list_stream( + &self, + ) -> (Vector, impl Stream>) { + let read_lock = self.inner.room_list.read().unwrap(); + let previous_values = (*read_lock).clone(); + let subscriber = ObservableVector::subscribe(&read_lock); + + (previous_values, subscriber) + } + + /// Get a stream of room list, but filtered. + /// + /// It's similar to [`Self::room_list_stream`] but the room list is filtered + /// by `filter`. + pub fn room_list_filtered_stream( + &self, + filter: F, + ) -> (Vector, FilteredVectorSubscriber) + where + F: Fn(&RoomListEntry) -> bool + Sync + Send + 'static, + { + ObservableVector::subscribe_filtered( + &self.inner.room_list.read().unwrap(), + Box::new(filter), + ) } /// Get the maximum number of rooms. See [`Self::maximum_number_of_rooms`] @@ -204,14 +244,21 @@ impl SlidingSyncList { } } -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] +#[allow(dead_code)] impl SlidingSyncList { + /// Set the maximum number of rooms. pub(super) fn set_maximum_number_of_rooms(&self, maximum_number_of_rooms: Option) { Observable::set( &mut self.inner.maximum_number_of_rooms.write().unwrap(), maximum_number_of_rooms, ); } + + /// Get the sync-mode. + pub fn sync_mode(&self) -> SlidingSyncMode { + self.inner.sync_mode.read().unwrap().clone() + } } #[derive(Debug)] @@ -260,6 +307,9 @@ pub(super) struct SlidingSyncListInner { /// The `bump_event_types` field. See /// [`SlidingSyncListBuilder::bump_event_types`] to learn more. bump_event_types: Vec, + + #[cfg(any(test, feature = "testing"))] + sync_mode: StdRwLock, } impl SlidingSyncListInner { @@ -270,6 +320,11 @@ impl SlidingSyncListInner { /// The [`Self::state`] is immediately updated to reflect the new state. The /// [`Self::maximum_number_of_rooms`] won't change. pub fn set_sync_mode(&self, sync_mode: SlidingSyncMode) { + #[cfg(any(test, feature = "testing"))] + { + *self.sync_mode.write().unwrap() = sync_mode.clone(); + } + { let mut request_generator = self.request_generator.write().unwrap(); *request_generator = SlidingSyncListRequestGenerator::new(sync_mode); @@ -729,10 +784,16 @@ impl SlidingSyncSelectiveModeBuilder { } /// Select a range to fetch. - pub fn add_range(mut self, range: RangeInclusive) -> Self { + pub fn add_range(mut self, range: Range) -> Self { self.ranges.push(range); self } + + /// Select many ranges to fetch. + pub fn add_ranges(mut self, ranges: Ranges) -> Self { + self.ranges.extend(ranges); + self + } } impl From for SlidingSyncMode { @@ -847,6 +908,7 @@ mod tests { use futures_util::StreamExt; use imbl::vector; + use matrix_sdk_test::async_test; use ruma::{api::client::sync::sync_events::v4::SlidingOp, room_id, uint}; use serde_json::json; use tokio::{ @@ -869,8 +931,8 @@ mod tests { }; } - #[test] - fn test_sliding_sync_list_selective_mode() { + #[async_test] + async fn test_sliding_sync_list_selective_mode() { let (sender, mut receiver) = channel(1); // Set range on `Selective`. @@ -889,7 +951,7 @@ mod tests { // There shouldn't be any internal request to restart the sync loop yet. assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty))); - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(4..=5)); + list.set_sync_mode(SlidingSyncMode::new_selective().add_range(4..=5)); { let mut generator = list.inner.request_generator.write().unwrap(); @@ -912,7 +974,7 @@ mod tests { let (sender, _receiver) = channel(1); let list = SlidingSyncList::builder("foo") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=1)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=1)) .timeline_limit(7) .build(sender); @@ -930,7 +992,7 @@ mod tests { let (sender, _receiver) = channel(1); let mut list = SlidingSyncList::builder("foo") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=1)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=1)) .build(sender); let room0 = room_id!("!room0:bar.org"); @@ -1174,7 +1236,7 @@ mod tests { let (sender, _receiver) = channel(1); let mut list = SlidingSyncList::builder("testing") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153)) .build(sender); assert_ranges! { @@ -1201,12 +1263,12 @@ mod tests { }; } - #[test] - fn test_generator_selective_with_modifying_ranges_on_the_fly() { + #[tokio::test] + async fn test_generator_selective_with_modifying_ranges_on_the_fly() { let (sender, _receiver) = channel(4); let mut list = SlidingSyncList::builder("testing") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153)) .build(sender); assert_ranges! { @@ -1232,7 +1294,7 @@ mod tests { } }; - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(3..=7)); + list.set_sync_mode(SlidingSyncMode::new_selective().add_range(3..=7)); assert_ranges! { list = list, @@ -1245,7 +1307,7 @@ mod tests { }, }; - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(42..=77)); + list.set_sync_mode(SlidingSyncMode::new_selective().add_range(42..=77)); assert_ranges! { list = list, @@ -1258,7 +1320,7 @@ mod tests { }, }; - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new()); + list.set_sync_mode(SlidingSyncMode::new_selective()); assert_ranges! { list = list, @@ -1272,12 +1334,12 @@ mod tests { }; } - #[test] - fn test_generator_changing_sync_mode_to_various_modes() { + #[async_test] + async fn test_generator_changing_sync_mode_to_various_modes() { let (sender, _receiver) = channel(4); let mut list = SlidingSyncList::builder("testing") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153)) .build(sender); assert_ranges! { @@ -1376,14 +1438,14 @@ mod tests { }; // Changing from `Paging` to `Selective`. - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new()); + list.set_sync_mode(SlidingSyncMode::new_selective()); assert_eq!(list.state(), SlidingSyncState::PartiallyLoaded); // we had some partial state, but we can't be sure it's fully loaded until the // next request // We need to update the ranges, of course, as they are not managed // automatically anymore. - list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=100)); + list.set_sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)); assert_ranges! { list = list, @@ -1415,7 +1477,7 @@ mod tests { let (sender, _receiver) = channel(1); let mut list = SlidingSyncList::builder("foo") - .sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=3)) + .sync_mode(SlidingSyncMode::new_selective().add_range(0..=3)) .build(sender); assert_eq!(**list.inner.maximum_number_of_rooms.read().unwrap(), None); @@ -1463,7 +1525,7 @@ mod tests { ); } - let mut room_list_stream = list.room_list_stream(); + let (_, mut room_list_stream) = list.room_list_stream(); let (room_list_stream_sender, mut room_list_stream_receiver) = unbounded_channel(); spawn(async move { @@ -1583,14 +1645,13 @@ mod tests { entries!( @_ [ $( $( $rest )* )? ] [ $( $accumulator )* RoomListEntry::Invalidated(room_id!( $room_id ).to_owned()), ] ) }; - ( @_ [] [ $( $accumulator:tt )+ ] ) => { + ( @_ [] [ $( $accumulator:tt )* ] ) => { vector![ $( $accumulator )* ] }; ( $( $all:tt )* ) => { entries!( @_ [ $( $all )* ] [] ) }; - } macro_rules! assert_sync_operations { diff --git a/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs b/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs index 38e4ae89d..234a499a8 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/request_generator.rs @@ -29,14 +29,11 @@ //! user-specified limit representing the maximum number of rooms the user //! actually wants to load. -use std::{cmp::min, ops::RangeInclusive}; +use std::cmp::min; -use super::{Bound, SlidingSyncMode}; +use super::{Range, Ranges, SlidingSyncMode}; use crate::{sliding_sync::Error, SlidingSyncState}; -/// Range of rooms in a response from sliding sync. -pub type Ranges = Vec>; - /// The kind of request generator. #[derive(Debug, PartialEq)] pub(super) enum SlidingSyncListRequestGeneratorKind { @@ -80,7 +77,7 @@ pub(in super::super) struct SlidingSyncListRequestGenerator { /// The current ranges used by this request generator. /// /// Note there's only one range in the `Growing` and `Paging` mode. - ranges: Vec>, + ranges: Ranges, /// The kind of request generator. pub(super) kind: SlidingSyncListRequestGeneratorKind, } @@ -122,7 +119,7 @@ impl SlidingSyncListRequestGenerator { /// For generators in the selective mode, this is the initial set of ranges. /// For growing and paginated generators, this is the range committed in the /// latest response received from the server. - pub(super) fn requested_ranges(&self) -> &[RangeInclusive] { + pub(super) fn requested_ranges(&self) -> &[Range] { &self.ranges } @@ -291,7 +288,7 @@ fn create_range( desired_size: u32, maximum_number_of_rooms_to_fetch: Option, maximum_number_of_rooms: Option, -) -> Result, Error> { +) -> Result { // Calculate the range. // The `start` bound is given. Let's calculate the `end` bound. @@ -322,7 +319,7 @@ fn create_range( return Err(Error::InvalidRange { start, end }); } - Ok(RangeInclusive::new(start, end)) + Ok(Range::new(start, end)) } #[cfg(test)] diff --git a/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs b/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs index c84e5c579..1600eff41 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; /// Represent a room entry in the [`SlidingSyncList`][super::SlidingSyncList]. #[derive(Clone, Debug, Default, Serialize, Deserialize)] -#[cfg_attr(test, derive(PartialEq))] +#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))] pub enum RoomListEntry { /// This entry isn't known at this point and thus considered `Empty`. #[default] diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index a9370e101..45da544a8 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -25,6 +25,7 @@ mod room; use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, + future::Future, sync::{ atomic::{AtomicU8, Ordering}, Arc, RwLock as StdRwLock, @@ -49,7 +50,7 @@ use ruma::{ use serde::{Deserialize, Serialize}; use tokio::{ select, spawn, - sync::{broadcast::Sender, Mutex as AsyncMutex}, + sync::{broadcast::Sender, Mutex as AsyncMutex, RwLock as AsyncRwLock}, }; use tracing::{debug, error, instrument, warn, Instrument, Span}; use url::Url; @@ -97,10 +98,10 @@ pub(super) struct SlidingSyncInner { position: StdRwLock, /// The lists of this Sliding Sync instance. - lists: StdRwLock>, + lists: AsyncRwLock>, /// The rooms details - rooms: StdRwLock>, + rooms: AsyncRwLock>, /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists but /// one wants to receive updates. @@ -162,13 +163,13 @@ impl SlidingSync { } /// Lookup a specific room - pub fn get_room(&self, room_id: &RoomId) -> Option { - self.inner.rooms.read().unwrap().get(room_id).cloned() + pub async fn get_room(&self, room_id: &RoomId) -> Option { + self.inner.rooms.read().await.get(room_id).cloned() } /// Check the number of rooms. pub fn get_number_of_rooms(&self) -> usize { - self.inner.rooms.read().unwrap().len() + self.inner.rooms.blocking_read().len() } #[instrument(skip(self))] @@ -177,13 +178,21 @@ impl SlidingSync { } /// Find a list by its name, and do something on it if it exists. - pub fn on_list(&self, list_name: &str, f: F) -> Option + pub async fn on_list( + &self, + list_name: &str, + function: Function, + ) -> Option where - F: FnOnce(&SlidingSyncList) -> R, + Function: FnOnce(&SlidingSyncList) -> FunctionOutput, + FunctionOutput: Future, { - let lists = self.inner.lists.read().unwrap(); + let lists = self.inner.lists.read().await; - lists.get(list_name).map(f) + match lists.get(list_name) { + Some(list) => Some(function(list).await), + None => None, + } } /// Add the list to the list of lists. @@ -197,7 +206,7 @@ impl SlidingSync { ) -> Result> { let list = list_builder.build(self.inner.internal_channel.clone()); - let old_list = self.inner.lists.write().unwrap().insert(list.name().to_owned(), list); + let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list); self.inner.internal_channel_send_if_possible( SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration, @@ -224,7 +233,7 @@ impl SlidingSync { list_builder.set_cached_and_reload(&self.inner.client, storage_key).await?; if !reloaded_rooms.is_empty() { - let mut rooms = self.inner.rooms.write().unwrap(); + let mut rooms = self.inner.rooms.write().await; for (key, frozen) in reloaded_rooms { rooms.entry(key).or_insert_with(|| { @@ -237,18 +246,18 @@ impl SlidingSync { } /// Lookup a set of rooms - pub fn get_rooms>( + pub async fn get_rooms>( &self, room_ids: I, ) -> Vec> { - let rooms = self.inner.rooms.read().unwrap(); + let rooms = self.inner.rooms.read().await; room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect() } /// Get all rooms. - pub fn get_all_rooms(&self) -> Vec { - self.inner.rooms.read().unwrap().values().cloned().collect() + pub async fn get_all_rooms(&self) -> Vec { + self.inner.rooms.read().await.values().cloned().collect() } fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig { @@ -271,7 +280,7 @@ impl SlidingSync { } /// Handle the HTTP response. - #[instrument(skip_all, fields(lists = self.inner.lists.read().unwrap().len()))] + #[instrument(skip_all)] async fn handle_response( &self, sliding_sync_response: v4::Response, @@ -302,7 +311,7 @@ impl SlidingSync { let update_summary = { // Update the rooms. let updated_rooms = { - let mut rooms_map = self.inner.rooms.write().unwrap(); + let mut rooms_map = self.inner.rooms.write().await; let mut updated_rooms = Vec::with_capacity(sliding_sync_response.rooms.len()); @@ -346,7 +355,7 @@ impl SlidingSync { // Update the lists. let updated_lists = { let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len()); - let mut lists = self.inner.lists.write().unwrap(); + let mut lists = self.inner.lists.write().await; for (name, updates) in sliding_sync_response.lists { let Some(list) = lists.get_mut(&name) else { @@ -378,17 +387,13 @@ impl SlidingSync { } #[instrument(skip_all, fields(pos))] - async fn sync_once(&self) -> Result> { + async fn sync_once(&self) -> Result { let (request, request_config, requested_room_unsubscriptions) = { // Collect requests for lists. let mut requests_lists = BTreeMap::new(); { - let mut lists = self.inner.lists.write().unwrap(); - - if lists.is_empty() { - return Ok(None); - } + let mut lists = self.inner.lists.write().await; for (name, list) in lists.iter_mut() { requests_lists.insert(name.clone(), list.next_request()?); @@ -445,7 +450,7 @@ impl SlidingSync { // coming from the `OlmMachine::outgoing_requests()` method. #[cfg(feature = "e2e-encryption")] let response = { - debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); + debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); let (e2ee_uploads, response) = futures_util::future::join(self.inner.client.send_outgoing_requests(), request) @@ -510,7 +515,7 @@ impl SlidingSync { debug!("Sliding Sync response has been fully handled"); - Ok(Some(updates)) + Ok(updates) }; spawn(future.instrument(Span::current())).await.unwrap() @@ -520,6 +525,10 @@ impl SlidingSync { /// /// This method returns a `Stream`, which will send requests and will handle /// responses automatically. Lists and rooms are updated automatically. + /// + /// This function returns `Ok(…)` if everything went well, otherwise it will + /// return `Err(…)`. An `Err` will _always_ lead to the `Stream` + /// termination. #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro #[instrument(name = "sync_stream", skip_all)] pub fn sync(&self) -> impl Stream> + '_ { @@ -557,16 +566,12 @@ impl SlidingSync { update_summary = self.sync_once().instrument(sync_span.clone()) => { match update_summary { - Ok(Some(updates)) => { + Ok(updates) => { self.inner.reset_counter.store(0, Ordering::SeqCst); yield Ok(updates); } - Ok(None) => { - break; - } - Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { // The session has expired. @@ -582,6 +587,7 @@ impl SlidingSync { // The session has expired too many times, let's raise an error! yield Err(error); + // Terminates the loop, and terminates the stream. break; } @@ -597,11 +603,13 @@ impl SlidingSync { debug!(?self.inner.extensions, ?self.inner.position, "Sliding Sync has been reset"); }); + + continue; } yield Err(error); - continue; + break; } } } @@ -871,7 +879,7 @@ mod tests { ) .await?; - let lists = sliding_sync.inner.lists.read().unwrap(); + let lists = sliding_sync.inner.lists.read().await; assert!(lists.contains_key("foo")); assert!(lists.contains_key("bar")); @@ -892,26 +900,20 @@ mod tests { pin_mut!(stream); // The sync-loop is actually running. - for _ in 0..3 { - assert!(stream.next().await.is_some()); - } + assert!(stream.next().await.is_some()); // Stop the sync-loop. sliding_sync.stop_sync()?; // The sync-loop is actually stopped. - for _ in 0..3 { - assert!(stream.next().await.is_none()); - } + assert!(stream.next().await.is_none()); // Start a new sync-loop. let stream = sliding_sync.sync(); pin_mut!(stream); // The sync-loop is actually running. - for _ in 0..3 { - assert!(stream.next().await.is_some()); - } + assert!(stream.next().await.is_some()); Ok(()) } diff --git a/xtask/src/ci.rs b/xtask/src/ci.rs index 2551cfba2..15ba3b4b4 100644 --- a/xtask/src/ci.rs +++ b/xtask/src/ci.rs @@ -169,12 +169,13 @@ fn check_typos() -> Result<()> { } fn check_clippy() -> Result<()> { - cmd!("rustup run {NIGHTLY} cargo clippy --all-targets -- -D warnings").run()?; + cmd!("rustup run {NIGHTLY} cargo clippy --all-targets --features testing -- -D warnings") + .run()?; cmd!( "rustup run {NIGHTLY} cargo clippy --workspace --all-targets --exclude matrix-sdk-crypto --exclude xtask --no-default-features - --features native-tls,experimental-sliding-sync,sso-login + --features native-tls,experimental-sliding-sync,sso-login,testing -- -D warnings" ) .run()?;