From 8a9b72ae6c8d9f55a40cfe396bdc55163e2834ad Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Wed, 1 Feb 2023 17:35:03 +0100 Subject: [PATCH 1/2] test(sliding-sync): ensure unknown pos does recover the room subscription properly --- .../sliding-sync-integration-test/src/lib.rs | 178 ++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 4eaa880ed..3a5f72695 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -1035,4 +1035,182 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn restart_room_resubscription() -> anyhow::Result<()> { + let (client, sync_proxy_builder) = random_setup_with_rooms(3).await?; + let sliding_window_view = SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(0u32, 2u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name("sliding") + .build()?; + let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?; + let view = sync_proxy.view("sliding").context("but we just added that view!")?; + let stream = sync_proxy.stream(); + pin_mut!(stream); + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + assert_eq!(summary.rooms.len(), 3); + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Filled).take(3).collect::>() + ); + + let _signal = view.rooms_list.signal_vec_cloned(); + + // let's move the window + + view.set_range(1, 2); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Invalid) + .take(1) + .chain(repeat(RoomListEntryEasy::Filled).take(2)) + .collect::>() + ); + + // let's get that first entry + + let Some(RoomListEntry::Invalidated(room_id)) = view + .rooms_list + .lock_ref() + .iter() + .next() + .map(Clone::clone) else + { + panic!("2nd room has moved? how?"); + }; + + // send a message + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + room.send(content, None).await?; // this should put our room up to the most recent + + // let's subscribe + + sync_proxy.subscribe(room_id.clone(), Default::default()); + + let mut room_updated = false; + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + + assert!(room_updated, "Room update has not been seen"); + + // force the pos to be invalid and thus this being reset internally + force_sliding_sync_pos(&sync_proxy, "100".to_owned()); + let mut error_seen = false; + let mut room_updated = false; + for _n in 0..2 { + let summary = match stream.next().await { + Some(Ok(e)) => e, + Some(Err(e)) => { + match e.client_api_error_kind() { + Some(RumaError::UnknownPos) => { + // we expect this to come through. + error_seen = true; + continue; + } + _ => Err(e)?, + } + } + None => anyhow::bail!("Stream ended unexpectedly."), + }; + // we only heard about the ones we had asked for + if summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + + assert!(error_seen, "We have not seen the UnknownPos error"); + assert!(room_updated, "Room update has not been seen"); + + // send another message + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + let event_id = room.send(content, None).await?.event_id; // this should put our room up to the most recent + + // let's see for it to come down the pipe + let mut room_updated = false; + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")?; + let summary = room_summary?; + // we only heard about the ones we had asked for + if summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + assert!(room_updated, "Room update has not been seen"); + + let Some(sliding_sync_room) = sync_proxy.get_room(room_id) else { + panic!("Sliding Sync room not found"); + }; + + let Some(event) = sliding_sync_room.latest_event().await else { + panic!("No event found") + }; + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Invalid) + .take(1) + .chain(repeat(RoomListEntryEasy::Filled).take(2)) + .collect::>() + ); + + assert_eq!( + event.event_id().unwrap(), + event_id, + "Latest event is different than what we've sent" + ); + + Ok(()) + } } From 94b60f915fb6569e583cfac069f46398a43d5ac4 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 16 Feb 2023 17:54:28 +0100 Subject: [PATCH 2/2] test(sdk): Address feedbacks. --- .../sliding-sync-integration-test/src/lib.rs | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index dc74f864f..310f2f56d 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -1267,27 +1267,37 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn restart_room_resubscription() -> anyhow::Result<()> { let (client, sync_proxy_builder) = random_setup_with_rooms(3).await?; - let sliding_window_view = SlidingSyncViewBuilder::default() - .sync_mode(SlidingSyncMode::Selective) - .set_range(0u32, 2u32) - .sort(vec!["by_recency".to_string(), "by_name".to_string()]) - .name("sliding") - .build()?; - let sync_proxy = sync_proxy_builder.add_view(sliding_window_view).build().await?; - let view = sync_proxy.view("sliding").context("but we just added that view!")?; + + let sync_proxy = sync_proxy_builder + .add_view( + SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(0u32, 2u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name("sliding_view") + .build()?, + ) + .build() + .await?; + + let view = sync_proxy.view("sliding_view").context("View `sliding_view` isn't found")?; + let stream = sync_proxy.stream(); pin_mut!(stream); + let room_summary = - stream.next().await.context("No room summary found, loop ended unsuccessfully")?; - let summary = room_summary?; + stream.next().await.context("No room summary found, loop ended unsuccessfully")??; + // we only heard about the ones we had asked for - assert_eq!(summary.rooms.len(), 3); + assert_eq!(room_summary.rooms.len(), 3); + let collection_simple = view .rooms_list .lock_ref() .iter() .map(Into::::into) .collect::>(); + assert_eq!( collection_simple, repeat(RoomListEntryEasy::Filled).take(3).collect::>() @@ -1300,10 +1310,10 @@ mod tests { view.set_range(1, 2); for _n in 0..2 { - let room_summary = stream.next().await.context("sync has closed unexpectedly")?; - let summary = room_summary?; + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + // we only heard about the ones we had asked for - if summary.views.iter().any(|s| s == "sliding") { + if room_summary.views.iter().any(|s| s == "sliding") { break; } } @@ -1314,6 +1324,7 @@ mod tests { .iter() .map(Into::::into) .collect::>(); + assert_eq!( collection_simple, repeat(RoomListEntryEasy::Invalid) @@ -1329,8 +1340,7 @@ mod tests { .lock_ref() .iter() .next() - .map(Clone::clone) else - { + .map(Clone::clone) else { panic!("2nd room has moved? how?"); }; @@ -1349,10 +1359,10 @@ mod tests { let mut room_updated = false; for _n in 0..2 { - let room_summary = stream.next().await.context("sync has closed unexpectedly")?; - let summary = room_summary?; + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + // we only heard about the ones we had asked for - if summary.rooms.iter().any(|s| s == &room_id) { + if room_summary.rooms.iter().any(|s| s == &room_id) { room_updated = true; break; } @@ -1362,8 +1372,10 @@ mod tests { // force the pos to be invalid and thus this being reset internally force_sliding_sync_pos(&sync_proxy, "100".to_owned()); + let mut error_seen = false; let mut room_updated = false; + for _n in 0..2 { let summary = match stream.next().await { Some(Ok(e)) => e, @@ -1379,6 +1391,7 @@ mod tests { } None => anyhow::bail!("Stream ended unexpectedly."), }; + // we only heard about the ones we had asked for if summary.rooms.iter().any(|s| s == &room_id) { room_updated = true; @@ -1401,23 +1414,18 @@ mod tests { let mut room_updated = false; for _n in 0..2 { - let room_summary = stream.next().await.context("sync has closed unexpectedly")?; - let summary = room_summary?; + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + // we only heard about the ones we had asked for - if summary.rooms.iter().any(|s| s == &room_id) { + if room_summary.rooms.iter().any(|s| s == &room_id) { room_updated = true; break; } } assert!(room_updated, "Room update has not been seen"); - let Some(sliding_sync_room) = sync_proxy.get_room(room_id) else { - panic!("Sliding Sync room not found"); - }; - - let Some(event) = sliding_sync_room.latest_event().await else { - panic!("No event found") - }; + let sliding_sync_room = sync_proxy.get_room(&room_id).expect("Slidin Sync room not found"); + let event = sliding_sync_room.latest_event().await.expect("No even found"); let collection_simple = view .rooms_list @@ -1425,6 +1433,7 @@ mod tests { .iter() .map(Into::::into) .collect::>(); + assert_eq!( collection_simple, repeat(RoomListEntryEasy::Invalid)