feat(ui): Batch the streams returned by RoomList::entries and RoomList::entries_filtered.

feat(ui): Batch the streams returned by `RoomList::entries` and `RoomList::entries_filtered`.
This commit is contained in:
Ivan Enderlin
2023-07-26 15:18:43 +02:00
committed by GitHub
7 changed files with 82 additions and 43 deletions

11
Cargo.lock generated
View File

@@ -346,6 +346,16 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "async-rx"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b7ceb0aba3bb819c2870ff3a8548d924872f52e1f89d3c79d51303d9437577"
dependencies = [
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-std"
version = "1.12.0"
@@ -2890,6 +2900,7 @@ dependencies = [
"assert-json-diff",
"assert_matches",
"async-once-cell",
"async-rx",
"async-std",
"async-stream",
"async-trait",

View File

@@ -19,6 +19,7 @@ rust-version = "1.70"
[workspace.dependencies]
anyhow = "1.0.68"
assert_matches = "1.5.0"
async-rx = "0.1.1"
async-stream = "0.3.3"
async-trait = "0.1.60"
base64 = "0.21.0"

View File

@@ -164,7 +164,7 @@ impl RoomList {
pin_mut!(entries_stream);
while let Some(diff) = entries_stream.next().await {
listener.on_update(diff.into());
listener.on_update(diff.into_iter().map(Into::into).collect());
}
}))),
})
@@ -278,7 +278,7 @@ impl From<VectorDiff<matrix_sdk::RoomListEntry>> for RoomListEntriesUpdate {
#[uniffi::export(callback_interface)]
pub trait RoomListEntriesListener: Send + Sync + Debug {
fn on_update(&self, room_entries_update: RoomListEntriesUpdate);
fn on_update(&self, room_entries_update: Vec<RoomListEntriesUpdate>);
}
#[derive(uniffi::Object)]

View File

@@ -15,6 +15,7 @@ testing = []
[dependencies]
async-once-cell = "0.5.2"
async-rx = { workspace = true }
async-std = { version = "1.12.0", features = ["unstable"] }
async-stream = { workspace = true }
async-trait = { workspace = true }

View File

@@ -14,9 +14,10 @@
use std::future::ready;
use async_rx::StreamExt as _;
use eyeball::{SharedObservable, Subscriber};
use eyeball_im::{Vector, VectorDiff};
use futures_util::{pin_mut, Stream, StreamExt};
use futures_util::{pin_mut, Stream, StreamExt as _};
use matrix_sdk::{
executor::{spawn, JoinHandle},
RoomListEntry, SlidingSync, SlidingSyncList,
@@ -29,6 +30,7 @@ use super::{Error, State};
#[derive(Debug)]
pub struct RoomList {
sliding_sync_list: SlidingSyncList,
room_list_service_state: Subscriber<State>,
loading_state: SharedObservable<RoomListLoadingState>,
loading_state_task: JoinHandle<()>,
}
@@ -49,10 +51,12 @@ impl RoomList {
.on_list(sliding_sync_list_name, |list| ready(list.clone()))
.await
.ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
let loading_state = SharedObservable::new(RoomListLoadingState::NotLoaded);
Ok(Self {
sliding_sync_list: sliding_sync_list.clone(),
room_list_service_state: room_list_service_state.clone(),
loading_state: loading_state.clone(),
loading_state_task: spawn(async move {
pin_mut!(room_list_service_state);
@@ -96,8 +100,15 @@ impl RoomList {
/// list entry's updates.
pub fn entries(
&self,
) -> (Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>) {
self.sliding_sync_list.room_list_stream()
) -> (Vector<RoomListEntry>, impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>) {
let (entries, entries_stream) = self.sliding_sync_list.room_list_stream();
(
entries,
// Batch the entries stream. Batch is drained every time the `room_list_service_state`
// is changed.
entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())),
)
}
/// Similar to [`Self::entries`] except that it's possible to provide a
@@ -105,11 +116,18 @@ impl RoomList {
pub fn entries_filtered<F>(
&self,
filter: F,
) -> (Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>)
) -> (Vector<RoomListEntry>, impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>)
where
F: Fn(&RoomListEntry) -> bool + Send + Sync + 'static,
{
self.sliding_sync_list.room_list_filtered_stream(filter)
let (entries, entries_stream) = self.sliding_sync_list.room_list_filtered_stream(filter);
(
entries,
// Batch the entries stream. Batch is drained every time the `room_list_service_state`
// is changed.
entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())),
)
}
}

View File

@@ -111,17 +111,17 @@ macro_rules! entries {
macro_rules! assert_entries_stream {
// `append [$entries]`
( @_ [ $stream:ident ] [ append [ $( $entries:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [ append [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_entries_stream!(
@_
[ $stream ]
[ $entries ]
[ $( $rest )* ]
[
$( $accumulator )*
assert_matches!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::Append { values })) => {
assert_eq!(values, entries!( $( $entries )+ ));
$entries.next(),
Some(&VectorDiff::Append { ref values }) => {
assert_eq!(values, &entries!( $( $entry )+ ));
}
);
]
@@ -129,17 +129,17 @@ macro_rules! assert_entries_stream {
};
// `set [$nth] [$entry]`
( @_ [ $stream:ident ] [ set [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [ set [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_entries_stream!(
@_
[ $stream ]
[ $entries ]
[ $( $rest )* ]
[
$( $accumulator )*
assert_matches!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::Set { index: $index, value })) => {
assert_eq!(value, entries!( $( $entry )+ )[0]);
$entries.next(),
Some(&VectorDiff::Set { index: $index, ref value }) => {
assert_eq!(value, &entries!( $( $entry )+ )[0]);
}
);
]
@@ -147,33 +147,33 @@ macro_rules! assert_entries_stream {
};
// `remove [$nth]`
( @_ [ $stream:ident ] [ remove [ $index:literal ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [ remove [ $index:literal ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_entries_stream!(
@_
[ $stream ]
[ $entries ]
[ $( $rest )* ]
[
$( $accumulator )*
assert_eq!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::Remove { index: $index })),
$entries.next(),
Some(&VectorDiff::Remove { index: $index }),
);
]
)
};
// `insert [$nth] [ $entry ]`
( @_ [ $stream:ident ] [ insert [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [ insert [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_entries_stream!(
@_
[ $stream ]
[ $entries ]
[ $( $rest )* ]
[
$( $accumulator )*
assert_matches!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::Insert { index: $index, value })) => {
assert_eq!(value, entries!( $( $entry )+ )[0]);
$entries.next(),
Some(&VectorDiff::Insert { index: $index, ref value }) => {
assert_eq!(value, &entries!( $( $entry )+ )[0]);
}
);
]
@@ -181,24 +181,35 @@ macro_rules! assert_entries_stream {
};
// `pending`
( @_ [ $stream:ident ] [ pending ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [ pending ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_entries_stream!(
@_
[ $stream ]
[ $entries ]
[ $( $rest )* ]
[
$( $accumulator )*
assert_eq!($stream.next().now_or_never(), None);
assert_eq!($entries.next(), None);
]
)
};
( @_ [ $stream:ident ] [] [ $( $accumulator:tt )* ] ) => {
( @_ [ $entries:ident ] [] [ $( $accumulator:tt )* ] ) => {
$( $accumulator )*
};
( [ $stream:ident ] $( $all:tt )* ) => {
assert_entries_stream!( @_ [ $stream ] [ $( $all )* ] [] )
// Wait on Tokio to run all the tasks. Necessary only when testing.
yield_now().await;
let entries = $stream
.next()
.now_or_never()
.expect("stream entry wasn't in the ready state")
.expect("stream was stopped");
let mut entries = entries.iter();
assert_entries_stream!( @_ [ entries ] [ $( $all )* ] [] )
};
}
@@ -1151,7 +1162,7 @@ async fn test_loading_states() -> Result<(), Error> {
},
};
// Wait on Tokio to run all the tasks. It won't happen in the main app.
// Wait on Tokio to run all the tasks. Necessary only when testing.
yield_now().await;
// There is a loading state update, it's loaded now!
@@ -1183,7 +1194,7 @@ async fn test_loading_states() -> Result<(), Error> {
},
};
// Wait on Tokio to run all the tasks. It won't happen in the main app.
// Wait on Tokio to run all the tasks. Necessary only when testing.
yield_now().await;
// There is a loading state update because the number of rooms has been updated.
@@ -1215,7 +1226,7 @@ async fn test_loading_states() -> Result<(), Error> {
},
};
// Wait on Tokio to run all the tasks. It won't happen in the main app.
// Wait on Tokio to run all the tasks. Necessary only when testing.
yield_now().await;
// No loading state update.
@@ -1593,11 +1604,6 @@ async fn test_invites_stream() -> Result<(), Error> {
assert_eq!(room_id, room_id_0);
});
assert_entries_stream! {
[invites_stream]
pending;
};
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Running => Running,

View File

@@ -5,13 +5,15 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))]
pub enum RoomListEntry {
/// This entry isn't known at this point and thus considered `Empty`.
/// The list knows there is an entry but this entry has not been loaded yet,
/// thus it's marked as empty.
#[default]
Empty,
/// There was `OwnedRoomId` but since the server told us to invalid this
/// entry. it is considered stale.
/// The list has loaded this entry in the past, but the entry is now out of
/// range and may no longer be synced, thus it's marked as invalidated (to
/// use the spec's term).
Invalidated(OwnedRoomId),
/// This entry is followed with `OwnedRoomId`.
/// The list has loaded this entry, and it's up-to-date.
Filled(OwnedRoomId),
}