diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 49f517a74..310f2f56d 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -1263,4 +1263,191 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn restart_room_resubscription() -> anyhow::Result<()> { + let (client, sync_proxy_builder) = random_setup_with_rooms(3).await?; + + let sync_proxy = sync_proxy_builder + .add_view( + SlidingSyncViewBuilder::default() + .sync_mode(SlidingSyncMode::Selective) + .set_range(0u32, 2u32) + .sort(vec!["by_recency".to_string(), "by_name".to_string()]) + .name("sliding_view") + .build()?, + ) + .build() + .await?; + + let view = sync_proxy.view("sliding_view").context("View `sliding_view` isn't found")?; + + let stream = sync_proxy.stream(); + pin_mut!(stream); + + let room_summary = + stream.next().await.context("No room summary found, loop ended unsuccessfully")??; + + // we only heard about the ones we had asked for + assert_eq!(room_summary.rooms.len(), 3); + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Filled).take(3).collect::>() + ); + + let _signal = view.rooms_list.signal_vec_cloned(); + + // let's move the window + + view.set_range(1, 2); + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + + // we only heard about the ones we had asked for + if room_summary.views.iter().any(|s| s == "sliding") { + break; + } + } + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Invalid) + .take(1) + .chain(repeat(RoomListEntryEasy::Filled).take(2)) + .collect::>() + ); + + // let's get that first entry + + let Some(RoomListEntry::Invalidated(room_id)) = view + .rooms_list + .lock_ref() + .iter() + .next() + .map(Clone::clone) else { + panic!("2nd room has moved? how?"); + }; + + // send a message + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + room.send(content, None).await?; // this should put our room up to the most recent + + // let's subscribe + + sync_proxy.subscribe(room_id.clone(), Default::default()); + + let mut room_updated = false; + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + + // we only heard about the ones we had asked for + if room_summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + + assert!(room_updated, "Room update has not been seen"); + + // force the pos to be invalid and thus this being reset internally + force_sliding_sync_pos(&sync_proxy, "100".to_owned()); + + let mut error_seen = false; + let mut room_updated = false; + + for _n in 0..2 { + let summary = match stream.next().await { + Some(Ok(e)) => e, + Some(Err(e)) => { + match e.client_api_error_kind() { + Some(RumaError::UnknownPos) => { + // we expect this to come through. + error_seen = true; + continue; + } + _ => Err(e)?, + } + } + None => anyhow::bail!("Stream ended unexpectedly."), + }; + + // we only heard about the ones we had asked for + if summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + + assert!(error_seen, "We have not seen the UnknownPos error"); + assert!(room_updated, "Room update has not been seen"); + + // send another message + + let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?; + + let content = RoomMessageEventContent::text_plain("Hello world"); + + let event_id = room.send(content, None).await?.event_id; // this should put our room up to the most recent + + // let's see for it to come down the pipe + let mut room_updated = false; + + for _n in 0..2 { + let room_summary = stream.next().await.context("sync has closed unexpectedly")??; + + // we only heard about the ones we had asked for + if room_summary.rooms.iter().any(|s| s == &room_id) { + room_updated = true; + break; + } + } + assert!(room_updated, "Room update has not been seen"); + + let sliding_sync_room = sync_proxy.get_room(&room_id).expect("Slidin Sync room not found"); + let event = sliding_sync_room.latest_event().await.expect("No even found"); + + let collection_simple = view + .rooms_list + .lock_ref() + .iter() + .map(Into::::into) + .collect::>(); + + assert_eq!( + collection_simple, + repeat(RoomListEntryEasy::Invalid) + .take(1) + .chain(repeat(RoomListEntryEasy::Filled).take(2)) + .collect::>() + ); + + assert_eq!( + event.event_id().unwrap(), + event_id, + "Latest event is different than what we've sent" + ); + + Ok(()) + } }