Merge pull request #1335 from gnunicorn/ben-sliding-sync-integration-test

Various smaller sliding-sync fixes for Element X:

 - Infrastructure and a smoke test for integration-testing sliding sync
 - Expose `Pop` and `Clear` on `VecDiff`
 - Expose adding views during sliding-sync live runtime, including integration tests
 - Bug fix: the insert code now replaces only in-window items (the previous behavior could produce a wrong list state), including an integration test
Committed by Benjamin Kampmann on 2023-01-13 10:11:30 +00:00 (via GitHub)
14 changed files with 1125 additions and 36 deletions
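To illustrate the live-runtime view handling, here is a minimal sketch (mirroring the `adding_view_later` integration test below; it assumes `SlidingSync` is exported at the crate root alongside the builder types, and the view name is illustrative):

```rust
use futures::{pin_mut, stream::StreamExt};
use matrix_sdk::{SlidingSync, SlidingSyncMode, SlidingSyncViewBuilder};

// Add a view to an already-built sliding sync instance at runtime.
async fn add_view_at_runtime(sync_proxy: &SlidingSync) -> anyhow::Result<()> {
    let view = SlidingSyncViewBuilder::default()
        .sync_mode(SlidingSyncMode::Selective)
        .set_range(0u32, 10u32)
        .name("sliding3")
        .build()?;
    // `None` means no view of that name existed yet, so it was appended.
    assert!(sync_proxy.add_view(view).is_none());
    // A running stream keeps its old view set; restart it to pick up the change.
    let stream = sync_proxy.stream().await?;
    pin_mut!(stream);
    while let Some(summary) = stream.next().await {
        if !summary?.views.is_empty() {
            break; // the new view received its first update
        }
    }
    Ok(())
}
```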


@@ -204,7 +204,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: nextest
-args: run --workspace --exclude matrix-sdk-integration-testing
+args: run --workspace --exclude matrix-sdk-integration-testing --exclude sliding-sync-integration-test
- name: Test documentation
uses: actions-rs/cargo@v1
@@ -443,3 +443,75 @@ jobs:
with:
command: nextest
args: run -p matrix-sdk-integration-testing
sliding-sync-integration-tests:
name: Sliding Sync Integration test
# disabled until we can figure out the weird docker-not-starting-situation
if: false
# if: github.event_name == 'push' || !github.event.pull_request.draft
runs-on: ubuntu-latest
# Service containers to run with `runner-job`
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
# Provide the password for postgres
env:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: syncv3
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
# Maps tcp port 5432 on service container to the host
- 5432:5432
steps:
- name: Checkout the repo
uses: actions/checkout@v3
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- name: Load cache
uses: Swatinem/rust-cache@v1
- name: Install nextest
uses: taiki-e/install-action@nextest
- uses: actions/setup-python@v4
with:
python-version: 3.8
# local synapse
- uses: michaelkaye/setup-matrix-synapse@main
with:
uploadLogs: true
httpPort: 8228
disableRateLimiting: true
# latest sliding sync proxy
- uses: addnab/docker-run-action@v3
with:
registry: ghcr.io
image: "matrix-org/sliding-sync:v0.98.0"
docker_network: "host"
options: '-e "SYNCV3_SERVER=http://localhost:8228" -e "SYNCV3_SECRET=SUPER_CI_SECRET" -e "SYNCV3_BINDADDR=:8118" -e "SYNCV3_DB=user=postgres password=postgres dbname=syncv3 sslmode=disable host=postgres" -p 8118:8118'
- name: Test
uses: actions-rs/cargo@v1
with:
command: nextest
args: run -p sliding-sync-integration-tests


@@ -53,7 +53,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: tarpaulin
-args: --out Xml
+args: --out Xml -e sliding-sync-integration-test
- name: Upload to codecov.io
uses: codecov/codecov-action@v3

Cargo.lock (generated)

@@ -4441,6 +4441,19 @@ dependencies = [
"syntect",
]
[[package]]
name = "sliding-sync-integration-test"
version = "0.1.0"
dependencies = [
"anyhow",
"ctor",
"futures",
"matrix-sdk",
"matrix-sdk-integration-testing",
"tokio",
"uuid",
]
[[package]]
name = "slog"
version = "2.7.0"


@@ -82,9 +82,8 @@ interface SlidingSyncViewRoomsListDiff {
u32 new_index
);
Push(RoomListEntry value);
-// The following are supported by the generic VecDiff-type but
-// in sliding sync effectively do not happen and thus aren't exposed
-// to not pollute the API: Pop(); Clear();
+Pop();
+Clear();
};
callback interface SlidingSyncViewRoomListObserver {


@@ -250,6 +250,8 @@ pub enum SlidingSyncViewRoomsListDiff {
RemoveAt { index: u32 },
Move { old_index: u32, new_index: u32 },
Push { value: RoomListEntry },
Pop, // removes the last item
Clear, // clears the list
}
impl From<VecDiff<MatrixRoomEntry>> for SlidingSyncViewRoomsListDiff {
@@ -276,7 +278,8 @@ impl From<VecDiff<MatrixRoomEntry>> for SlidingSyncViewRoomsListDiff {
VecDiff::Push { value } => {
SlidingSyncViewRoomsListDiff::Push { value: (&value).into() }
}
-_ => unimplemented!("Clear and Pop aren't provided within sliding sync"),
+VecDiff::Pop {} => SlidingSyncViewRoomsListDiff::Pop,
+VecDiff::Clear {} => SlidingSyncViewRoomsListDiff::Clear,
}
}
}
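For reference, a standalone sketch of applying the two newly exposed variants to a plain `Vec` mirror of the rooms list (`VecDiff` is the `futures-signals` type backing `rooms_list`; the other variants are handled as before):

```rust
use futures_signals::signal_vec::VecDiff;

// Apply the tail-mutating diff variants to a local copy of the list.
fn apply_tail_ops<T>(list: &mut Vec<T>, diff: VecDiff<T>) {
    match diff {
        VecDiff::Pop {} => {
            list.pop(); // removes the last item
        }
        VecDiff::Clear {} => {
            list.clear(); // clears the whole list
        }
        // Replace, InsertAt, UpdateAt, RemoveAt, Move, Push: unchanged
        _ => {}
    }
}
```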
@@ -634,13 +637,15 @@ impl SlidingSync {
impl SlidingSync {
#[allow(clippy::significant_drop_in_scrutinee)]
pub fn get_view(&self, name: String) -> Option<Arc<SlidingSyncView>> {
-let views = self.inner.views.lock_ref();
-for s in views.iter() {
-if s.name == name {
-return Some(Arc::new(SlidingSyncView { inner: s.clone() }));
-}
-}
-None
+self.inner.view(&name).map(|inner| Arc::new(SlidingSyncView { inner }))
}
pub fn add_view(&self, view: Arc<SlidingSyncView>) -> Option<u32> {
self.inner.add_view(view.inner.clone()).map(|u| u as u32)
}
pub fn pop_view(&self, name: String) -> Option<Arc<SlidingSyncView>> {
self.inner.pop_view(&name).map(|inner| Arc::new(SlidingSyncView { inner }))
}
pub fn sync(&self) -> Arc<StoppableSpawn> {


@@ -693,6 +693,46 @@ impl SlidingSync {
.since = Some(since);
}
/// Get access to the SlidingSyncView named `view_name`
///
/// Note: Remember that this list might have been changed since you started
/// listening to the stream and is therefore not necessarily up to date
/// with the views used for the stream.
pub fn view(&self, view_name: &str) -> Option<SlidingSyncView> {
self.views.lock_ref().iter().find(|v| v.name == view_name).cloned()
}
/// Remove the SlidingSyncView named `view_name` from the views list if
/// found
///
/// Note: Remember that this change only applies to streams created after
/// it. Any previously started stream will continue to use the previous
/// set of views.
pub fn pop_view(&self, _view_name: &str) -> Option<SlidingSyncView> {
unimplemented!("Index based sliding sync doesn't support removing views");
}
/// Add the view to the list of views
///
/// As views need to have a unique `.name`, if a view with the same name
/// is already present, the new view replaces it and its position is
/// returned. Otherwise the view is pushed to the end and `None` is
/// returned (see the usage sketch below).
///
/// Note: Remember that this change only applies to streams created after
/// it. Any previously started stream will continue to use the previous
/// set of views.
pub fn add_view(&self, view: SlidingSyncView) -> Option<usize> {
let mut v = self.views.lock_mut();
if let Some(idx) = v.iter().position(|v| v.name == view.name) {
v.set_cloned(idx, view);
Some(idx)
} else {
v.push_cloned(view);
None
}
}
/// Lookup a set of rooms
pub fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
&self,
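A quick usage sketch of the replace-or-append semantics of `add_view` above (assuming an already-built `SlidingSync` handle; the view name is hypothetical):

```rust
use matrix_sdk::{SlidingSync, SlidingSyncMode, SlidingSyncViewBuilder};

fn upsert_view(sliding_sync: &SlidingSync) -> anyhow::Result<()> {
    let view = SlidingSyncViewBuilder::default()
        .sync_mode(SlidingSyncMode::Selective)
        .set_range(0u32, 10u32)
        .name("by-recency")
        .build()?;
    match sliding_sync.add_view(view) {
        // A view of the same name existed and was replaced in place.
        Some(idx) => println!("replaced the view at position {idx}"),
        // No view of that name yet: it was pushed to the end.
        None => println!("appended a new view"),
    }
    // Only streams created after this call will use the updated view set.
    Ok(())
}
```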
@@ -707,6 +747,7 @@ impl SlidingSync {
resp: v4::Response,
extensions: Option<ExtensionsConfig>,
views: &[SlidingSyncView],
ranges: &[Vec<(usize, usize)>],
) -> Result<UpdateSummary, crate::Error> {
let mut processed = self.client.process_sliding_sync(resp.clone()).await?;
debug!("main client processed.");
@@ -721,11 +762,13 @@ impl SlidingSync {
.into());
}
-for (view, updates) in std::iter::zip(views, &resp.lists) {
+for ((view, ranges), updates) in
+std::iter::zip(std::iter::zip(views, ranges), &resp.lists)
+{
let count: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
trace!("view {:?} update: {:?}", view.name, !updates.ops.is_empty());
-if view.handle_response(count, &updates.ops)? {
+if view.handle_response(count, &updates.ops, ranges)? {
updated_views.push(view.name.clone());
}
}
@@ -794,12 +837,14 @@ impl SlidingSync {
debug!(?self.extensions, "Sync loop running");
let mut requests = Vec::new();
let mut current_ranges = vec![];
let mut new_remaining_generators = Vec::new();
let mut new_remaining_views = Vec::new();
for (mut generator, view) in std::iter::zip(remaining_generators, remaining_views) {
-if let Some(request) = generator.next() {
+if let Some((request, range)) = generator.next() {
requests.push(request);
current_ranges.push(range);
new_remaining_generators.push(generator);
new_remaining_views.push(view);
}
@@ -893,7 +938,7 @@ impl SlidingSync {
debug!("received");
-let updates = match self.handle_response(resp, extensions, &remaining_views).await {
+let updates = match self.handle_response(resp, extensions, &remaining_views, &current_ranges).await {
Ok(r) => r,
Err(e) => {
yield Err(e.into());
@@ -1054,6 +1099,12 @@ impl SlidingSyncViewBuilder {
self
}
/// Set a single range fetch
pub fn set_range<U: Into<UInt>>(mut self, from: U, to: U) -> Self {
self.ranges = Some(RangeState::new(vec![(from.into(), to.into())]));
self
}
/// Set the ranges to fetch
pub fn add_range<U: Into<UInt>>(mut self, from: U, to: U) -> Self {
let r = self.ranges.get_or_insert_with(|| RangeState::new(Vec::new()));
@@ -1130,7 +1181,7 @@ impl<'a> SlidingSyncViewRequestGenerator<'a> {
start: u32,
batch_size: u32,
limit: Option<u32>,
-) -> (u32, v4::SyncRequestList) {
+) -> (u32, (v4::SyncRequestList, Vec<(usize, usize)>)) {
let calc_end = start + batch_size;
let end = match limit {
Some(l) => std::cmp::min(l, calc_end),
@@ -1140,30 +1191,44 @@ impl<'a> SlidingSyncViewRequestGenerator<'a> {
(end, self.make_request_for_ranges(ranges))
}
-fn make_request_for_ranges(&self, ranges: Vec<(UInt, UInt)>) -> v4::SyncRequestList {
+fn make_request_for_ranges(
+&self,
+ranges: Vec<(UInt, UInt)>,
+) -> (v4::SyncRequestList, Vec<(usize, usize)>) {
let sort = self.view.sort.clone();
let required_state = self.view.required_state.clone();
let timeline_limit = self.view.timeline_limit;
let filters = self.view.filters.clone();
-assign!(v4::SyncRequestList::default(), {
-ranges,
-required_state,
-sort,
-timeline_limit,
-filters,
-})
+(
+assign!(v4::SyncRequestList::default(), {
+ranges: ranges.clone(),
+required_state,
+sort,
+timeline_limit,
+filters,
+}),
+ranges
+.into_iter()
+.map(|(a, b)| {
+(
+usize::try_from(a).expect("range is a valid u32"),
+usize::try_from(b).expect("range is a valid u32"),
+)
+})
+.collect(),
+)
}
// generate the next live request
-fn live_request(&self) -> v4::SyncRequestList {
+fn live_request(&self) -> (v4::SyncRequestList, Vec<(usize, usize)>) {
let ranges = self.view.ranges.read_only().get_cloned();
self.make_request_for_ranges(ranges)
}
}
impl<'a> Iterator for SlidingSyncViewRequestGenerator<'a> {
-type Item = v4::SyncRequestList;
+type Item = (v4::SyncRequestList, Vec<(usize, usize)>);
fn next(&mut self) -> Option<Self::Item> {
if let InnerSlidingSyncViewRequestGenerator::PagingFullSync { position, limit, .. }
@@ -1311,9 +1376,16 @@ impl SlidingSyncView {
}
#[instrument(skip(self, ops))]
-fn room_ops(&self, ops: &Vec<v4::SyncOp>) -> Result<(), Error> {
+fn room_ops(
+&self,
+ops: &Vec<v4::SyncOp>,
+room_ranges: &Vec<(usize, usize)>,
+) -> Result<(), Error> {
let mut rooms_list = self.rooms_list.lock_mut();
let _rooms_map = self.rooms.lock_mut();
let index_in_range =
|idx| room_ranges.iter().any(|(start, end)| idx >= *start && idx <= *end);
for op in ops {
match &op.op {
v4::SlidingOp::Sync => {
@@ -1375,9 +1447,10 @@ impl SlidingSyncView {
loop {
// find the next empty slot and drop it
let (prev_p, prev_overflow) = pos.overflowing_sub(dif);
-let check_prev = !prev_overflow;
+let check_prev = !prev_overflow && index_in_range(prev_p);
let (next_p, overflown) = pos.overflowing_add(dif);
-let check_after = !overflown && next_p < sliced.len();
+let check_after =
+!overflown && next_p < sliced.len() && index_in_range(next_p);
if !check_prev && !check_after {
return Err(Error::BadResponse("We were asked to insert but could not find any direction to shift to".to_owned()));
}
@@ -1450,7 +1523,12 @@ impl SlidingSyncView {
}
#[instrument(skip(self, ops))]
-fn handle_response(&self, rooms_count: u32, ops: &Vec<v4::SyncOp>) -> Result<bool, Error> {
+fn handle_response(
+&self,
+rooms_count: u32,
+ops: &Vec<v4::SyncOp>,
+ranges: &Vec<(usize, usize)>,
+) -> Result<bool, Error> {
let mut missing =
rooms_count.checked_sub(self.rooms_list.lock_ref().len() as u32).unwrap_or_default();
let mut changed = false;
@@ -1465,7 +1543,7 @@ impl SlidingSyncView {
}
if !ops.is_empty() {
-self.room_ops(ops)?;
+self.room_ops(ops, ranges)?;
changed = true;
}
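The heart of the in-window fix: when an insert has to shift neighbouring entries, only indices that fall inside one of the requested ranges may be touched. A standalone sketch of the window check (inclusive bounds, as in `room_ops` above):

```rust
/// True if `idx` falls inside any of the requested (inclusive) ranges.
fn index_in_range(ranges: &[(usize, usize)], idx: usize) -> bool {
    ranges.iter().any(|(start, end)| idx >= *start && idx <= *end)
}

fn main() {
    let ranges = [(0, 10)];
    assert!(index_in_range(&ranges, 0));
    assert!(index_in_range(&ranges, 10)); // bounds are inclusive
    assert!(!index_in_range(&ranges, 11)); // out of window: must not be shifted
}
```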


@@ -5,7 +5,11 @@ version = "0.1.0"
edition = "2021"
publish = false
-[dev-dependencies]
+[features]
+# activate to steal helper functions from this crate for other testing
+helpers = []
+[dependencies]
anyhow = { workspace = true }
assign = "1"
ctor = { workspace = true }


@@ -1,5 +1,5 @@
#[cfg(test)]
mod tests;
-#[cfg(test)]
-mod helpers;
+#[cfg(any(test, feature = "helpers"))]
+pub mod helpers;


@@ -0,0 +1,14 @@
[package]
name = "sliding-sync-integration-test"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
anyhow = { workspace = true }
ctor = { workspace = true }
matrix-sdk-integration-testing = { path = "../matrix-sdk-integration-testing", features = ["helpers"] }
matrix-sdk = { path = "../../crates/matrix-sdk", features = ["experimental-sliding-sync"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
futures = { version = "0.3.25" }
uuid = { version = "1.2.2" }


@@ -0,0 +1,41 @@
# Matrix SDK sliding sync integration test
A test harness and tests for integration-testing sliding sync against a full Synapse server and the [sliding sync proxy](https://github.com/matrix-org/sliding-sync/).
## Requirements
This requires a Synapse backend with a CI-patched configuration and a sliding sync proxy.
You can easily get it up and running with `docker-compose` via:
```sh
docker-compose -f assets/docker-compose.yml up -d
docker-compose -f assets/docker-compose.yml logs --tail 100 -f
```
**Patches**
To see the patches we apply to the configuration (namely activating registration and lifting rate limits), check out what `assets/ci-start.sh` changes.
## Running
The integration tests can be run with `cargo test` or `cargo nextest run`.
The integration tests expect the environment variable `HOMESERVER_URL` to be the HTTP URL
at which the Synapse server can be reached, and `HOMESERVER_DOMAIN` to be set to the domain
configured on that server. If you are using the provided `docker-compose` setup, the defaults will be fine.
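For example, to run against the provided `docker-compose` setup (values taken from `assets/docker-compose.yml`, `assets/ci-start.sh` and the test harness; `SLIDING_SYNC_PROXY_URL` defaults to `http://localhost:8338` if unset):

```sh
HOMESERVER_URL=http://localhost:8228 \
HOMESERVER_DOMAIN=matrix-sdk.rs \
SLIDING_SYNC_PROXY_URL=http://localhost:8338 \
cargo nextest run -p sliding-sync-integration-test
```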
## Maintenance
To drop the database of your docker-compose setup, run:
```bash
docker-compose -f assets/docker-compose.yml stop
docker volume rm -f matrix-rust-sdk-sliding-sync-ci-data
```
or simply:
```bash
docker-compose -f assets/docker-compose.yml down -v
```


@@ -0,0 +1,4 @@
FROM matrixdotorg/synapse:latest
ADD ci-start.sh /ci-start.sh
RUN chmod 770 /ci-start.sh
ENTRYPOINT /ci-start.sh


@@ -0,0 +1,52 @@
#!/bin/bash
set -e
export SYNAPSE_SERVER_NAME=matrix-sdk.rs
export SYNAPSE_REPORT_STATS=no
echo " ====== Generating config ====== "
/start.py generate
echo " ====== Patching for CI ====== "
echo """
enable_registration: true
enable_registration_without_verification: true
rc_message:
per_second: 1000
burst_count: 1000
rc_registration:
per_second: 1000
burst_count: 1000
rc_joins:
local:
per_second: 1000
burst_count: 1000
rc_invites:
per_room:
per_second: 1000
burst_count: 1000
per_user:
per_second: 1000
burst_count: 1000
per_issuer:
per_second: 1000
burst_count: 1000
rc_login:
address:
per_second: 1000
burst_count: 1000
# account:
# per_second: 0.17
# burst_count: 3
# failed_attempts:
# per_second: 0.17
# burst_count: 3
""" >> /data/homeserver.yaml
echo " ====== Starting server with: ====== "
cat /data/homeserver.yaml
echo " ====== STARTING ====== "
/start.py run


@@ -0,0 +1,42 @@
version: '3'
services:
synapse:
build: .
restart: "no"
healthcheck:
disable: true
volumes:
- matrix-rust-sdk-sliding-sync-ci-data:/data
ports:
- 8228:8008/tcp
postgres:
image: postgres
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: syncv3
healthcheck:
test: ["pg_isready"]
interval: 10s
timeout: 5s
retries: 5
sliding-sync-proxy:
image: ghcr.io/matrix-org/sliding-sync:v0.98.0
links:
- synapse
- postgres
environment:
SYNCV3_SERVER: http://synapse:8008
SYNCV3_SECRET: SUPER_SECRET
SYNCV3_BINDADDR: ":8338"
SYNCV3_DB: "user=postgres password=postgres dbname=syncv3 sslmode=disable host=postgres"
ports:
- 8338:8338
volumes:
matrix-rust-sdk-sliding-sync-ci-data:


@@ -0,0 +1,765 @@
use matrix_sdk::{
ruma::api::client::room::create_room::v3::Request as CreateRoomRequest, Client, RoomListEntry,
SlidingSyncBuilder,
};
use matrix_sdk_integration_testing::helpers::get_client_for_user;
#[allow(dead_code)]
async fn setup(name: String) -> anyhow::Result<(Client, SlidingSyncBuilder)> {
let sliding_sync_proxy_url =
option_env!("SLIDING_SYNC_PROXY_URL").unwrap_or("http://localhost:8338").to_owned();
let client = get_client_for_user(name, false).await?;
let sliding_sync_builder = client
.sliding_sync()
.await
.homeserver(sliding_sync_proxy_url.parse()?)
.with_common_extensions();
Ok((client, sliding_sync_builder))
}
#[allow(dead_code)]
async fn random_setup_with_rooms(
number_of_rooms: usize,
) -> anyhow::Result<(Client, SlidingSyncBuilder)> {
let namespace = uuid::Uuid::new_v4().to_string();
let (client, sliding_sync_builder) = setup(namespace.clone()).await?;
for room_num in 0..number_of_rooms {
let mut request = CreateRoomRequest::new();
request.name = Some(format!("{namespace}-{room_num}"));
let _event_id = client.create_room(request).await?;
}
Ok((client, sliding_sync_builder))
}
#[derive(PartialEq, Eq, Debug)]
enum RoomListEntryEasy {
Empty,
Invalid,
Filled,
}
impl From<&RoomListEntry> for RoomListEntryEasy {
fn from(value: &RoomListEntry) -> Self {
match value {
RoomListEntry::Empty => RoomListEntryEasy::Empty,
RoomListEntry::Invalidated(_) => RoomListEntryEasy::Invalid,
RoomListEntry::Filled(_) => RoomListEntryEasy::Filled,
}
}
}
#[cfg(test)]
mod tests {
use anyhow::Context;
use futures::{pin_mut, stream::StreamExt};
use matrix_sdk::{
ruma::events::room::message::RoomMessageEventContent, SlidingSyncMode,
SlidingSyncViewBuilder,
};
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn it_works_smoke_test() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = setup("odo".to_owned()).await?;
let sync_proxy = sync_proxy_builder.add_fullsync_view().build().await?;
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let room_summary =
stream.next().await.context("No room summary found, loop ended unsuccessfully")?;
let summary = room_summary?;
assert_eq!(summary.rooms.len(), 0);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn adding_view_later() -> anyhow::Result<()> {
let view_name_1 = "sliding1";
let view_name_2 = "sliding2";
let view_name_3 = "sliding3";
let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let build_view = |name| {
SlidingSyncViewBuilder::default()
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.sort(vec!["by_recency".to_string(), "by_name".to_string()])
.name(name)
.build()
};
let sync_proxy = sync_proxy_builder
.add_view(build_view(view_name_1)?)
.add_view(build_view(view_name_2)?)
.build()
.await?;
let view1 = sync_proxy.view(view_name_1).context("but we just added that view!")?;
let _view2 = sync_proxy.view(view_name_2).context("but we just added that view!")?;
assert!(sync_proxy.view(view_name_3).is_none());
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let room_summary =
stream.next().await.context("No room summary found, loop ended unsuccessfully")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
assert_eq!(summary.views, [view_name_1, view_name_2]);
assert!(sync_proxy.add_view(build_view(view_name_3)?).is_none());
// we need to restart the stream after every view listing update
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let mut saw_update = false;
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if !summary.views.is_empty() {
// only if we saw an update come through
assert_eq!(summary.views, [view_name_3]);
// we didn't update the other views, so only view 3 should see an update
saw_update = true;
break;
}
}
assert!(saw_update, "We didn't see the update come through the pipe");
// and let's update the order of all views again
let Some(RoomListEntry::Filled(room_id)) = view1
.rooms_list
.lock_ref()
.iter()
.nth(4)
.map(Clone::clone) else {
panic!("4th room has moved? how?")
};
let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?;
let content = RoomMessageEventContent::text_plain("Hello world");
room.send(content, None).await?; // this should bump our room to the top as the most recent
let mut saw_update = false;
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if !summary.views.is_empty() {
// only if we saw an update come through
assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3]);
// note that all three views have now seen updates
saw_update = true;
break;
}
}
assert!(saw_update, "We didn't see the update come through the pipe");
Ok(())
}
// index-based views don't support removing views. Leaving this test for an API
// update later.
//
// #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
// async fn live_views() -> anyhow::Result<()> {
// let view_name_1 = "sliding1";
// let view_name_2 = "sliding2";
// let view_name_3 = "sliding3";
// let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
// let build_view = |name| {
// SlidingSyncViewBuilder::default()
// .sync_mode(SlidingSyncMode::Selective)
// .set_range(0u32, 10u32)
// .sort(vec!["by_recency".to_string(), "by_name".to_string()])
// .name(name)
// .build()
// };
// let sync_proxy = sync_proxy_builder
// .add_view(build_view(view_name_1)?)
// .add_view(build_view(view_name_2)?)
// .add_view(build_view(view_name_3)?)
// .build()
// .await?;
// let Some(view1 )= sync_proxy.view(view_name_1) else {
// bail!("but we just added that view!");
// };
// let Some(_view2 )= sync_proxy.view(view_name_2) else {
// bail!("but we just added that view!");
// };
// let Some(_view3 )= sync_proxy.view(view_name_3) else {
// bail!("but we just added that view!");
// };
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// let Some(room_summary ) = stream.next().await else {
// bail!("No room summary found, loop ended unsuccessfully");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3]);
// let Some(view_2) = sync_proxy.pop_view(view_name_2) else {
// bail!("Room exists");
// };
// // we need to restart the stream after every view listing update
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// Let's trigger an update by sending a message to room pos=3, making it
// move to pos 0
// let Some(RoomListEntry::Filled(room_id)) = view1
// .rooms_list
// .lock_ref()
// .iter().nth(3).map(Clone::clone) else {
// bail!("room at pos 3 has moved? how?");
// };
// let Some(room) = client.get_joined_room(&room_id) else {
// bail!("No joined room {room_id}");
// };
// let content = RoomMessageEventContent::text_plain("Hello world");
// room.send(content, None).await?; // this should bump our room to the top as the most recent
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_1, view_name_3]);
// saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the update come through the pipe");
// assert!(sync_proxy.add_view(view_2).is_none());
// // we need to restart the stream after every view listing update
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_2]);
// // we didn't update the other views, so only view 2 should see an update
// saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the update come through the pipe");
// // and let's update the order of all views again
// let Some(RoomListEntry::Filled(room_id)) = view1
// .rooms_list
// .lock_ref()
// .iter().nth(4).map(Clone::clone) else {
// bail!("4th room has moved? how?");
// };
// let Some(room) = client.get_joined_room(&room_id) else {
// bail!("No joined room {room_id}");
// };
// let content = RoomMessageEventContent::text_plain("Hello world");
// room.send(content, None).await?; // this should bump our room to the top as the most recent
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_1, view_name_3, view_name_2]);
// // notice that view 2 is now the last view, but all have seen updates
// saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the update come through the pipe");
// Ok(())
// }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn resizing_sliding_window() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let sliding_window_view = SlidingSyncViewBuilder::default()
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.sort(vec!["by_recency".to_string(), "by_name".to_string()])
.name("sliding")
.build()?;
let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?;
let view = sync_proxy.view("sliding").context("but we just added that view!")?;
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let room_summary =
stream.next().await.context("No room summary found, loop ended unsuccessfully")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
assert_eq!(summary.rooms.len(), 11);
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
let _signal = view.rooms_list.signal_vec_cloned();
// let's move the window
view.set_range(1, 10);
// Ensure 0-0 invalidation ranges work.
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
view.set_range(5, 10);
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
// let's move the window
view.set_range(5, 15);
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn moving_out_of_sliding_window() -> anyhow::Result<()> {
let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let sliding_window_view = SlidingSyncViewBuilder::default()
.sync_mode(SlidingSyncMode::Selective)
.set_range(1u32, 10u32)
.sort(vec!["by_recency".to_string(), "by_name".to_string()])
.name("sliding")
.build()?;
let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?;
let view = sync_proxy.view("sliding").context("but we just added that view!")?;
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let room_summary =
stream.next().await.context("No room summary found, loop ended unsuccessfully")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
assert_eq!(summary.rooms.len(), 10);
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Empty,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
let _signal = view.rooms_list.signal_vec_cloned();
// let's move the window
view.set_range(0, 10);
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
// let's move the window again
view.set_range(2, 12);
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
// now we "move" the room at pos 3 to pos 0;
// this is a borderline case
let Some(RoomListEntry::Filled(room_id)) = view
.rooms_list
.lock_ref()
.iter()
.nth(3)
.map(Clone::clone) else
{
panic!("2nd room has moved? how?");
};
let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?;
let content = RoomMessageEventContent::text_plain("Hello world");
room.send(content, None).await?; // this should bump our room to the top as the most recent
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
// the item has moved, thus we shouldn't find it where it was
assert!(view.rooms_list.lock_ref().iter().nth(3).unwrap().as_room_id().unwrap() != room_id);
// let's move the window again
view.set_range(0, 10);
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
let summary = room_summary?;
// we only heard about the ones we had asked for
if summary.views.iter().any(|s| s == "sliding") {
break;
}
}
let collection_simple = view
.rooms_list
.lock_ref()
.iter()
.map(Into::<RoomListEntryEasy>::into)
.collect::<Vec<_>>();
assert_eq!(
collection_simple,
[
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Filled,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Invalid,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
RoomListEntryEasy::Empty,
]
);
// and check that our room move has been accepted properly, too.
assert_eq!(
view.rooms_list.lock_ref().iter().next().unwrap().as_room_id().unwrap(),
&room_id
);
Ok(())
}
}