feat(ui): RoomList, step 1: the Foundation

feat(ui): `RoomList`, step 1: the Foundation
This commit is contained in:
Ivan Enderlin
2023-06-05 20:50:25 +02:00
committed by GitHub
22 changed files with 2438 additions and 256 deletions

View File

@@ -185,7 +185,7 @@ jobs:
- name: Test
run: |
cargo nextest run --workspace \
--exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test
--exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test --features testing
- name: Test documentation
run: |

View File

@@ -52,7 +52,7 @@ jobs:
- name: Run tarpaulin
run: |
cargo tarpaulin --out Xml -e sliding-sync-integration-test
cargo tarpaulin --out Xml -e sliding-sync-integration-test --features testing
- name: Upload to codecov.io
uses: codecov/codecov-action@v3

18
Cargo.lock generated
View File

@@ -1519,6 +1519,18 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "eyeball-im-util"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "988b102aa389565187ccb138e51339c72258deb8af8e459180a582af40ca323b"
dependencies = [
"eyeball-im",
"futures-core",
"imbl",
"pin-project-lite",
]
[[package]]
name = "eyre"
version = "0.6.8"
@@ -2532,6 +2544,7 @@ dependencies = [
"event-listener",
"eyeball",
"eyeball-im",
"eyeball-im-util",
"eyre",
"futures-core",
"futures-executor",
@@ -2924,16 +2937,21 @@ name = "matrix-sdk-ui"
version = "0.6.0"
dependencies = [
"anyhow",
"assert-json-diff",
"assert_matches",
"async-stream",
"async-trait",
"chrono",
"ctor",
"eyeball",
"eyeball-im",
"eyeball-im-util",
"futures-core",
"futures-util",
"imbl",
"indexmap",
"matrix-sdk",
"matrix-sdk-base",
"matrix-sdk-test",
"mime",
"once_cell",

View File

@@ -30,6 +30,7 @@ ctor = "0.2.0"
dashmap = "5.2.0"
eyeball = "0.7.0"
eyeball-im = "0.2.0"
eyeball-im-util = "0.1.0"
futures-core = "0.3.28"
futures-executor = "0.3.21"
futures-util = { version = "0.3.26", default-features = false, features = ["alloc"] }

View File

@@ -592,7 +592,7 @@ impl SlidingSyncList {
&self,
observer: Box<dyn SlidingSyncListRoomListObserver>,
) -> Arc<TaskHandle> {
let mut room_list_stream = self.inner.room_list_stream();
let (_, mut room_list_stream) = self.inner.room_list_stream();
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
loop {
@@ -695,12 +695,16 @@ impl SlidingSync {
}
pub fn get_room(&self, room_id: String) -> Result<Option<Arc<SlidingSyncRoom>>, ClientError> {
Ok(self.inner.get_room(<&RoomId>::try_from(room_id.as_str())?).map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
sliding_sync: self.inner.clone(),
client: self.client.clone(),
timeline: Default::default(),
let room_id = <&RoomId>::try_from(room_id.as_str())?;
Ok(RUNTIME.block_on(async move {
self.inner.get_room(room_id).await.map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
sliding_sync: self.inner.clone(),
client: self.client.clone(),
timeline: Default::default(),
})
})
}))
}
@@ -713,21 +717,24 @@ impl SlidingSync {
.into_iter()
.map(OwnedRoomId::try_from)
.collect::<Result<Vec<OwnedRoomId>, IdParseError>>()?;
Ok(self
.inner
.get_rooms(actual_ids.into_iter())
.into_iter()
.map(|o| {
o.map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
sliding_sync: self.inner.clone(),
client: self.client.clone(),
timeline: Default::default(),
Ok(RUNTIME.block_on(async move {
self.inner
.get_rooms(actual_ids.into_iter())
.await
.into_iter()
.map(|o| {
o.map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
sliding_sync: self.inner.clone(),
client: self.client.clone(),
timeline: Default::default(),
})
})
})
})
.collect())
.collect()
}))
}
pub fn add_list(&self, list_builder: Arc<SlidingSyncListBuilder>) -> Arc<TaskHandle> {

View File

@@ -4,24 +4,31 @@ version = "0.6.0"
edition = "2021"
[features]
default = ["e2e-encryption", "native-tls"]
default = ["e2e-encryption", "native-tls", "experimental-room-list"]
e2e-encryption = ["matrix-sdk/e2e-encryption"]
native-tls = ["matrix-sdk/native-tls"]
rustls-tls = ["matrix-sdk/rustls-tls"]
experimental-room-list = ["experimental-sliding-sync", "dep:async-stream", "dep:eyeball-im-util"]
experimental-sliding-sync = ["matrix-sdk/experimental-sliding-sync"]
testing = ["matrix-sdk/testing"]
[dependencies]
async-stream = { workspace = true, optional = true }
async-trait = { workspace = true }
chrono = "0.4.23"
eyeball = { workspace = true }
eyeball-im = { workspace = true }
eyeball-im-util = { workspace = true, optional = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
imbl = { version = "2.0.0", features = ["serde"] }
indexmap = "1.9.1"
matrix-sdk = { version = "0.6.2", path = "../matrix-sdk", default-features = false }
matrix-sdk-base = { version = "0.6.1", path = "../matrix-sdk-base" }
mime = "0.3.16"
once_cell = { workspace = true }
pin-project-lite = "0.2.9"
@@ -34,9 +41,14 @@ tracing = { workspace = true, features = ["attributes"] }
[dev-dependencies]
anyhow = { workspace = true }
assert-json-diff = "2.0"
assert_matches = { workspace = true }
ctor = { workspace = true }
matrix-sdk-test = { version = "0.6.0", path = "../../testing/matrix-sdk-test" }
stream_assert = "0.1.0"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
wiremock = "0.5.13"
[[test]]
name = "integration"
required-features = ["testing"]

View File

@@ -13,6 +13,11 @@
// limitations under the License.
mod events;
#[cfg(feature = "experimental-room-list")]
pub mod room_list;
pub mod timeline;
#[cfg(feature = "experimental-room-list")]
pub use self::room_list::RoomList;
pub use self::timeline::Timeline;

View File

@@ -0,0 +1,688 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! `RoomList` API.
//!
//! The `RoomList` is a UI API dedicated to present a list of Matrix rooms to
//! the user. The syncing is handled by
//! [`SlidingSync`][matrix_sdk::SlidingSync]. The idea is to expose a simple API
//! to handle most of the client app use cases, like: Showing and updating a
//! list of rooms, filtering a list of rooms, handling particular updates of a
//! range of rooms (the ones the client app is showing to the view, i.e. the
//! rooms present in the viewport) etc.
//!
//! As such, the `RoomList` works as an opinionated state machine. The states
//! are defined by [`State`]. Actions are attached to the each state transition.
//! Apart from that, one can apply [`Input`]s on the state machine, like
//! notifying that the client app viewport of the room list has changed (if the
//! user of the client app has scrolled in the room list for example) etc.
//!
//! The API is purposely small. Sliding Sync is versatile. `RoomList` is _one_
//! specific usage of Sliding Sync.
//!
//! # Basic principle
//!
//! `RoomList` works with 2 Sliding Sync List:
//!
//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the main
//! list. Its goal is to load all the user's rooms. It starts with a
//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
//! set of rooms) to load the first rooms quickly, and then updates to a
//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
//! background”: it will sync the existing rooms and will fetch new rooms, by
//! a certain batch size.
//! * `visible_rooms` (referred by the constant [`VISIBLE_ROOMS_LIST_NAME`]) is
//! the “reactive” list. Its goal is to react to the client app user actions.
//! If the user scrolls in the room list, the `visible_rooms` will be
//! configured to sync for the particular range of rooms the user is actually
//! seeing (the rooms in the current viewport). `visible_rooms` has a
//! different configuration than `all_rooms` as it loads more timeline events:
//! it means that the room will already have a “history”, a timeline, ready to
//! be presented when the user enters the room.
//!
//! This behavior has proven to be empirically satisfying to provide a fast and
//! fluid user experience for a Matrix client.
//!
//! [`RoomList::entries`] provides a way to get a stream of room list entry.
//! This stream can be filtered, and the filter can be changed over time.
//!
//! [`RoomList::state_stream`] provides a way to get a stream of the state
//! machine's state, which can be pretty helpful for the client app.
use std::{future::ready, sync::Arc};
use async_stream::stream;
use async_trait::async_trait;
use eyeball::shared::Observable;
use eyeball_im::VectorDiff;
use futures_util::{pin_mut, Stream, StreamExt};
use imbl::Vector;
pub use matrix_sdk::RoomListEntry;
use matrix_sdk::{
sliding_sync::Ranges, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList,
SlidingSyncMode, SlidingSyncRoom,
};
use once_cell::sync::Lazy;
use ruma::{OwnedRoomId, RoomId};
use thiserror::Error;
use crate::{timeline::EventTimelineItem, Timeline};
/// Name of the main Sliding Sync list, whose goal is to load all the user's
/// rooms (see the module documentation).
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
/// Name of the Sliding Sync list that tracks the rooms currently visible in
/// the client app's viewport (see the module documentation).
pub const VISIBLE_ROOMS_LIST_NAME: &str = "visible_rooms";
/// The [`RoomList`] type. See the module's documentation to learn more.
#[derive(Debug)]
pub struct RoomList {
    /// The underlying Sliding Sync instance that drives the syncing.
    sliding_sync: SlidingSync,
    /// Current state of the `RoomList` state machine. Observable, so it can
    /// be streamed via [`RoomList::state_stream`].
    state: Observable<State>,
}
impl RoomList {
    /// Create a new `RoomList`.
    ///
    /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
    /// already pre-configured.
    pub async fn new(client: Client) -> Result<Self, Error> {
        // Build the Sliding Sync instance with caching enabled, and a single
        // cached list (`all_rooms`) in Selective mode over a small range
        // (0..=19) so the first rooms load quickly. A timeline limit of 1 is
        // enough at this point: only the latest event per room is needed.
        let sliding_sync = client
            .sliding_sync("room-list")
            .map_err(Error::SlidingSync)?
            .enable_caching()
            .map_err(Error::SlidingSync)?
            .add_cached_list(
                SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=19))
                    .timeline_limit(1),
            )
            .await
            .map_err(Error::SlidingSync)?
            .build()
            .await
            .map_err(Error::SlidingSync)?;
        // The state machine always starts in `State::Init`.
        Ok(Self { sliding_sync, state: Observable::new(State::Init) })
    }
    /// Start to sync the room list.
    ///
    /// It's the main method of this entire API. Calling `sync` allows to
    /// receive updates on the room list: new rooms, rooms updates etc. Those
    /// updates can be read with [`Self::entries`]. This method returns a
    /// [`Stream`] where produced items only hold an empty value in case of a
    /// sync success, otherwise an error.
    ///
    /// The `RoomList`'s state machine is run by this method.
    ///
    /// Stopping the [`Stream`] (i.e. stop polling it) and calling
    /// [`Self::sync`] again will resume from the previous state of the state
    /// machine.
    pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
        stream! {
            let sync = self.sliding_sync.sync();
            pin_mut!(sync);
            // This is a state machine implementation.
            // Things happen in this order:
            //
            // 1. The next state is calculated,
            // 2. The actions associated to the next state are run,
            // 3. The next state is stored,
            // 4. A sync is done.
            //
            // So the sync is done after the machine _has entered_ into a new state.
            loop {
                {
                    // `next` also runs the actions attached to the transition
                    // (e.g. adding the `visible_rooms` list); it can fail.
                    let next_state = self.state.read().next(&self.sliding_sync).await?;
                    Observable::set(&self.state, next_state);
                }
                match sync.next().await {
                    Some(Ok(_update_summary)) => {
                        // Sync succeeded: signal the caller. The actual
                        // updates are read via `Self::entries` and friends.
                        yield Ok(());
                    }
                    Some(Err(error)) => {
                        // Sync failed: remember where we were, so a later
                        // call to `sync` resumes from there.
                        let next_state = State::Terminated { from: Box::new(self.state.get()) };
                        Observable::set(&self.state, next_state);
                        yield Err(Error::SlidingSync(error));
                        break;
                    }
                    None => {
                        // The underlying sync stream has ended: terminate the
                        // state machine likewise (no error to report).
                        let next_state = State::Terminated { from: Box::new(self.state.get()) };
                        Observable::set(&self.state, next_state);
                        break;
                    }
                }
            }
        }
    }
    /// Get the current state of the state machine.
    pub fn state(&self) -> State {
        self.state.get()
    }
    /// Get a [`Stream`] of [`State`]s.
    pub fn state_stream(&self) -> impl Stream<Item = State> {
        Observable::subscribe(&self.state)
    }
    /// Get all previous room list entries, in addition to a [`Stream`] to room
    /// list entry's updates.
    pub async fn entries(
        &self,
    ) -> Result<(Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>), Error> {
        // Entries are read from the `all_rooms` list, which contains them all.
        self.sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| ready(list.room_list_stream()))
            .await
            .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string()))
    }
    /// Similar to [`Self::entries`] except that it's possible to provide a
    /// filter that will filter out room list entries.
    pub async fn entries_filtered<F>(
        &self,
        filter: F,
    ) -> Result<(Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>), Error>
    where
        F: Fn(&RoomListEntry) -> bool + Send + Sync + 'static,
    {
        self.sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| ready(list.room_list_filtered_stream(filter)))
            .await
            .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string()))
    }
    /// Pass an [`Input`] onto the state machine.
    pub async fn apply_input(&self, input: Input) -> Result<(), Error> {
        use Input::*;
        match input {
            Viewport(ranges) => {
                self.update_viewport(ranges).await?;
            }
        }
        Ok(())
    }
    /// Update the `visible_rooms` list so it syncs exactly the rooms currently
    /// in the client app's viewport.
    async fn update_viewport(&self, ranges: Ranges) -> Result<(), Error> {
        self.sliding_sync
            .on_list(VISIBLE_ROOMS_LIST_NAME, |list| {
                list.set_sync_mode(SlidingSyncMode::new_selective().add_ranges(ranges.clone()));
                ready(())
            })
            .await
            // The `visible_rooms` list only exists once the state machine has
            // run `Actions::first_rooms_are_loaded`; before that, the input
            // cannot be applied.
            .ok_or_else(|| Error::InputHasNotBeenApplied(Input::Viewport(ranges)))?;
        Ok(())
    }
    /// Get a [`Room`] if it exists.
    pub async fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
        match self.sliding_sync.get_room(room_id).await {
            Some(room) => Room::new(room).await,
            None => Err(Error::RoomNotFound(room_id.to_owned())),
        }
    }
    // Test-only accessor to the underlying Sliding Sync instance.
    #[cfg(any(test, feature = "testing"))]
    pub fn sliding_sync(&self) -> &SlidingSync {
        &self.sliding_sync
    }
}
/// A room in the room list.
///
/// It's cheap to clone this type.
#[derive(Clone, Debug)]
pub struct Room {
    /// Shared inner data; cloning a `Room` only bumps this `Arc`.
    inner: Arc<RoomInner>,
}
/// The inner, shared data of a [`Room`].
#[derive(Debug)]
struct RoomInner {
    /// The Sliding Sync room.
    sliding_sync_room: SlidingSyncRoom,
    /// The underlying client room.
    room: matrix_sdk::room::Room,
    /// The timeline of the room.
    timeline: Timeline,
    /// The “sneaky” timeline of the room, i.e. this timeline doesn't track the
    /// read marker nor the receipts.
    sneaky_timeline: Timeline,
}
impl Room {
/// Create a new `Room`.
async fn new(sliding_sync_room: SlidingSyncRoom) -> Result<Self, Error> {
let room = sliding_sync_room
.client()
.get_room(sliding_sync_room.room_id())
.ok_or_else(|| Error::RoomNotFound(sliding_sync_room.room_id().to_owned()))?;
let timeline = Timeline::builder(&room)
.events(sliding_sync_room.prev_batch(), sliding_sync_room.timeline_queue())
.track_read_marker_and_receipts()
.build()
.await;
let sneaky_timeline = Timeline::builder(&room)
.events(sliding_sync_room.prev_batch(), sliding_sync_room.timeline_queue())
.build()
.await;
Ok(Self {
inner: Arc::new(RoomInner { sliding_sync_room, room, timeline, sneaky_timeline }),
})
}
/// Get the best possible name for the room.
///
/// If the sliding sync room has received a name from the server, then use
/// it, otherwise, let's calculate a name.
pub async fn name(&self) -> Option<String> {
Some(match self.inner.sliding_sync_room.name() {
Some(name) => name,
None => self.inner.room.display_name().await.ok()?.to_string(),
})
}
/// Get the timeline of the room.
pub fn timeline(&self) -> &Timeline {
&self.inner.timeline
}
/// Get the latest event of the timeline.
///
/// It's different from `Self::timeline().latest_event()` as it won't track
/// the read marker and receipts.
pub async fn latest_event(&self) -> Option<EventTimelineItem> {
self.inner.sneaky_timeline.latest_event().await
}
}
/// [`RoomList`]'s errors.
#[derive(Debug, Error)]
pub enum Error {
/// Error from [`matrix_sdk::SlidingSync`].
#[error("SlidingSync failed")]
SlidingSync(SlidingSyncError),
/// An operation has been requested on an unknown list.
#[error("Unknown list `{0}`")]
UnknownList(String),
/// An input was asked to be applied but it wasn't possible to apply it.
#[error("The input has been not applied")]
InputHasNotBeenApplied(Input),
/// The requested room doesn't exist.
#[error("Room `{0}` not found")]
RoomNotFound(OwnedRoomId),
}
/// The state of the [`RoomList`]'s state machine.
#[derive(Clone, Debug, PartialEq)]
pub enum State {
    /// That's the first initial state.
    Init,
    /// At this state, the first rooms start to be synced.
    FirstRooms,
    /// At this state, all rooms start to be synced.
    AllRooms,
    /// This state is the cruising speed, i.e. the “normal” state, where nothing
    /// fancy happens: all rooms are syncing, and life is great.
    Enjoy,
    /// At this state, the sync has been stopped (because it was requested, or
    /// because it has errored too many times previously).
    Terminated { from: Box<State> },
}
impl State {
    /// Transition to the next state, and execute the associated transition's
    /// [`Actions`].
    async fn next(&self, sliding_sync: &SlidingSync) -> Result<Self, Error> {
        use State::*;
        // Compute the target state, plus the actions to run when entering it.
        let (next_state, actions) = match self {
            Init => (FirstRooms, Actions::none()),
            // Entering `AllRooms`: switch `all_rooms` to Growing mode and add
            // the `visible_rooms` list (see `Actions::first_rooms_are_loaded`).
            FirstRooms => (AllRooms, Actions::first_rooms_are_loaded()),
            AllRooms => (Enjoy, Actions::none()),
            // `Enjoy` is a fixed point: it transitions to itself.
            Enjoy => (Enjoy, Actions::none()),
            // If the state was `Terminated` but the next state is calculated again, it means the
            // sync has been restarted. In this case, let's jump back on the previous state that led
            // to the termination. No action is required in this scenario.
            Terminated { from: previous_state } => {
                match previous_state.as_ref() {
                    state @ Init | state @ FirstRooms => {
                        // Do nothing.
                        (state.to_owned(), Actions::none())
                    }
                    state @ AllRooms | state @ Enjoy => {
                        // Refresh the lists.
                        (state.to_owned(), Actions::refresh_lists())
                    }
                    Terminated { .. } => {
                        // Having `Terminated { from: Terminated { … } }` is not allowed.
                        unreachable!("It's impossible to reach `Terminated` from `Terminated`");
                    }
                }
            }
        };
        // Run the transition's actions before reporting the new state. A
        // failing action aborts the transition.
        for action in actions.iter() {
            action.run(sliding_sync).await?;
        }
        Ok(next_state)
    }
}
/// A trait to define what an `Action` is.
#[async_trait]
trait Action {
    /// Run this action against the given Sliding Sync instance.
    async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error>;
}
/// Action: add the `visible_rooms` list to the Sliding Sync instance.
struct AddVisibleRoomsList;
#[async_trait]
impl Action for AddVisibleRoomsList {
    async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error> {
        // The list starts in Selective mode with no range: it syncs nothing
        // until a viewport is applied via `RoomList::apply_input`. The larger
        // timeline limit (20) gives visible rooms a ready-to-display history.
        sliding_sync
            .add_list(
                SlidingSyncList::builder(VISIBLE_ROOMS_LIST_NAME)
                    .sync_mode(SlidingSyncMode::new_selective())
                    .timeline_limit(20),
            )
            .await
            .map_err(Error::SlidingSync)?;
        Ok(())
    }
}
/// Action: switch the `all_rooms` list to Growing sync-mode, so the remaining
/// rooms are loaded “in the background”, by batches of 50.
struct SetAllRoomsListToGrowingSyncMode;
#[async_trait]
impl Action for SetAllRoomsListToGrowingSyncMode {
    async fn run(&self, sliding_sync: &SlidingSync) -> Result<(), Error> {
        sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| {
                list.set_sync_mode(SlidingSyncMode::new_growing(50));
                ready(())
            })
            .await
            // `on_list` returns `None` when the list doesn't exist.
            .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_string()))?;
        Ok(())
    }
}
/// Type alias to represent one action.
type OneAction = Box<dyn Action + Send + Sync>;
/// Type alias to represent many actions.
type ManyActions = Vec<OneAction>;
/// A type to represent multiple actions.
///
/// It contains helper methods to create pre-configured set of actions.
struct Actions {
    /// The actions live in `static`s generated by the `actions!` macro, so
    /// this is just a reference to a lazily-initialized, shared vector.
    actions: &'static Lazy<ManyActions>,
}
// Generates one constructor per action group (e.g. `Actions::none()`). Each
// constructor returns an `Actions` pointing at a `static` vector of boxed
// actions, built once on first use.
macro_rules! actions {
    (
        $(
            $action_group_name:ident => [
                $( $action_name:ident ),* $(,)?
            ]
        ),*
        $(,)?
    ) => {
        $(
            fn $action_group_name () -> Self {
                static ACTIONS: Lazy<ManyActions> = Lazy::new(|| {
                    vec![
                        $( Box::new( $action_name ) ),*
                    ]
                });
                Self { actions: &ACTIONS }
            }
        )*
    };
}
impl Actions {
    actions! {
        none => [],
        first_rooms_are_loaded => [SetAllRoomsListToGrowingSyncMode, AddVisibleRoomsList],
        refresh_lists => [SetAllRoomsListToGrowingSyncMode],
    }
    /// The actions of this group, in their declared order.
    fn iter(&self) -> &[OneAction] {
        self.actions.as_slice()
    }
}
/// An input for the [`RoomList`]' state machine.
///
/// An input is something that has happened or is happening or is requested by
/// the client app using this [`RoomList`].
///
/// Inputs are applied with [`RoomList::apply_input`].
#[derive(Debug)]
pub enum Input {
    /// The client app's viewport of the room list has changed.
    ///
    /// Use this input when the user of the client app is scrolling inside the
    /// room list, and the viewport has changed. The viewport is defined as the
    /// range of visible rooms in the room list.
    Viewport(Ranges),
}
#[cfg(test)]
mod tests {
    use matrix_sdk::{config::RequestConfig, Session};
    use matrix_sdk_test::async_test;
    use ruma::{api::MatrixVersion, device_id, user_id};
    use wiremock::MockServer;
    use super::*;
    /// Build a logged-in `Client` pointed at a fresh mock server. The session
    /// is restored directly, so no login request is made.
    async fn new_client() -> (Client, MockServer) {
        let session = Session {
            access_token: "1234".to_owned(),
            refresh_token: None,
            user_id: user_id!("@example:localhost").to_owned(),
            device_id: device_id!("DEVICEID").to_owned(),
        };
        let server = MockServer::start().await;
        let client = Client::builder()
            .homeserver_url(server.uri())
            .server_versions([MatrixVersion::V1_0])
            .request_config(RequestConfig::new().disable_retry())
            .build()
            .await
            .unwrap();
        client.restore_session(session).await.unwrap();
        (client, server)
    }
    /// Build a `RoomList` on a fresh mock client (the server handle is
    /// dropped: these tests never sync).
    async fn new_room_list() -> Result<RoomList, Error> {
        let (client, _) = new_client().await;
        RoomList::new(client).await
    }
    #[async_test]
    async fn test_all_rooms_are_declared() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();
        // List is present, in Selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
                )))
                .await,
            Some(true)
        );
        Ok(())
    }
    // Walk the state machine through every transition, checking after each
    // step that a hypothetical `Terminated` resumes on the expected state.
    #[async_test]
    async fn test_states() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();
        // First state.
        let state = State::Init;
        // Hypothetical termination.
        {
            let state =
                State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
            assert_eq!(state, State::Init);
        }
        // Next state.
        let state = state.next(sliding_sync).await?;
        assert_eq!(state, State::FirstRooms);
        // Hypothetical termination.
        {
            let state =
                State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
            assert_eq!(state, State::FirstRooms);
        }
        // Next state.
        let state = state.next(sliding_sync).await?;
        assert_eq!(state, State::AllRooms);
        // Hypothetical termination.
        {
            let state =
                State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
            assert_eq!(state, State::AllRooms);
        }
        // Next state.
        let state = state.next(sliding_sync).await?;
        assert_eq!(state, State::Enjoy);
        // Hypothetical termination.
        {
            let state =
                State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
            assert_eq!(state, State::Enjoy);
        }
        // Next state.
        let state = state.next(sliding_sync).await?;
        assert_eq!(state, State::Enjoy);
        // Hypothetical termination.
        {
            let state =
                State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
            assert_eq!(state, State::Enjoy);
        }
        Ok(())
    }
    #[async_test]
    async fn test_action_add_visible_rooms_list() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();
        // List is absent.
        assert_eq!(sliding_sync.on_list(VISIBLE_ROOMS_LIST_NAME, |_list| ready(())).await, None);
        // Run the action!
        AddVisibleRoomsList.run(sliding_sync).await?;
        // List is present!
        assert_eq!(
            sliding_sync
                .on_list(VISIBLE_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges.is_empty()
                )))
                .await,
            Some(true)
        );
        Ok(())
    }
    #[async_test]
    async fn test_action_set_all_rooms_list_to_growing_sync_mode() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();
        // List is present, in Selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
                )))
                .await,
            Some(true)
        );
        // Run the action!
        SetAllRoomsListToGrowingSyncMode.run(sliding_sync).await.unwrap();
        // List is still present, in Growing mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Growing { batch_size: 50, .. }
                )))
                .await,
            Some(true)
        );
        Ok(())
    }
}

