diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5840fe3d3..0c48cb3b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -204,7 +204,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: nextest - args: run --workspace --exclude matrix-sdk-integration-testing + args: run --workspace --exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test - name: Test documentation uses: actions-rs/cargo@v1 @@ -443,3 +443,75 @@ jobs: with: command: nextest args: run -p matrix-sdk-integration-testing + + + sliding-sync-integration-tests: + name: Sliding Sync Integration test + # disabled until we can figure out the weird docker-not-starting-situation + if: false + # if: github.event_name == 'push' || !github.event.pull_request.draft + + runs-on: ubuntu-latest + # Service containers to run with `runner-job` + services: + # Label used to access the service container + postgres: + # Docker Hub image + image: postgres + # Provide the password for postgres + env: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + POSTGRES_DB: syncv3 + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + # Maps tcp port 5432 on service container to the host + - 5432:5432 + + steps: + - name: Checkout the repo + uses: actions/checkout@v3 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + override: true + + - name: Load cache + uses: Swatinem/rust-cache@v1 + + - name: Install nextest + uses: taiki-e/install-action@nextest + + - uses: actions/setup-python@v4 + with: + python-version: 3.8 + + # local synapse + - uses: michaelkaye/setup-matrix-synapse@main + with: + uploadLogs: true + httpPort: 8228 + disableRateLimiting: true + + # latest sliding sync proxy + + - uses: addnab/docker-run-action@v3 + with: + registry: gcr.io + image: "matrix-org/sliding-sync:v0.98.0" + docker_network: "host" + options: '-e "SYNCV3_SERVER=http://locahost:8228" -e "SYNCV3_SECRET=SUPER_CI_SECRET" -e "SYNCV3_BINDADDR=:8118" -e "SYNCV3_DB=user=postgres password=postgres dbname=syncv3 sslmode=disable host=postgres" -p 8118:8118' + + - name: Test + uses: actions-rs/cargo@v1 + with: + command: nextest + args: run -p sliding-sync-integration-tests diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 09488a68d..a4103d8b7 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -53,7 +53,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: tarpaulin - args: --out Xml + args: --out Xml -e sliding-sync-integration-test - name: Upload to codecov.io uses: codecov/codecov-action@v3 diff --git a/Cargo.lock b/Cargo.lock index b8d6f154b..44f49ea13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4441,6 +4441,19 @@ dependencies = [ "syntect", ] +[[package]] +name = "sliding-sync-integration-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "ctor", + "futures", + "matrix-sdk", + "matrix-sdk-integration-testing", + "tokio", + "uuid", +] + [[package]] name = "slog" version = "2.7.0" diff --git a/bindings/matrix-sdk-ffi/src/api.udl b/bindings/matrix-sdk-ffi/src/api.udl index d7885c1c9..d52536c7e 100644 --- a/bindings/matrix-sdk-ffi/src/api.udl +++ b/bindings/matrix-sdk-ffi/src/api.udl @@ -82,9 +82,8 @@ interface SlidingSyncViewRoomsListDiff { u32 new_index ); Push(RoomListEntry value); - // The following are supported by the generic VecDiff-type but - // in sliding sync effectively do not happen and thus aren't exposed - // to not pollute the API: Pop(); Clear(); + Pop(); + Clear(); }; callback interface SlidingSyncViewRoomListObserver { diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 1533a44fd..cda9402ae 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -250,6 +250,8 @@ pub enum SlidingSyncViewRoomsListDiff { RemoveAt { index: u32 }, Move { old_index: u32, new_index: u32 }, Push { value: RoomListEntry }, + Pop, // removes the last item + Clear, // clears the list } impl From> for SlidingSyncViewRoomsListDiff { @@ -276,7 +278,8 @@ impl From> for SlidingSyncViewRoomsListDiff { VecDiff::Push { value } => { SlidingSyncViewRoomsListDiff::Push { value: (&value).into() } } - _ => unimplemented!("Clear and Pop aren't provided within sliding sync"), + VecDiff::Pop {} => SlidingSyncViewRoomsListDiff::Pop, + VecDiff::Clear {} => SlidingSyncViewRoomsListDiff::Clear, } } } @@ -634,13 +637,15 @@ impl SlidingSync { impl SlidingSync { #[allow(clippy::significant_drop_in_scrutinee)] pub fn get_view(&self, name: String) -> Option> { - let views = self.inner.views.lock_ref(); - for s in views.iter() { - if s.name == name { - return Some(Arc::new(SlidingSyncView { inner: s.clone() })); - } - } - None + self.inner.view(&name).map(|inner| Arc::new(SlidingSyncView { inner })) + } + + pub fn add_view(&self, view: Arc) -> Option { + self.inner.add_view(view.inner.clone()).map(|u| u as u32) + } + + pub fn pop_view(&self, name: String) -> Option> { + self.inner.pop_view(&name).map(|inner| Arc::new(SlidingSyncView { inner })) } pub fn sync(&self) -> Arc { diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync.rs index e9291852c..458796d34 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync.rs @@ -693,6 +693,46 @@ impl SlidingSync { .since = Some(since); } + /// Get access to the SlidingSyncView named `view_name` + /// + /// Note: Remember that this list might have been changed since you started + /// listening to the stream and is therefor not necessarily up to date + /// with the views used for the stream. + pub fn view(&self, view_name: &str) -> Option { + self.views.lock_ref().iter().find(|v| v.name == view_name).cloned() + } + + /// Remove the SlidingSyncView named `view_name` from the views list if + /// found + /// + /// Note: Remember that this change will only be applicable for any new + /// stream created after this. The old stream will still continue to use the + /// previous set of views + pub fn pop_view(&self, _view_name: &str) -> Option { + unimplemented!("Index based sliding sync doesn't support removing views"); + } + + /// Add the view to the list of views + /// + /// As views need to have a unique `.name`, if a view with the same name + /// is found the new view will replace the old one and the return will give + /// the position it was found at. If none is found, the view will be pushed + /// to the end and `None` is returned. + /// + /// Note: Remember that this change will only be applicable for any new + /// stream created after this. The old stream will still continue to use the + /// previous set of views + pub fn add_view(&self, view: SlidingSyncView) -> Option { + let mut v = self.views.lock_mut(); + if let Some(idx) = v.iter().position(|v| v.name == view.name) { + v.set_cloned(idx, view); + Some(idx) + } else { + v.push_cloned(view); + None + } + } + /// Lookup a set of rooms pub fn get_rooms>( &self, @@ -707,6 +747,7 @@ impl SlidingSync { resp: v4::Response, extensions: Option, views: &[SlidingSyncView], + ranges: &[Vec<(usize, usize)>], ) -> Result { let mut processed = self.client.process_sliding_sync(resp.clone()).await?; debug!("main client processed."); @@ -721,11 +762,13 @@ impl SlidingSync { .into()); } - for (view, updates) in std::iter::zip(views, &resp.lists) { + for ((view, ranges), updates) in + std::iter::zip(std::iter::zip(views, ranges), &resp.lists) + { let count: u32 = updates.count.try_into().expect("the list total count convertible into u32"); trace!("view {:?} update: {:?}", view.name, !updates.ops.is_empty()); - if view.handle_response(count, &updates.ops)? { + if view.handle_response(count, &updates.ops, ranges)? { updated_views.push(view.name.clone()); } } @@ -794,12 +837,14 @@ impl SlidingSync { debug!(?self.extensions, "Sync loop running"); let mut requests = Vec::new(); + let mut current_ranges = vec![]; let mut new_remaining_generators = Vec::new(); let mut new_remaining_views = Vec::new(); for (mut generator, view) in std::iter::zip(remaining_generators, remaining_views) { - if let Some(request) = generator.next() { + if let Some((request, range)) = generator.next() { requests.push(request); + current_ranges.push(range); new_remaining_generators.push(generator); new_remaining_views.push(view); } @@ -893,7 +938,7 @@ impl SlidingSync { debug!("received"); - let updates = match self.handle_response(resp, extensions, &remaining_views).await { + let updates = match self.handle_response(resp, extensions, &remaining_views, ¤t_ranges).await { Ok(r) => r, Err(e) => { yield Err(e.into()); @@ -1054,6 +1099,12 @@ impl SlidingSyncViewBuilder { self } + /// Set a single range fetch + pub fn set_range>(mut self, from: U, to: U) -> Self { + self.ranges = Some(RangeState::new(vec![(from.into(), to.into())])); + self + } + /// Set the ranges to fetch pub fn add_range>(mut self, from: U, to: U) -> Self { let r = self.ranges.get_or_insert_with(|| RangeState::new(Vec::new())); @@ -1130,7 +1181,7 @@ impl<'a> SlidingSyncViewRequestGenerator<'a> { start: u32, batch_size: u32, limit: Option, - ) -> (u32, v4::SyncRequestList) { + ) -> (u32, (v4::SyncRequestList, Vec<(usize, usize)>)) { let calc_end = start + batch_size; let end = match limit { Some(l) => std::cmp::min(l, calc_end), @@ -1140,30 +1191,44 @@ impl<'a> SlidingSyncViewRequestGenerator<'a> { (end, self.make_request_for_ranges(ranges)) } - fn make_request_for_ranges(&self, ranges: Vec<(UInt, UInt)>) -> v4::SyncRequestList { + fn make_request_for_ranges( + &self, + ranges: Vec<(UInt, UInt)>, + ) -> (v4::SyncRequestList, Vec<(usize, usize)>) { let sort = self.view.sort.clone(); let required_state = self.view.required_state.clone(); let timeline_limit = self.view.timeline_limit; let filters = self.view.filters.clone(); - assign!(v4::SyncRequestList::default(), { - ranges, - required_state, - sort, - timeline_limit, - filters, - }) + ( + assign!(v4::SyncRequestList::default(), { + ranges: ranges.clone(), + required_state, + sort, + timeline_limit, + filters, + }), + ranges + .into_iter() + .map(|(a, b)| { + ( + usize::try_from(a).expect("range is a valid u32"), + usize::try_from(b).expect("range is a valid u32"), + ) + }) + .collect(), + ) } // generate the next live request - fn live_request(&self) -> v4::SyncRequestList { + fn live_request(&self) -> (v4::SyncRequestList, Vec<(usize, usize)>) { let ranges = self.view.ranges.read_only().get_cloned(); self.make_request_for_ranges(ranges) } } impl<'a> Iterator for SlidingSyncViewRequestGenerator<'a> { - type Item = v4::SyncRequestList; + type Item = (v4::SyncRequestList, Vec<(usize, usize)>); fn next(&mut self) -> Option { if let InnerSlidingSyncViewRequestGenerator::PagingFullSync { position, limit, .. } @@ -1311,9 +1376,16 @@ impl SlidingSyncView { } #[instrument(skip(self, ops))] - fn room_ops(&self, ops: &Vec) -> Result<(), Error> { + fn room_ops( + &self, + ops: &Vec, + room_ranges: &Vec<(usize, usize)>, + ) -> Result<(), Error> { let mut rooms_list = self.rooms_list.lock_mut(); let _rooms_map = self.rooms.lock_mut(); + + let index_in_range = + |idx| room_ranges.iter().any(|(start, end)| idx >= *start && idx <= *end); for op in ops { match &op.op { v4::SlidingOp::Sync => { @@ -1375,9 +1447,10 @@ impl SlidingSyncView { loop { // find the next empty slot and drop it let (prev_p, prev_overflow) = pos.overflowing_sub(dif); - let check_prev = !prev_overflow; + let check_prev = !prev_overflow && index_in_range(prev_p); let (next_p, overflown) = pos.overflowing_add(dif); - let check_after = !overflown && next_p < sliced.len(); + let check_after = + !overflown && next_p < sliced.len() && index_in_range(next_p); if !check_prev && !check_after { return Err(Error::BadResponse("We were asked to insert but could not find any direction to shift to".to_owned())); } @@ -1450,7 +1523,12 @@ impl SlidingSyncView { } #[instrument(skip(self, ops))] - fn handle_response(&self, rooms_count: u32, ops: &Vec) -> Result { + fn handle_response( + &self, + rooms_count: u32, + ops: &Vec, + ranges: &Vec<(usize, usize)>, + ) -> Result { let mut missing = rooms_count.checked_sub(self.rooms_list.lock_ref().len() as u32).unwrap_or_default(); let mut changed = false; @@ -1465,7 +1543,7 @@ impl SlidingSyncView { } if !ops.is_empty() { - self.room_ops(ops)?; + self.room_ops(ops, ranges)?; changed = true; } diff --git a/testing/matrix-sdk-integration-testing/Cargo.toml b/testing/matrix-sdk-integration-testing/Cargo.toml index 585a28003..a6814bfb2 100644 --- a/testing/matrix-sdk-integration-testing/Cargo.toml +++ b/testing/matrix-sdk-integration-testing/Cargo.toml @@ -5,7 +5,11 @@ version = "0.1.0" edition = "2021" publish = false -[dev-dependencies] +[features] +# activate to steal helper functions from this crate for other testing +helpers = [] + +[dependencies] anyhow = { workspace = true } assign = "1" ctor = { workspace = true } diff --git a/testing/matrix-sdk-integration-testing/src/lib.rs b/testing/matrix-sdk-integration-testing/src/lib.rs index 5c74eb97b..fbbbf6dbe 100644 --- a/testing/matrix-sdk-integration-testing/src/lib.rs +++ b/testing/matrix-sdk-integration-testing/src/lib.rs @@ -1,5 +1,5 @@ #[cfg(test)] mod tests; -#[cfg(test)] -mod helpers; +#[cfg(any(test, feature = "helpers"))] +pub mod helpers; diff --git a/testing/sliding-sync-integration-test/Cargo.toml b/testing/sliding-sync-integration-test/Cargo.toml new file mode 100644 index 000000000..1922c730c --- /dev/null +++ b/testing/sliding-sync-integration-test/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "sliding-sync-integration-test" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +anyhow = { workspace = true } +ctor = { workspace = true } +matrix-sdk-integration-testing = { path = "../matrix-sdk-integration-testing", features = ["helpers"] } +matrix-sdk = { path = "../../crates/matrix-sdk", features = ["experimental-sliding-sync"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } +futures = { version = "0.3.25" } +uuid = { version = "1.2.2" } diff --git a/testing/sliding-sync-integration-test/README.md b/testing/sliding-sync-integration-test/README.md new file mode 100644 index 000000000..8f9a7ebce --- /dev/null +++ b/testing/sliding-sync-integration-test/README.md @@ -0,0 +1,41 @@ +# Matrix SDK sliding sync integration test + +A test harness and tests for integration testing sliding sync against a full synapse server and [sliding sync proxy](https://github.com/matrix-org/sliding-sync/) + + +## Requirements + +This requires a synapse backend with a ci patched configuration and a sliding-sync-proxy. +You can easily get it up and running with `docker-compose` via: + +```sh +docker-compose -f assets/docker-compose.yml up -d +docker-compose -f assets/docker-compose.yml logs --tail 100 -f +``` + +**Patches** +You can see the patches we do to configuration (namely activate registration and +resetting rate limits), check out what `assets/ci-start.sh` changes. + +## Running + +The integration tests can be run with `cargo test` or `cargo nextest run`. + +The integration tests expect the environment variables `HOMESERVER_URL` to be the HTTP URL to +access the synapse server and `HOMESERVER_DOMAIN` to be set to the domain configured in +that server. If you are using the provided `docker-compose`, the default will be fine. + +## Maintenance + +To drop the database of your docker-compose run: + +```bash +docker-compose -f assets/docker-compose.yml stop +docker volume rm -f matrix-rust-sdk-sliding-sync-ci-data +``` + +or simply: + +```bash +docker-compose -f assets/docker-compose.yml down -v +``` diff --git a/testing/sliding-sync-integration-test/assets/Dockerfile b/testing/sliding-sync-integration-test/assets/Dockerfile new file mode 100644 index 000000000..56dd700a7 --- /dev/null +++ b/testing/sliding-sync-integration-test/assets/Dockerfile @@ -0,0 +1,4 @@ +FROM matrixdotorg/synapse:latest +ADD ci-start.sh /ci-start.sh +RUN chmod 770 /ci-start.sh +ENTRYPOINT /ci-start.sh diff --git a/testing/sliding-sync-integration-test/assets/ci-start.sh b/testing/sliding-sync-integration-test/assets/ci-start.sh new file mode 100644 index 000000000..cce1ff4bd --- /dev/null +++ b/testing/sliding-sync-integration-test/assets/ci-start.sh @@ -0,0 +1,52 @@ +#!/bin/bash +set -e +export SYNAPSE_SERVER_NAME=matrix-sdk.rs +export SYNAPSE_REPORT_STATS=no +echo " ====== Generating config ====== " +/start.py generate +echo " ====== Patching for CI ====== " +echo """ +enable_registration: true +enable_registration_without_verification: true + +rc_message: + per_second: 1000 + burst_count: 1000 + +rc_registration: + per_second: 1000 + burst_count: 1000 + +rc_joins: + local: + per_second: 1000 + burst_count: 1000 + +rc_invites: + per_room: + per_second: 1000 + burst_count: 1000 + per_user: + per_second: 1000 + burst_count: 1000 + per_issuer: + per_second: 1000 + burst_count: 1000 + +rc_login: + address: + per_second: 1000 + burst_count: 1000 +# account: +# per_second: 0.17 +# burst_count: 3 +# failed_attempts: +# per_second: 0.17 +# burst_count: 3 + +""" >> /data/homeserver.yaml + +echo " ====== Starting server with: ====== " +cat /data/homeserver.yaml +echo " ====== STARTING ====== " +/start.py run diff --git a/testing/sliding-sync-integration-test/assets/docker-compose.yml b/testing/sliding-sync-integration-test/assets/docker-compose.yml new file mode 100644 index 000000000..0468562eb --- /dev/null +++ b/testing/sliding-sync-integration-test/assets/docker-compose.yml @@ -0,0 +1,42 @@ +version: '3' + +services: + + synapse: + build: . + restart: "no" + healthcheck: + disable: true + volumes: + - matrix-rust-sdk-sliding-sync-ci-data:/data + + ports: + - 8228:8008/tcp + + postgres: + image: postgres + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + POSTGRES_DB: syncv3 + healthcheck: + test: ["pg_isready"] + interval: 10s + timeout: 5s + retries: 5 + + sliding-sync-proxy: + image: ghcr.io/matrix-org/sliding-sync:v0.98.0 + links: + - synapse + - postgres + environment: + SYNCV3_SERVER: http://synapse:8008 + SYNCV3_SECRET: SUPER_SECRET + SYNCV3_BINDADDR: ":8338" + SYNCV3_DB: "user=postgres password=postgres dbname=syncv3 sslmode=disable host=postgres" + ports: + - 8338:8338 + +volumes: + matrix-rust-sdk-sliding-sync-ci-data: diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs new file mode 100644 index 000000000..895c96bb1 --- /dev/null +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -0,0 +1,765 @@ +use matrix_sdk::{ + ruma::api::client::room::create_room::v3::Request as CreateRoomRequest, Client, RoomListEntry, + SlidingSyncBuilder, +}; +use matrix_sdk_integration_testing::helpers::get_client_for_user; + +#[allow(dead_code)] +async fn setup(name: String) -> anyhow::Result<(Client, SlidingSyncBuilder)> { + let sliding_sync_proxy_url = + option_env!("SLIDING_SYNC_PROXY_URL").unwrap_or("http://localhost:8338").to_owned(); + let client = get_client_for_user(name, false).await?; + let sliding_sync_builder = client + .sliding_sync() + .await + .homeserver(sliding_sync_proxy_url.parse()?) + .with_common_extensions(); + Ok((client, sliding_sync_builder)) +} + +#[allow(dead_code)] +async fn random_setup_with_rooms( + number_of_rooms: usize, +) -> anyhow::Result<(Client, SlidingSyncBuilder)> { + let namespace = uuid::Uuid::new_v4().to_string(); + let (client, sliding_sync_builder) = setup(namespace.clone()).await?; + + for room_num in 0..number_of_rooms { + let mut request = CreateRoomRequest::new(); + request.name = Some(format!("{namespace}-{room_num}")); + let _event_id = client.create_room(request).await?; + } + + Ok((client, sliding_sync_builder)) +} + +#[derive(PartialEq, Eq, Debug)] +enum RoomListEntryEasy { + Empty, + Invalid, + Filled, +} + +impl From<&RoomListEntry> for RoomListEntryEasy { + fn from(value: &RoomListEntry) -> Self { + match value { + RoomListEntry::Empty => RoomListEntryEasy::Empty, + RoomListEntry::Invalidated(_) => RoomListEntryEasy::Invalid, + RoomListEntry::Filled(_) => RoomListEntryEasy::Filled, + } + } +} + +#[cfg(test)] +mod tests { + use anyhow::Context; + use futures::{pin_mut, stream::StreamExt}; + use matrix_sdk::{ + ruma::events::room::message::RoomMessageEventContent, SlidingSyncMode, + SlidingSyncViewBuilder, + }; + + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn it_works_smoke_test() -> anyhow::Result<()> { + let (_client, sync_proxy_builder) = setup("odo".to_owned()).await?; + let sync_proxy = sync_proxy_builder.add_fullsync_view().build().await?; + let stream = sync_proxy.stream().await?; + pin_mut!(stream); + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")?; + let summary = room_summary?; + assert_eq!(summary.rooms.len(), 0); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn adding_view_later() -> anyhow::Result<()> { + let view_name_1 = "sliding1"; + let view_name_2 = "sliding2"; + let view_name_3 = "sliding3"; + + let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?; + let build_view = |name| { + SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(0u32, 10u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name(name) + .build() + }; + let sync_proxy = sync_proxy_builder + .add_view(build_view(view_name_1)?) + .add_view(build_view(view_name_2)?) + .build() + .await?; + let view1 = sync_proxy.view(view_name_1).context("but we just added that view!")?; + let _view2 = sync_proxy.view(view_name_2).context("but we just added that view!")?; + + assert!(sync_proxy.view(view_name_3).is_none()); + + let stream = sync_proxy.stream().await?; + pin_mut!(stream); + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + assert_eq!(summary.views, [view_name_1, view_name_2]); + + assert!(sync_proxy.add_view(build_view(view_name_3)?).is_none()); + + // we need to restart the stream after every view listing update + let stream = sync_proxy.stream().await?; + pin_mut!(stream); + + let mut saw_update = false; + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if !summary.views.is_empty() { + // only if we saw an update come through + assert_eq!(summary.views, [view_name_3]); + // we didn't update the other views, so only no 2 should se an update + saw_update = true; + break; + } + } + + assert!(saw_update, "We didn't see the updae come through the pipe"); + + // and let's update the order of all views again + let Some(RoomListEntry::Filled(room_id)) = view1 + .rooms_list + .lock_ref() + .iter() + .nth(4) + .map(Clone::clone) else { + panic!("4th room has moved? how?") + }; + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + room.send(content, None).await?; // this should put our room up to the most recent + + let mut saw_update = false; + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if !summary.views.is_empty() { + // only if we saw an update come through + assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3,]); + // notice that our view 2 is now the last view, but all have seen updates + saw_update = true; + break; + } + } + + assert!(saw_update, "We didn't see the updae come through the pipe"); + + Ok(()) + } + + // index-based views don't support removing views. Leaving this test for an API + // update later. + // + // #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + // async fn live_views() -> anyhow::Result<()> { + // let view_name_1 = "sliding1"; + // let view_name_2 = "sliding2"; + // let view_name_3 = "sliding3"; + + // let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?; + // let build_view = |name| { + // SlidingSyncViewBuilder::default() + // .sync_mode(SlidingSyncMode::Selective) + // .set_range(0u32, 10u32) + // .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + // .name(name) + // .build() + // }; + // let sync_proxy = sync_proxy_builder + // .add_view(build_view(view_name_1)?) + // .add_view(build_view(view_name_2)?) + // .add_view(build_view(view_name_3)?) + // .build() + // .await?; + // let Some(view1 )= sync_proxy.view(view_name_1) else { + // bail!("but we just added that view!"); + // }; + // let Some(_view2 )= sync_proxy.view(view_name_2) else { + // bail!("but we just added that view!"); + // }; + + // let Some(_view3 )= sync_proxy.view(view_name_3) else { + // bail!("but we just added that view!"); + // }; + + // let stream = sync_proxy.stream().await?; + // pin_mut!(stream); + // let Some(room_summary ) = stream.next().await else { + // bail!("No room summary found, loop ended unsuccessfully"); + // }; + // let summary = room_summary?; + // // we only heard about the ones we had asked for + // assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3]); + + // let Some(view_2) = sync_proxy.pop_view(view_name_2) else { + // bail!("Room exists"); + // }; + + // // we need to restart the stream after every view listing update + // let stream = sync_proxy.stream().await?; + // pin_mut!(stream); + + // // Let's trigger an update by sending a message to room pos=3, making it + // move to // pos 0 + + // let Some(RoomListEntry::Filled(room_id)) = view1 + // .rooms_list + // .lock_ref() + // .iter().nth(3).map(Clone::clone) else + // { + // bail!("2nd room has moved? how?"); + // }; + + // let Some(room) = client.get_joined_room(&room_id) else { + // bail!("No joined room {room_id}"); + // }; + + // let content = RoomMessageEventContent::text_plain("Hello world"); + + // room.send(content, None).await?; // this should put our room up to the + // most recent + + // let mut saw_update = false; + // for _n in 0..2 { + // let Some(room_summary ) = stream.next().await else { + // bail!("sync has closed unexpectedly"); + // }; + // let summary = room_summary?; + // // we only heard about the ones we had asked for + // if !summary.views.is_empty() { + // // only if we saw an update come through + // assert_eq!(summary.views, [view_name_1, view_name_3]); + // saw_update = true; + // break; + // } + // } + + // assert!(saw_update, "We didn't see the updae come through the pipe"); + + // assert!(sync_proxy.add_view(view_2).is_none()); + + // // we need to restart the stream after every view listing update + // let stream = sync_proxy.stream().await?; + // pin_mut!(stream); + + // let mut saw_update = false; + // for _n in 0..2 { + // let Some(room_summary ) = stream.next().await else { + // bail!("sync has closed unexpectedly"); + // }; + // let summary = room_summary?; + // // we only heard about the ones we had asked for + // if !summary.views.is_empty() { + // // only if we saw an update come through + // assert_eq!(summary.views, [view_name_2]); + // // we didn't update the other views, so only no 2 should se an + // update saw_update = true; + // break; + // } + // } + + // assert!(saw_update, "We didn't see the updae come through the pipe"); + + // // and let's update the order of all views again + // let Some(RoomListEntry::Filled(room_id)) = view1 + // .rooms_list + // .lock_ref() + // .iter().nth(4).map(Clone::clone) else + // { + // bail!("4th room has moved? how?"); + // }; + + // let Some(room) = client.get_joined_room(&room_id) else { + // bail!("No joined room {room_id}"); + // }; + + // let content = RoomMessageEventContent::text_plain("Hello world"); + + // room.send(content, None).await?; // this should put our room up to the + // most recent + + // let mut saw_update = false; + // for _n in 0..2 { + // let Some(room_summary ) = stream.next().await else { + // bail!("sync has closed unexpectedly"); + // }; + // let summary = room_summary?; + // // we only heard about the ones we had asked for + // if !summary.views.is_empty() { + // // only if we saw an update come through + // assert_eq!(summary.views, [view_name_1, view_name_3, + // view_name_2]); // notice that our view 2 is now the last + // view, but all have seen updates saw_update = true; + // break; + // } + // } + + // assert!(saw_update, "We didn't see the updae come through the pipe"); + + // Ok(()) + // } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn resizing_sliding_window() -> anyhow::Result<()> { + let (_client, sync_proxy_builder) = random_setup_with_rooms(20).await?; + let sliding_window_view = SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(0u32, 10u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name("sliding") + .build()?; + let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?; + let view = sync_proxy.view("sliding").context("but we just added that view!")?; + let stream = sync_proxy.stream().await?; + pin_mut!(stream); + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + assert_eq!(summary.rooms.len(), 11); + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + let _signal = view.rooms_list.signal_vec_cloned(); + + // let's move the window + + view.set_range(1, 10); + // Ensure 0-0 invalidation ranges work. + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + view.set_range(5, 10); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + // let's move the window + + view.set_range(5, 15); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn moving_out_of_sliding_window() -> anyhow::Result<()> { + let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?; + let sliding_window_view = SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(1u32, 10u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name("sliding") + .build()?; + let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?; + let view = sync_proxy.view("sliding").context("but we just added that view!")?; + let stream = sync_proxy.stream().await?; + pin_mut!(stream); + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + assert_eq!(summary.rooms.len(), 10); + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Empty, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + let _signal = view.rooms_list.signal_vec_cloned(); + + // let's move the window + + view.set_range(0, 10); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + // let's move the window again + + view.set_range(2, 12); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + // now we "move" the room of pos 3 to pos 0; + // this is a bordering case + + let Some(RoomListEntry::Filled(room_id)) = view + .rooms_list + .lock_ref() + .iter() + .nth(3) + .map(Clone::clone) else + { + panic!("2nd room has moved? how?"); + }; + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + room.send(content, None).await?; // this should put our room up to the most recent + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + // items has moved, thus we shouldn't find it where it was + assert!(view.rooms_list.lock_ref().iter().nth(3).unwrap().as_room_id().unwrap() != room_id); + + // let's move the window again + + view.set_range(0, 10); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + [ + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Filled, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Invalid, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + RoomListEntryEasy::Empty, + ] + ); + + // and check that our room move has been accepted properly, too. + assert_eq!( + view.rooms_list.lock_ref().iter().next().unwrap().as_room_id().unwrap(), + &room_id + ); + + Ok(()) + } +}