mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-05 14:35:20 -04:00
test(sliding-sync): ensure unknown pos does recover the room subscription properly
test(sliding-sync): ensure unknown pos does recover the room subscription properly
This commit is contained in:
@@ -1263,4 +1263,191 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn restart_room_resubscription() -> anyhow::Result<()> {
|
||||
let (client, sync_proxy_builder) = random_setup_with_rooms(3).await?;
|
||||
|
||||
let sync_proxy = sync_proxy_builder
|
||||
.add_view(
|
||||
SlidingSyncViewBuilder::default()
|
||||
.sync_mode(SlidingSyncMode::Selective)
|
||||
.set_range(0u32, 2u32)
|
||||
.sort(vec!["by_recency".to_string(), "by_name".to_string()])
|
||||
.name("sliding_view")
|
||||
.build()?,
|
||||
)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let view = sync_proxy.view("sliding_view").context("View `sliding_view` isn't found")?;
|
||||
|
||||
let stream = sync_proxy.stream();
|
||||
pin_mut!(stream);
|
||||
|
||||
let room_summary =
|
||||
stream.next().await.context("No room summary found, loop ended unsuccessfully")??;
|
||||
|
||||
// we only heard about the ones we had asked for
|
||||
assert_eq!(room_summary.rooms.len(), 3);
|
||||
|
||||
let collection_simple = view
|
||||
.rooms_list
|
||||
.lock_ref()
|
||||
.iter()
|
||||
.map(Into::<RoomListEntryEasy>::into)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
collection_simple,
|
||||
repeat(RoomListEntryEasy::Filled).take(3).collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
let _signal = view.rooms_list.signal_vec_cloned();
|
||||
|
||||
// let's move the window
|
||||
|
||||
view.set_range(1, 2);
|
||||
|
||||
for _n in 0..2 {
|
||||
let room_summary = stream.next().await.context("sync has closed unexpectedly")??;
|
||||
|
||||
// we only heard about the ones we had asked for
|
||||
if room_summary.views.iter().any(|s| s == "sliding") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let collection_simple = view
|
||||
.rooms_list
|
||||
.lock_ref()
|
||||
.iter()
|
||||
.map(Into::<RoomListEntryEasy>::into)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
collection_simple,
|
||||
repeat(RoomListEntryEasy::Invalid)
|
||||
.take(1)
|
||||
.chain(repeat(RoomListEntryEasy::Filled).take(2))
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// let's get that first entry
|
||||
|
||||
let Some(RoomListEntry::Invalidated(room_id)) = view
|
||||
.rooms_list
|
||||
.lock_ref()
|
||||
.iter()
|
||||
.next()
|
||||
.map(Clone::clone) else {
|
||||
panic!("2nd room has moved? how?");
|
||||
};
|
||||
|
||||
// send a message
|
||||
|
||||
let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?;
|
||||
|
||||
let content = RoomMessageEventContent::text_plain("Hello world");
|
||||
|
||||
room.send(content, None).await?; // this should put our room up to the most recent
|
||||
|
||||
// let's subscribe
|
||||
|
||||
sync_proxy.subscribe(room_id.clone(), Default::default());
|
||||
|
||||
let mut room_updated = false;
|
||||
|
||||
for _n in 0..2 {
|
||||
let room_summary = stream.next().await.context("sync has closed unexpectedly")??;
|
||||
|
||||
// we only heard about the ones we had asked for
|
||||
if room_summary.rooms.iter().any(|s| s == &room_id) {
|
||||
room_updated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(room_updated, "Room update has not been seen");
|
||||
|
||||
// force the pos to be invalid and thus this being reset internally
|
||||
force_sliding_sync_pos(&sync_proxy, "100".to_owned());
|
||||
|
||||
let mut error_seen = false;
|
||||
let mut room_updated = false;
|
||||
|
||||
for _n in 0..2 {
|
||||
let summary = match stream.next().await {
|
||||
Some(Ok(e)) => e,
|
||||
Some(Err(e)) => {
|
||||
match e.client_api_error_kind() {
|
||||
Some(RumaError::UnknownPos) => {
|
||||
// we expect this to come through.
|
||||
error_seen = true;
|
||||
continue;
|
||||
}
|
||||
_ => Err(e)?,
|
||||
}
|
||||
}
|
||||
None => anyhow::bail!("Stream ended unexpectedly."),
|
||||
};
|
||||
|
||||
// we only heard about the ones we had asked for
|
||||
if summary.rooms.iter().any(|s| s == &room_id) {
|
||||
room_updated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(error_seen, "We have not seen the UnknownPos error");
|
||||
assert!(room_updated, "Room update has not been seen");
|
||||
|
||||
// send another message
|
||||
|
||||
let room = client.get_joined_room(&room_id).context("No joined room {room_id}")?;
|
||||
|
||||
let content = RoomMessageEventContent::text_plain("Hello world");
|
||||
|
||||
let event_id = room.send(content, None).await?.event_id; // this should put our room up to the most recent
|
||||
|
||||
// let's see for it to come down the pipe
|
||||
let mut room_updated = false;
|
||||
|
||||
for _n in 0..2 {
|
||||
let room_summary = stream.next().await.context("sync has closed unexpectedly")??;
|
||||
|
||||
// we only heard about the ones we had asked for
|
||||
if room_summary.rooms.iter().any(|s| s == &room_id) {
|
||||
room_updated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert!(room_updated, "Room update has not been seen");
|
||||
|
||||
let sliding_sync_room = sync_proxy.get_room(&room_id).expect("Slidin Sync room not found");
|
||||
let event = sliding_sync_room.latest_event().await.expect("No even found");
|
||||
|
||||
let collection_simple = view
|
||||
.rooms_list
|
||||
.lock_ref()
|
||||
.iter()
|
||||
.map(Into::<RoomListEntryEasy>::into)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
collection_simple,
|
||||
repeat(RoomListEntryEasy::Invalid)
|
||||
.take(1)
|
||||
.chain(repeat(RoomListEntryEasy::Filled).take(2))
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
event.event_id().unwrap(),
|
||||
event_id,
|
||||
"Latest event is different than what we've sent"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user