View File

@@ -21,6 +21,8 @@ use wiremock::{
Mock, MockServer, ResponseTemplate,
};
#[cfg(feature = "experimental-room-list")]
mod room_list;
mod timeline;
#[cfg(all(test, not(target_arch = "wasm32")))]

View File

File diff suppressed because it is too large Load Diff

View File

@@ -36,7 +36,7 @@ mod echo;
mod pagination;
mod read_receipts;
#[cfg(feature = "experimental-sliding-sync")]
mod sliding_sync;
pub(crate) mod sliding_sync;
use crate::{logged_in_client, mock_sync};

View File

@@ -42,7 +42,7 @@ macro_rules! receive_response {
.mount_as_scoped(&$server)
.await;
let next = $sliding_sync_stream.next().await.context("`stream` trip")??;
let next = $sliding_sync_stream.next().await.context("`sync` trip")??;
next
}
@@ -64,6 +64,8 @@ macro_rules! timeline_event {
}
}
pub(crate) use timeline_event;
macro_rules! assert_timeline_stream {
// `--- day divider ---`
( @_ [ $stream:ident ] [ --- day divider --- ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
@@ -77,7 +79,12 @@ macro_rules! assert_timeline_stream {
assert_matches!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::PushBack { value })) => {
assert_matches!(value.as_ref(), TimelineItem::Virtual(VirtualTimelineItem::DayDivider(_)));
assert_matches!(
value.as_ref(),
TimelineItem::Virtual(
VirtualTimelineItem::DayDivider(_)
)
);
}
);
}
@@ -162,6 +169,8 @@ macro_rules! assert_timeline_stream {
};
}
pub(crate) use assert_timeline_stream;
async fn new_sliding_sync(lists: Vec<SlidingSyncListBuilder>) -> Result<(MockServer, SlidingSync)> {
let (client, server) = logged_in_client().await;
@@ -201,7 +210,7 @@ async fn create_one_room(
assert!(update.rooms.contains(&room_id.to_owned()));
let room = sliding_sync.get_room(room_id).context("`get_room`")?;
let room = sliding_sync.get_room(room_id).await.context("`get_room`")?;
assert_eq!(room.name(), Some(room_name.clone()));
Ok(())
@@ -213,6 +222,7 @@ async fn timeline(
) -> Result<(Vector<Arc<TimelineItem>>, impl Stream<Item = VectorDiff<Arc<TimelineItem>>>)> {
Ok(sliding_sync
.get_room(room_id)
.await
.unwrap()
.timeline()
.await

View File

@@ -47,7 +47,7 @@ appservice = ["ruma/appservice-api-s"]
image-proc = ["dep:image"]
image-rayon = ["image-proc", "image?/jpeg_rayon"]
experimental-sliding-sync = ["matrix-sdk-base/experimental-sliding-sync", "reqwest/gzip"]
experimental-sliding-sync = ["matrix-sdk-base/experimental-sliding-sync", "reqwest/gzip", "dep:eyeball-im-util"]
docsrs = [
"e2e-encryption",
@@ -68,6 +68,7 @@ dashmap = { workspace = true }
event-listener = "2.5.2"
eyeball = { workspace = true }
eyeball-im = { workspace = true }
eyeball-im-util = { workspace = true, optional = true }
eyre = { version = "0.6.8", optional = true }
futures-core = { workspace = true }
futures-util = { workspace = true }

View File

@@ -411,6 +411,7 @@ use ruma::{assign, api::client::sync::sync_events::v4, events::StateEventType};
use tracing::{warn, error, info, debug};
use futures_util::{pin_mut, StreamExt};
use url::Url;
use std::future::ready;
# async {
# let homeserver = Url::parse("http://example.com")?;
# let client = Client::new(homeserver).await?;
@@ -447,9 +448,9 @@ let sliding_sync = sliding_sync_builder
// subscribe to the list APIs for updates
let (list_state_stream, list_count_stream, list_stream) = sliding_sync.on_list(&active_list_name, |list| {
(list.state_stream(), list.maximum_number_of_rooms_stream(), list.room_list_stream())
}).unwrap();
let (list_state_stream, list_count_stream, (_, list_stream)) = sliding_sync.on_list(&active_list_name, |list| {
ready((list.state_stream(), list.maximum_number_of_rooms_stream(), list.room_list_stream()))
}).await.unwrap();
tokio::spawn(async move {
pin_mut!(list_state_stream);

View File

@@ -7,7 +7,7 @@ use ruma::{
},
OwnedRoomId,
};
use tokio::sync::broadcast::channel;
use tokio::sync::{broadcast::channel, RwLock as AsyncRwLock};
use url::Url;
use super::{
@@ -249,8 +249,8 @@ impl SlidingSyncBuilder {
.await?;
}
let rooms = StdRwLock::new(self.rooms);
let lists = StdRwLock::new(lists);
let rooms = AsyncRwLock::new(self.rooms);
let lists = AsyncRwLock::new(lists);
Ok(SlidingSync::new(SlidingSyncInner {
_id: Some(self.id),

View File

@@ -76,13 +76,13 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu
// Write every `SlidingSyncList` that's configured for caching into the store.
let frozen_lists = {
let rooms_lock = sliding_sync.inner.rooms.read().unwrap();
let rooms_lock = sliding_sync.inner.rooms.read().await;
sliding_sync
.inner
.lists
.read()
.unwrap()
.await
.iter()
.filter_map(|(list_name, list)| {
matches!(list.cache_policy(), SlidingSyncListCachePolicy::Enabled).then(|| {
@@ -231,157 +231,152 @@ mod tests {
}
#[allow(clippy::await_holding_lock)]
#[test]
fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> {
block_on(async {
let client = logged_in_client(Some("https://foo.bar".to_owned())).await;
#[tokio::test]
async fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> {
let client = logged_in_client(Some("https://foo.bar".to_owned())).await;
let store = client.store();
let store = client.store();
// Store entries don't exist.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
// Store entries don't exist.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
)
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
)
.await?
.is_none());
// Create a new `SlidingSync` instance, and store it.
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let sliding_sync = client
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo"))
.await?
.is_none());
.add_list(SlidingSyncList::builder("list_bar"))
.build()
.await?;
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
)
// Modify both lists, so we can check expected caching behavior later.
{
let lists = sliding_sync.inner.lists.write().await;
let list_foo = lists.get("list_foo").unwrap();
list_foo.set_maximum_number_of_rooms(Some(42));
let list_bar = lists.get("list_bar").unwrap();
list_bar.set_maximum_number_of_rooms(Some(1337));
}
assert!(sliding_sync.cache_to_storage().await.is_ok());
storage_key
};
// Store entries now exist for the sliding sync object and list_foo.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_some());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_some());
// But not for list_bar.
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());
// Create a new `SlidingSync`, and it should be read from the cache.
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let max_number_of_room_stream = Arc::new(RwLock::new(None));
let cloned_stream = max_number_of_room_stream.clone();
let sliding_sync = client
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
// In the `once_built()` handler, nothing has been read from the cache yet.
assert_eq!(list.maximum_number_of_rooms(), None);
let mut stream = cloned_stream.write().unwrap();
*stream = Some(list.maximum_number_of_rooms_stream());
list
}))
.await?
.is_none());
.add_list(SlidingSyncList::builder("list_bar"))
.build()
.await?;
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
)
.await?
.is_none());
// Check the list' state.
{
let lists = sliding_sync.inner.lists.write().await;
// Create a new `SlidingSync` instance, and store it.
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let sliding_sync = client
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo"))
.await?
.add_list(SlidingSyncList::builder("list_bar"))
.build()
.await?;
// This one was cached.
let list_foo = lists.get("list_foo").unwrap();
assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
// Modify both lists, so we can check expected caching behavior later.
{
let lists = sliding_sync.inner.lists.write().unwrap();
// This one wasn't.
let list_bar = lists.get("list_bar").unwrap();
assert_eq!(list_bar.maximum_number_of_rooms(), None);
}
let list_foo = lists.get("list_foo").unwrap();
list_foo.set_maximum_number_of_rooms(Some(42));
// The maximum number of rooms reloaded from the cache should have been
// published.
{
let mut stream =
max_number_of_room_stream.write().unwrap().take().expect("stream must be set");
let initial_max_number_of_rooms =
stream.next().await.expect("stream must have emitted something");
assert_eq!(initial_max_number_of_rooms, Some(42));
}
let list_bar = lists.get("list_bar").unwrap();
list_bar.set_maximum_number_of_rooms(Some(1337));
}
// Clean the cache.
let lists = sliding_sync.inner.lists.read().await;
clean_storage(&client, &storage_key, &lists).await;
storage_key
};
assert!(sliding_sync.cache_to_storage().await.is_ok());
storage_key
};
// Store entries don't exist.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_none());
// Store entries now exist for the sliding sync object and list_foo.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_some());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_some());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());
// But not for list_bar.
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());
// Create a new `SlidingSync`, and it should be read from the cache.
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let max_number_of_room_stream = Arc::new(RwLock::new(None));
let cloned_stream = max_number_of_room_stream.clone();
let sliding_sync = client
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
// In the `once_built()` handler, nothing has been read from the cache yet.
assert_eq!(list.maximum_number_of_rooms(), None);
let mut stream = cloned_stream.write().unwrap();
*stream = Some(list.maximum_number_of_rooms_stream());
list
}))
.await?
.add_list(SlidingSyncList::builder("list_bar"))
.build()
.await?;
// Check the list' state.
{
let lists = sliding_sync.inner.lists.write().unwrap();
// This one was cached.
let list_foo = lists.get("list_foo").unwrap();
assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));
// This one wasn't.
let list_bar = lists.get("list_bar").unwrap();
assert_eq!(list_bar.maximum_number_of_rooms(), None);
}
// The maximum number of rooms reloaded from the cache should have been
// published.
{
let mut stream = max_number_of_room_stream
.write()
.unwrap()
.take()
.expect("stream must be set");
let initial_max_number_of_rooms =
stream.next().await.expect("stream must have emitted something");
assert_eq!(initial_max_number_of_rooms, Some(42));
}
// Clean the cache.
clean_storage(&client, &storage_key, &sliding_sync.inner.lists.read().unwrap())
.await;
storage_key
};
// Store entries don't exist.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());
Ok(())
})
Ok(())
}
}

