feat(sliding-sync): adding views on live sliding-sync

This commit is contained in:
Benjamin Kampmann
2023-01-11 17:24:18 +01:00
parent 2949dcc773
commit a8d49b86df
3 changed files with 297 additions and 8 deletions

View File

@@ -599,13 +599,15 @@ impl SlidingSync {
impl SlidingSync {
#[allow(clippy::significant_drop_in_scrutinee)]
pub fn get_view(&self, name: String) -> Option<Arc<SlidingSyncView>> {
let views = self.inner.views.lock_ref();
for s in views.iter() {
if s.name == name {
return Some(Arc::new(SlidingSyncView { inner: s.clone() }));
}
}
None
self.inner.view(&name).map(|inner| Arc::new(SlidingSyncView { inner }))
}
pub fn add_view(&self, view: Arc<SlidingSyncView>) -> Option<u32> {
self.inner.add_view(view.inner.clone()).map(|u| u as u32)
}
pub fn pop_view(&self, name: String) -> Option<Arc<SlidingSyncView>> {
self.inner.pop_view(&name).map(|inner| Arc::new(SlidingSyncView { inner }))
}
pub fn sync(&self) -> Arc<StoppableSpawn> {

View File

@@ -694,8 +694,43 @@ impl SlidingSync {
}
/// Get access to the SlidingSyncView named `view_name`
///
/// Note: Remember that this list might have been changed since you started
/// listening to the stream and is therefor not necessarily up to date
/// with the views used for the stream.
pub fn view(&self, view_name: &str) -> Option<SlidingSyncView> {
self.views.lock_ref().iter().find(|v| v.name == view_name).cloned().clone()
self.views.lock_ref().iter().find(|v| v.name == view_name).cloned()
}
/// Remove the SlidingSyncView named `view_name` from the views list if
/// found
///
/// Note: Remember that this change will only be applicable for any new
/// stream created after this. The old stream will still continue to use the
/// previous set of views
pub fn pop_view(&self, _view_name: &str) -> Option<SlidingSyncView> {
unimplemented!("Index based sliding sync doesn't have support removing views");
}
/// Add the view to the list of views
///
/// As views need to have a unique `.name`, if a view with the same name
/// is found the new view will replace the old one and the return will give
/// the position it was found at. If none is found, the view will be pushed
/// to the end and `None` is returned.
///
/// Note: Remember that this change will only be applicable for any new
/// stream created after this. The old stream will still continue to use the
/// previous set of views
pub fn add_view(&self, view: SlidingSyncView) -> Option<usize> {
let mut v = self.views.lock_mut();
if let Some(idx) = v.iter().position(|v| v.name == view.name) {
v.set_cloned(idx, view);
Some(idx)
} else {
v.push_cloned(view);
None
}
}
/// Lookup a set of rooms

View File

@@ -74,6 +74,258 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn adding_view_later() -> anyhow::Result<()> {
let view_name_1 = "sliding1";
let view_name_2 = "sliding2";
let view_name_3 = "sliding3";
let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let build_view = |name| {
SlidingSyncViewBuilder::default()
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.sort(vec!["by_recency".to_string(), "by_name".to_string()])
.name(name)
.build()
};
let sync_proxy = sync_proxy_builder
.add_view(build_view(view_name_1)?)
.add_view(build_view(view_name_2)?)
.build()
.await?;
let Some(view1 )= sync_proxy.view(view_name_1) else {
anyhow::bail!("but we just added that view!");
};
let Some(_view2 )= sync_proxy.view(view_name_2) else {
anyhow::bail!("but we just added that view!");
};
assert!(sync_proxy.view(view_name_3).is_none());
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let Some(room_summary ) = stream.next().await else {
anyhow::bail!("No room summary found, loop ended unsuccessfully");
};
let summary = room_summary?;
// we only heard about the ones we had asked for
assert_eq!(summary.views, [view_name_1, view_name_2]);
assert!(sync_proxy.add_view(build_view(view_name_3)?).is_none());
// we need to restart the stream after every view listing update
let stream = sync_proxy.stream().await?;
pin_mut!(stream);
let mut saw_update = false;
for _n in 0..2 {
let Some(room_summary ) = stream.next().await else {
anyhow::bail!("sync has closed unexpectedly");
};
let summary = room_summary?;
// we only heard about the ones we had asked for
if !summary.views.is_empty() {
// only if we saw an update come through
assert_eq!(summary.views, [view_name_3]);
// we didn't update the other views, so only no 2 should se an update
saw_update = true;
break;
}
}
assert!(saw_update, "We didn't see the updae come through the pipe");
// and let's update the order of all views again
let Some(RoomListEntry::Filled(room_id)) = view1
.rooms_list
.lock_ref()
.iter().nth(4).map(Clone::clone) else
{
anyhow::bail!("4th room has moved? how?");
};
let Some(room) = client.get_joined_room(&room_id) else {
anyhow::bail!("No joined room {room_id}");
};
let content = RoomMessageEventContent::text_plain("Hello world");
room.send(content, None).await?; // this should put our room up to the most recent
let mut saw_update = false;
for _n in 0..2 {
let Some(room_summary ) = stream.next().await else {
anyhow::bail!("sync has closed unexpectedly");
};
let summary = room_summary?;
// we only heard about the ones we had asked for
if !summary.views.is_empty() {
// only if we saw an update come through
assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3,]);
// notice that our view 2 is now the last view, but all have seen updates
saw_update = true;
break;
}
}
assert!(saw_update, "We didn't see the updae come through the pipe");
Ok(())
}
// index-based views don't support removing views. Leaving this test for an API
// update later.
//
// #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
// async fn live_views() -> anyhow::Result<()> {
// let view_name_1 = "sliding1";
// let view_name_2 = "sliding2";
// let view_name_3 = "sliding3";
// let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
// let build_view = |name| {
// SlidingSyncViewBuilder::default()
// .sync_mode(SlidingSyncMode::Selective)
// .set_range(0u32, 10u32)
// .sort(vec!["by_recency".to_string(), "by_name".to_string()])
// .name(name)
// .build()
// };
// let sync_proxy = sync_proxy_builder
// .add_view(build_view(view_name_1)?)
// .add_view(build_view(view_name_2)?)
// .add_view(build_view(view_name_3)?)
// .build()
// .await?;
// let Some(view1 )= sync_proxy.view(view_name_1) else {
// anyhow::bail!("but we just added that view!");
// };
// let Some(_view2 )= sync_proxy.view(view_name_2) else {
// anyhow::bail!("but we just added that view!");
// };
// let Some(_view3 )= sync_proxy.view(view_name_3) else {
// anyhow::bail!("but we just added that view!");
// };
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// let Some(room_summary ) = stream.next().await else {
// anyhow::bail!("No room summary found, loop ended unsuccessfully");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// assert_eq!(summary.views, [view_name_1, view_name_2, view_name_3]);
// let Some(view_2) = sync_proxy.pop_view(view_name_2) else {
// anyhow::bail!("Room exists");
// };
// // we need to restart the stream after every view listing update
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// // Let's trigger an update by sending a message to room pos=3, making it
// move to // pos 0
// let Some(RoomListEntry::Filled(room_id)) = view1
// .rooms_list
// .lock_ref()
// .iter().nth(3).map(Clone::clone) else
// {
// anyhow::bail!("2nd room has moved? how?");
// };
// let Some(room) = client.get_joined_room(&room_id) else {
// anyhow::bail!("No joined room {room_id}");
// };
// let content = RoomMessageEventContent::text_plain("Hello world");
// room.send(content, None).await?; // this should put our room up to the
// most recent
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// anyhow::bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_1, view_name_3]);
// saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the updae come through the pipe");
// assert!(sync_proxy.add_view(view_2).is_none());
// // we need to restart the stream after every view listing update
// let stream = sync_proxy.stream().await?;
// pin_mut!(stream);
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// anyhow::bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_2]);
// // we didn't update the other views, so only no 2 should se an
// update saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the updae come through the pipe");
// // and let's update the order of all views again
// let Some(RoomListEntry::Filled(room_id)) = view1
// .rooms_list
// .lock_ref()
// .iter().nth(4).map(Clone::clone) else
// {
// anyhow::bail!("4th room has moved? how?");
// };
// let Some(room) = client.get_joined_room(&room_id) else {
// anyhow::bail!("No joined room {room_id}");
// };
// let content = RoomMessageEventContent::text_plain("Hello world");
// room.send(content, None).await?; // this should put our room up to the
// most recent
// let mut saw_update = false;
// for _n in 0..2 {
// let Some(room_summary ) = stream.next().await else {
// anyhow::bail!("sync has closed unexpectedly");
// };
// let summary = room_summary?;
// // we only heard about the ones we had asked for
// if !summary.views.is_empty() {
// // only if we saw an update come through
// assert_eq!(summary.views, [view_name_1, view_name_3,
// view_name_2]); // notice that our view 2 is now the last
// view, but all have seen updates saw_update = true;
// break;
// }
// }
// assert!(saw_update, "We didn't see the updae come through the pipe");
// Ok(())
// }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn resizing_sliding_window() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = random_setup_with_rooms(20).await?;