View File

@@ -194,6 +194,9 @@ impl SlidingSyncListBuilder {
) -> SlidingSyncList {
let list = SlidingSyncList {
inner: Arc::new(SlidingSyncListInner {
#[cfg(any(test, feature = "testing"))]
sync_mode: StdRwLock::new(self.sync_mode.clone()),
// From the builder
sort: self.sort,
required_state: self.required_state,

View File

@@ -14,8 +14,10 @@ use std::{
pub use builder::*;
use eyeball::unique::Observable;
use eyeball_im::{ObservableVector, VectorDiff};
use eyeball_im_util::{FilteredVectorSubscriber, VectorExt};
pub(super) use frozen::FrozenSlidingSyncList;
use futures_core::Stream;
use imbl::Vector;
pub(super) use request_generator::*;
pub use room_list_entry::RoomListEntry;
use ruma::{
@@ -42,9 +44,15 @@ pub(crate) enum SlidingSyncListCachePolicy {
}
/// The type used to express natural bounds (including but not limited to:
/// ranges, timeline limit) in the sliding sync SDK.
/// ranges, timeline limit) in the Sliding Sync.
pub type Bound = u32;
/// One range of rooms in a response from Sliding Sync.
///
/// Both ends are inclusive (it is a `RangeInclusive`); bounds are indices
/// into the sorted room list, not room counts.
pub type Range = RangeInclusive<Bound>;

/// Many ranges of rooms.
pub type Ranges = Vec<Range>;
/// Holding a specific filtered list within the concept of sliding sync.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
@@ -53,6 +61,8 @@ pub struct SlidingSyncList {
inner: Arc<SlidingSyncListInner>,
}
type BoxedRoomListEntryFilter = Box<dyn Fn(&RoomListEntry) -> bool + Sync + Send>;
impl SlidingSyncList {
/// Create a new [`SlidingSyncListBuilder`] with the given name.
pub fn builder(name: impl Into<String>) -> SlidingSyncListBuilder {
@@ -75,7 +85,10 @@ impl SlidingSyncList {
/// means that the state is not reset **purposely**. The ranges and the
/// state will be updated when the next request will be sent and a
/// response will be received. The maximum number of rooms won't change.
pub fn set_sync_mode(&self, sync_mode: impl Into<SlidingSyncMode>) {
pub fn set_sync_mode<M>(&self, sync_mode: M)
where
M: Into<SlidingSyncMode>,
{
self.inner.set_sync_mode(sync_mode.into());
// When the sync mode is changed, the sync loop must skip over any work in its
@@ -126,8 +139,35 @@ impl SlidingSyncList {
///
/// There's no guarantee of ordering between items emitted by this stream
/// and those emitted by other streams exposed on this structure.
pub fn room_list_stream(&self) -> impl Stream<Item = VectorDiff<RoomListEntry>> {
ObservableVector::subscribe(&self.inner.room_list.read().unwrap())
///
/// The first part of the returned tuple is the actual room list entries,
/// and the second part is the `Stream` to receive updates on the room list
/// entries.
pub fn room_list_stream(
    &self,
) -> (Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>) {
    // Hold the read lock across both steps so the snapshot and the
    // subscription are taken from the same state: no diff can be missed
    // or observed twice in between.
    let read_lock = self.inner.room_list.read().unwrap();
    // Snapshot of the current entries, returned alongside the stream of
    // future updates.
    let previous_values = (*read_lock).clone();
    let subscriber = ObservableVector::subscribe(&read_lock);
    (previous_values, subscriber)
}
/// Get a stream of room list, but filtered.
///
/// It's similar to [`Self::room_list_stream`] but the room list is filtered
/// by `filter`: only entries matching the predicate are part of the initial
/// values and of the subsequent updates.
pub fn room_list_filtered_stream<F>(
    &self,
    filter: F,
) -> (Vector<RoomListEntry>, FilteredVectorSubscriber<RoomListEntry, BoxedRoomListEntryFilter>)
where
    F: Fn(&RoomListEntry) -> bool + Sync + Send + 'static,
{
    // The predicate is boxed so the returned subscriber has a nameable
    // filter type (`BoxedRoomListEntryFilter`) instead of leaking the
    // generic `F` into the return type.
    ObservableVector::subscribe_filtered(
        &self.inner.room_list.read().unwrap(),
        Box::new(filter),
    )
}
/// Get the maximum number of rooms. See [`Self::maximum_number_of_rooms`]
@@ -204,14 +244,21 @@ impl SlidingSyncList {
}
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
#[allow(dead_code)]
impl SlidingSyncList {
    /// Set the maximum number of rooms.
    ///
    /// Test-only helper: the new value is published to any subscriber of
    /// the maximum-number-of-rooms stream.
    pub(super) fn set_maximum_number_of_rooms(&self, maximum_number_of_rooms: Option<u32>) {
        let mut guard = self.inner.maximum_number_of_rooms.write().unwrap();
        Observable::set(&mut *guard, maximum_number_of_rooms);
    }

    /// Get the sync-mode.
    pub fn sync_mode(&self) -> SlidingSyncMode {
        let guard = self.inner.sync_mode.read().unwrap();
        (*guard).clone()
    }
}
#[derive(Debug)]
@@ -260,6 +307,9 @@ pub(super) struct SlidingSyncListInner {
/// The `bump_event_types` field. See
/// [`SlidingSyncListBuilder::bump_event_types`] to learn more.
bump_event_types: Vec<TimelineEventType>,
#[cfg(any(test, feature = "testing"))]
sync_mode: StdRwLock<SlidingSyncMode>,
}
impl SlidingSyncListInner {
@@ -270,6 +320,11 @@ impl SlidingSyncListInner {
/// The [`Self::state`] is immediately updated to reflect the new state. The
/// [`Self::maximum_number_of_rooms`] won't change.
pub fn set_sync_mode(&self, sync_mode: SlidingSyncMode) {
#[cfg(any(test, feature = "testing"))]
{
*self.sync_mode.write().unwrap() = sync_mode.clone();
}
{
let mut request_generator = self.request_generator.write().unwrap();
*request_generator = SlidingSyncListRequestGenerator::new(sync_mode);
@@ -729,10 +784,16 @@ impl SlidingSyncSelectiveModeBuilder {
}
/// Select a range to fetch.
pub fn add_range(mut self, range: RangeInclusive<Bound>) -> Self {
pub fn add_range(mut self, range: Range) -> Self {
    // Accumulates: calling this several times selects several ranges.
    self.ranges.push(range);
    self
}
/// Select many ranges to fetch.
pub fn add_ranges(mut self, ranges: Ranges) -> Self {
    // Appends to any ranges already selected; it does not replace them.
    self.ranges.extend(ranges);
    self
}
}
impl From<SlidingSyncSelectiveModeBuilder> for SlidingSyncMode {
@@ -847,6 +908,7 @@ mod tests {
use futures_util::StreamExt;
use imbl::vector;
use matrix_sdk_test::async_test;
use ruma::{api::client::sync::sync_events::v4::SlidingOp, room_id, uint};
use serde_json::json;
use tokio::{
@@ -869,8 +931,8 @@ mod tests {
};
}
#[test]
fn test_sliding_sync_list_selective_mode() {
#[async_test]
async fn test_sliding_sync_list_selective_mode() {
let (sender, mut receiver) = channel(1);
// Set range on `Selective`.
@@ -889,7 +951,7 @@ mod tests {
// There shouldn't be any internal request to restart the sync loop yet.
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(4..=5));
list.set_sync_mode(SlidingSyncMode::new_selective().add_range(4..=5));
{
let mut generator = list.inner.request_generator.write().unwrap();
@@ -912,7 +974,7 @@ mod tests {
let (sender, _receiver) = channel(1);
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=1))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=1))
.timeline_limit(7)
.build(sender);
@@ -930,7 +992,7 @@ mod tests {
let (sender, _receiver) = channel(1);
let mut list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=1))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=1))
.build(sender);
let room0 = room_id!("!room0:bar.org");
@@ -1174,7 +1236,7 @@ mod tests {
let (sender, _receiver) = channel(1);
let mut list = SlidingSyncList::builder("testing")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153))
.build(sender);
assert_ranges! {
@@ -1201,12 +1263,12 @@ mod tests {
};
}
#[test]
fn test_generator_selective_with_modifying_ranges_on_the_fly() {
#[tokio::test]
async fn test_generator_selective_with_modifying_ranges_on_the_fly() {
let (sender, _receiver) = channel(4);
let mut list = SlidingSyncList::builder("testing")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153))
.build(sender);
assert_ranges! {
@@ -1232,7 +1294,7 @@ mod tests {
}
};
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(3..=7));
list.set_sync_mode(SlidingSyncMode::new_selective().add_range(3..=7));
assert_ranges! {
list = list,
@@ -1245,7 +1307,7 @@ mod tests {
},
};
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(42..=77));
list.set_sync_mode(SlidingSyncMode::new_selective().add_range(42..=77));
assert_ranges! {
list = list,
@@ -1258,7 +1320,7 @@ mod tests {
},
};
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new());
list.set_sync_mode(SlidingSyncMode::new_selective());
assert_ranges! {
list = list,
@@ -1272,12 +1334,12 @@ mod tests {
};
}
#[test]
fn test_generator_changing_sync_mode_to_various_modes() {
#[async_test]
async fn test_generator_changing_sync_mode_to_various_modes() {
let (sender, _receiver) = channel(4);
let mut list = SlidingSyncList::builder("testing")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=10).add_range(42..=153))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=10).add_range(42..=153))
.build(sender);
assert_ranges! {
@@ -1376,14 +1438,14 @@ mod tests {
};
// Changing from `Paging` to `Selective`.
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new());
list.set_sync_mode(SlidingSyncMode::new_selective());
assert_eq!(list.state(), SlidingSyncState::PartiallyLoaded); // we had some partial state, but we can't be sure it's fully loaded until the
// next request
// We need to update the ranges, of course, as they are not managed
// automatically anymore.
list.set_sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=100));
list.set_sync_mode(SlidingSyncMode::new_selective().add_range(0..=100));
assert_ranges! {
list = list,
@@ -1415,7 +1477,7 @@ mod tests {
let (sender, _receiver) = channel(1);
let mut list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncSelectiveModeBuilder::new().add_range(0..=3))
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=3))
.build(sender);
assert_eq!(**list.inner.maximum_number_of_rooms.read().unwrap(), None);
@@ -1463,7 +1525,7 @@ mod tests {
);
}
let mut room_list_stream = list.room_list_stream();
let (_, mut room_list_stream) = list.room_list_stream();
let (room_list_stream_sender, mut room_list_stream_receiver) = unbounded_channel();
spawn(async move {
@@ -1583,14 +1645,13 @@ mod tests {
entries!( @_ [ $( $( $rest )* )? ] [ $( $accumulator )* RoomListEntry::Invalidated(room_id!( $room_id ).to_owned()), ] )
};
( @_ [] [ $( $accumulator:tt )+ ] ) => {
( @_ [] [ $( $accumulator:tt )* ] ) => {
vector![ $( $accumulator )* ]
};
( $( $all:tt )* ) => {
entries!( @_ [ $( $all )* ] [] )
};
}
macro_rules! assert_sync_operations {

View File

@@ -29,14 +29,11 @@
//! user-specified limit representing the maximum number of rooms the user
//! actually wants to load.
use std::{cmp::min, ops::RangeInclusive};
use std::cmp::min;
use super::{Bound, SlidingSyncMode};
use super::{Range, Ranges, SlidingSyncMode};
use crate::{sliding_sync::Error, SlidingSyncState};
/// Range of rooms in a response from sliding sync.
pub type Ranges = Vec<RangeInclusive<u32>>;
/// The kind of request generator.
#[derive(Debug, PartialEq)]
pub(super) enum SlidingSyncListRequestGeneratorKind {
@@ -80,7 +77,7 @@ pub(in super::super) struct SlidingSyncListRequestGenerator {
/// The current ranges used by this request generator.
///
/// Note there's only one range in the `Growing` and `Paging` mode.
ranges: Vec<RangeInclusive<u32>>,
ranges: Ranges,
/// The kind of request generator.
pub(super) kind: SlidingSyncListRequestGeneratorKind,
}
@@ -122,7 +119,7 @@ impl SlidingSyncListRequestGenerator {
/// For generators in the selective mode, this is the initial set of ranges.
/// For growing and paginated generators, this is the range committed in the
/// latest response received from the server.
pub(super) fn requested_ranges(&self) -> &[RangeInclusive<Bound>] {
pub(super) fn requested_ranges(&self) -> &[Range] {
    // Borrowed view: callers only need to inspect the current ranges.
    &self.ranges
}
@@ -291,7 +288,7 @@ fn create_range(
desired_size: u32,
maximum_number_of_rooms_to_fetch: Option<u32>,
maximum_number_of_rooms: Option<u32>,
) -> Result<RangeInclusive<Bound>, Error> {
) -> Result<Range, Error> {
// Calculate the range.
// The `start` bound is given. Let's calculate the `end` bound.
@@ -322,7 +319,7 @@ fn create_range(
return Err(Error::InvalidRange { start, end });
}
Ok(RangeInclusive::new(start, end))
Ok(Range::new(start, end))
}
#[cfg(test)]

View File

@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
/// Represent a room entry in the [`SlidingSyncList`][super::SlidingSyncList].
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))]
pub enum RoomListEntry {
/// This entry isn't known at this point and thus considered `Empty`.
#[default]

View File

@@ -25,6 +25,7 @@ mod room;
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
future::Future,
sync::{
atomic::{AtomicU8, Ordering},
Arc, RwLock as StdRwLock,
@@ -49,7 +50,7 @@ use ruma::{
use serde::{Deserialize, Serialize};
use tokio::{
select, spawn,
sync::{broadcast::Sender, Mutex as AsyncMutex},
sync::{broadcast::Sender, Mutex as AsyncMutex, RwLock as AsyncRwLock},
};
use tracing::{debug, error, instrument, warn, Instrument, Span};
use url::Url;
@@ -97,10 +98,10 @@ pub(super) struct SlidingSyncInner {
position: StdRwLock<SlidingSyncPositionMarkers>,
/// The lists of this Sliding Sync instance.
lists: StdRwLock<BTreeMap<String, SlidingSyncList>>,
lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
/// The rooms details
rooms: StdRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
/// Room subscriptions, i.e. rooms that may be out-of-scope of all lists but
/// one wants to receive updates.
@@ -162,13 +163,13 @@ impl SlidingSync {
}
/// Lookup a specific room
pub fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
self.inner.rooms.read().unwrap().get(room_id).cloned()
pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
    // Take the async read lock, clone out the matching room handle (if
    // any); the lock is released when `rooms` goes out of scope.
    let rooms = self.inner.rooms.read().await;
    rooms.get(room_id).cloned()
}
/// Check the number of rooms.
pub fn get_number_of_rooms(&self) -> usize {
self.inner.rooms.read().unwrap().len()
self.inner.rooms.blocking_read().len()
}
#[instrument(skip(self))]
@@ -177,13 +178,21 @@ impl SlidingSync {
}
/// Find a list by its name, and do something on it if it exists.
pub fn on_list<F, R>(&self, list_name: &str, f: F) -> Option<R>
pub async fn on_list<Function, FunctionOutput, R>(
&self,
list_name: &str,
function: Function,
) -> Option<R>
where
F: FnOnce(&SlidingSyncList) -> R,
Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
FunctionOutput: Future<Output = R>,
{
let lists = self.inner.lists.read().unwrap();
let lists = self.inner.lists.read().await;
lists.get(list_name).map(f)
match lists.get(list_name) {
Some(list) => Some(function(list).await),
None => None,
}
}
/// Add the list to the list of lists.
@@ -197,7 +206,7 @@ impl SlidingSync {
) -> Result<Option<SlidingSyncList>> {
let list = list_builder.build(self.inner.internal_channel.clone());
let old_list = self.inner.lists.write().unwrap().insert(list.name().to_owned(), list);
let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
self.inner.internal_channel_send_if_possible(
SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
@@ -224,7 +233,7 @@ impl SlidingSync {
list_builder.set_cached_and_reload(&self.inner.client, storage_key).await?;
if !reloaded_rooms.is_empty() {
let mut rooms = self.inner.rooms.write().unwrap();
let mut rooms = self.inner.rooms.write().await;
for (key, frozen) in reloaded_rooms {
rooms.entry(key).or_insert_with(|| {
@@ -237,18 +246,18 @@ impl SlidingSync {
}
/// Lookup a set of rooms
pub fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
&self,
room_ids: I,
) -> Vec<Option<SlidingSyncRoom>> {
let rooms = self.inner.rooms.read().unwrap();
let rooms = self.inner.rooms.read().await;
room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
}
/// Get all rooms.
pub fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
self.inner.rooms.read().unwrap().values().cloned().collect()
pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
    // Clone each room handle out under the async read lock, so the lock
    // is not held once the `Vec` has been built.
    let rooms = self.inner.rooms.read().await;
    rooms.values().cloned().collect()
}
fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig {
@@ -271,7 +280,7 @@ impl SlidingSync {
}
/// Handle the HTTP response.
#[instrument(skip_all, fields(lists = self.inner.lists.read().unwrap().len()))]
#[instrument(skip_all)]
async fn handle_response(
&self,
sliding_sync_response: v4::Response,
@@ -302,7 +311,7 @@ impl SlidingSync {
let update_summary = {
// Update the rooms.
let updated_rooms = {
let mut rooms_map = self.inner.rooms.write().unwrap();
let mut rooms_map = self.inner.rooms.write().await;
let mut updated_rooms = Vec::with_capacity(sliding_sync_response.rooms.len());
@@ -346,7 +355,7 @@ impl SlidingSync {
// Update the lists.
let updated_lists = {
let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
let mut lists = self.inner.lists.write().unwrap();
let mut lists = self.inner.lists.write().await;
for (name, updates) in sliding_sync_response.lists {
let Some(list) = lists.get_mut(&name) else {
@@ -378,17 +387,13 @@ impl SlidingSync {
}
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<Option<UpdateSummary>> {
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, requested_room_unsubscriptions) = {
// Collect requests for lists.
let mut requests_lists = BTreeMap::new();
{
let mut lists = self.inner.lists.write().unwrap();
if lists.is_empty() {
return Ok(None);
}
let mut lists = self.inner.lists.write().await;
for (name, list) in lists.iter_mut() {
requests_lists.insert(name.clone(), list.next_request()?);
@@ -445,7 +450,7 @@ impl SlidingSync {
// coming from the `OlmMachine::outgoing_requests()` method.
#[cfg(feature = "e2e-encryption")]
let response = {
debug!("Sliding Sync is sending the request along with outgoing E2EE requests");
debug!("Sliding Sync is sending the request along with outgoing E2EE requests");
let (e2ee_uploads, response) =
futures_util::future::join(self.inner.client.send_outgoing_requests(), request)
@@ -510,7 +515,7 @@ impl SlidingSync {
debug!("Sliding Sync response has been fully handled");
Ok(Some(updates))
Ok(updates)
};
spawn(future.instrument(Span::current())).await.unwrap()
@@ -520,6 +525,10 @@ impl SlidingSync {
///
/// This method returns a `Stream`, which will send requests and will handle
/// responses automatically. Lists and rooms are updated automatically.
///
/// This function returns `Ok(…)` if everything went well, otherwise it will
/// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
/// termination.
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(name = "sync_stream", skip_all)]
pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
@@ -557,16 +566,12 @@ impl SlidingSync {
update_summary = self.sync_once().instrument(sync_span.clone()) => {
match update_summary {
Ok(Some(updates)) => {
Ok(updates) => {
self.inner.reset_counter.store(0, Ordering::SeqCst);
yield Ok(updates);
}
Ok(None) => {
break;
}
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// The session has expired.
@@ -582,6 +587,7 @@ impl SlidingSync {
// The session has expired too many times, let's raise an error!
yield Err(error);
// Terminates the loop, and terminates the stream.
break;
}
@@ -597,11 +603,13 @@ impl SlidingSync {
debug!(?self.inner.extensions, ?self.inner.position, "Sliding Sync has been reset");
});
continue;
}
yield Err(error);
continue;
break;
}
}
}
@@ -871,7 +879,7 @@ mod tests {
)
.await?;
let lists = sliding_sync.inner.lists.read().unwrap();
let lists = sliding_sync.inner.lists.read().await;
assert!(lists.contains_key("foo"));
assert!(lists.contains_key("bar"));
@@ -892,26 +900,20 @@ mod tests {
pin_mut!(stream);
// The sync-loop is actually running.
for _ in 0..3 {
assert!(stream.next().await.is_some());
}
assert!(stream.next().await.is_some());
// Stop the sync-loop.
sliding_sync.stop_sync()?;
// The sync-loop is actually stopped.
for _ in 0..3 {
assert!(stream.next().await.is_none());
}
assert!(stream.next().await.is_none());
// Start a new sync-loop.
let stream = sliding_sync.sync();
pin_mut!(stream);
// The sync-loop is actually running.
for _ in 0..3 {
assert!(stream.next().await.is_some());
}
assert!(stream.next().await.is_some());
Ok(())
}

View File

@@ -169,12 +169,13 @@ fn check_typos() -> Result<()> {
}
fn check_clippy() -> Result<()> {
cmd!("rustup run {NIGHTLY} cargo clippy --all-targets -- -D warnings").run()?;
cmd!("rustup run {NIGHTLY} cargo clippy --all-targets --features testing -- -D warnings")
.run()?;
cmd!(
"rustup run {NIGHTLY} cargo clippy --workspace --all-targets
--exclude matrix-sdk-crypto --exclude xtask
--no-default-features
--features native-tls,experimental-sliding-sync,sso-login
--features native-tls,experimental-sliding-sync,sso-login,testing
-- -D warnings"
)
.run()?;