From ac950ac253273eece7eef77d7c52f633abff1854 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 24 Jul 2023 15:12:30 +0200 Subject: [PATCH 1/8] doc(sdk): Update documentation of `RoomListEntry` variants. --- .../src/sliding_sync/list/room_list_entry.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs b/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs index 1600eff41..7e946a1a9 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/room_list_entry.rs @@ -5,13 +5,15 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Default, Serialize, Deserialize)] #[cfg_attr(any(test, feature = "testing"), derive(PartialEq))] pub enum RoomListEntry { - /// This entry isn't known at this point and thus considered `Empty`. + /// The list knows there is an entry but this entry has not been loaded yet, + /// thus it's marked as empty. #[default] Empty, - /// There was `OwnedRoomId` but since the server told us to invalid this - /// entry. it is considered stale. + /// The list has loaded this entry in the past, but the entry is now out of + /// range and may no longer be synced, thus it's marked as invalidated (to + /// use the spec's term). Invalidated(OwnedRoomId), - /// This entry is followed with `OwnedRoomId`. + /// The list has loaded this entry, and it's up-to-date. Filled(OwnedRoomId), } From 1678d0754d76aedf02af6c65d401f45b94aa9e9a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 24 Jul 2023 16:29:48 +0200 Subject: [PATCH 2/8] feat(ui): Batch the stream returned by `RoomList::entries` and `RoomList::entries_filtered`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch uses a newly implemented `async-rx` crate, that provides `StreamExt`. This trait provides new features on `Stream`, like `StreamExt::batch_with` which allows to batch values generated by a `Stream` into a `Vec`. The batch is drained based on another `Stream`: every time a value is produced, it drains the batch stream. This feature is used in `RoomList::entries` and `RoomList::entries_filtered` to batch `Stream>` into `Stream>>`. The “drainer” is a broadcast sender, which sends an (empty) value every time the room list service state changes, so every time something happens during a sync. Note that it even drains when the room list service state jumps to `Error` or `Terminated`. --- Cargo.lock | 11 ++++ Cargo.toml | 2 + crates/matrix-sdk-ui/Cargo.toml | 2 + .../src/room_list_service/room_list.rs | 64 +++++++++++++++++-- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a49f872ba..030c0c048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,6 +346,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-rx" +version = "0.1.0" +source = "git+https://codeberg.org/jplatte/async-rx?rev=4ba058835e947a316d859f3b46e0b86c41754df7#4ba058835e947a316d859f3b46e0b86c41754df7" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-std" version = "1.12.0" @@ -2890,6 +2899,7 @@ dependencies = [ "assert-json-diff", "assert_matches", "async-once-cell", + "async-rx", "async-std", "async-stream", "async-trait", @@ -2916,6 +2926,7 @@ dependencies = [ "stream_assert", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "wiremock", diff --git a/Cargo.toml b/Cargo.toml index ce7e24cc7..d455f8b6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ rust-version = "1.70" [workspace.dependencies] anyhow = "1.0.68" assert_matches = "1.5.0" +async-rx = { git = "https://codeberg.org/jplatte/async-rx", rev = "4ba058835e947a316d859f3b46e0b86c41754df7" } async-stream = "0.3.3" async-trait = "0.1.60" base64 = "0.21.0" @@ -41,6 +42,7 @@ serde_html_form = "0.2.0" serde_json = "1.0.91" thiserror = "1.0.38" tokio = { version = "1.24", default-features = false, features = ["sync"] } +tokio-stream = "0.1.14" tracing = { version = "0.1.36", default-features = false, features = ["std"] } tracing-core = "0.1.30" uniffi = { git = "https://github.com/mozilla/uniffi-rs", rev = "8565b7f941e7967778efd39c5ab27551dfa23ec6" } diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index 56a430832..9e66091e5 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -21,6 +21,7 @@ testing = ["dep:eyeball-im-util"] [dependencies] async-once-cell = "0.5.2" +async-rx = { workspace = true } async-std = { version = "1.12.0", features = ["unstable"] } async-stream = { workspace = true, optional = true } async-trait = { workspace = true } @@ -44,6 +45,7 @@ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tracing = { workspace = true, features = ["attributes"] } [dev-dependencies] diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index ef5d92e3b..27b9f7fd6 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -14,13 +14,16 @@ use std::future::ready; +use async_rx::StreamExt as _; use eyeball::{SharedObservable, Subscriber}; use eyeball_im::{Vector, VectorDiff}; -use futures_util::{pin_mut, Stream, StreamExt}; +use futures_util::{pin_mut, Stream, StreamExt as _}; use matrix_sdk::{ executor::{spawn, JoinHandle}, RoomListEntry, SlidingSync, SlidingSyncList, }; +use tokio::sync::broadcast::{channel, Sender}; +use tokio_stream::wrappers::BroadcastStream; use super::{Error, State}; @@ -29,12 +32,15 @@ use super::{Error, State}; #[derive(Debug)] pub struct RoomList { sliding_sync_list: SlidingSyncList, + entries_batch_drainer: Sender<()>, + entries_batch_task: JoinHandle<()>, loading_state: SharedObservable, loading_state_task: JoinHandle<()>, } impl Drop for RoomList { fn drop(&mut self) { + self.entries_batch_task.abort(); self.loading_state_task.abort(); } } @@ -49,10 +55,40 @@ impl RoomList { .on_list(sliding_sync_list_name, |list| ready(list.clone())) .await .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?; + + // The channel capacity should theoretically be 2, as only one entries stream + // and one entries filtered stream should exist at a time, but we never + // know what the user may do. Let's assume we allow up to 128, as it + // seems a safe value. + // + // Why using a broadcast? Because if there is no receiver (if the entries stream + // is not used), we don't want the sender to fail and disconnect. And we want + // new receiver to have a fresh state: in a broadcast, newly created receiver + // doesn't receive queued messages. + let (entries_batch_drainer, _) = channel(128); + let loading_state = SharedObservable::new(RoomListLoadingState::NotLoaded); Ok(Self { sliding_sync_list: sliding_sync_list.clone(), + + entries_batch_drainer: entries_batch_drainer.clone(), + entries_batch_task: { + let room_list_service_state = room_list_service_state.clone(); + + spawn(async move { + pin_mut!(room_list_service_state); + + // As soon as the state changed, drain the entries stream. + while let Some(_) = room_list_service_state.next().await { + // Since a broadcast channel is used, if there is no receiver, it won't + // break. We are not interested by the result of the drainer send, we just + // drain. + let _ = entries_batch_drainer.send(()); + } + }) + }, + loading_state: loading_state.clone(), loading_state_task: spawn(async move { pin_mut!(room_list_service_state); @@ -96,8 +132,17 @@ impl RoomList { /// list entry's updates. pub fn entries( &self, - ) -> (Vector, impl Stream>) { - self.sliding_sync_list.room_list_stream() + ) -> (Vector, impl Stream>>) { + let (entries, entries_stream) = self.sliding_sync_list.room_list_stream(); + + ( + entries, + // Batch the entries stream. Batch is drained every time the `room_list_service_state` + // is changed. + entries_stream.batch_with( + BroadcastStream::new(self.entries_batch_drainer.subscribe()).map(|_| ()), + ), + ) } /// Similar to [`Self::entries`] except that it's possible to provide a @@ -105,11 +150,20 @@ impl RoomList { pub fn entries_filtered( &self, filter: F, - ) -> (Vector, impl Stream>) + ) -> (Vector, impl Stream>>) where F: Fn(&RoomListEntry) -> bool + Send + Sync + 'static, { - self.sliding_sync_list.room_list_filtered_stream(filter) + let (entries, entries_stream) = self.sliding_sync_list.room_list_filtered_stream(filter); + + ( + entries, + // Batch the entries stream. Batch is drained every time the `room_list_service_state` + // is changed. + entries_stream.batch_with( + BroadcastStream::new(self.entries_batch_drainer.subscribe()).map(|_| ()), + ), + ) } } From 3bcd9680fd3539ebfc386b55b7e092da5a413ee9 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 24 Jul 2023 16:34:06 +0200 Subject: [PATCH 3/8] test(ui): Test the new batch stream on entries and filtered entries. Only updated the macro is required here. Instead of calling `StreamExt::now_or_never` on the `$stream`, we call `Iterator::next` on `$entries` which is a `Vec>` now. --- .../tests/integration/room_list_service.rs | 65 ++++++++++--------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index bf94c04fb..8478f62d5 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -111,17 +111,17 @@ macro_rules! entries { macro_rules! assert_entries_stream { // `append [$entries]` - ( @_ [ $stream:ident ] [ append [ $( $entries:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [ append [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { assert_entries_stream!( @_ - [ $stream ] + [ $entries ] [ $( $rest )* ] [ $( $accumulator )* assert_matches!( - $stream.next().now_or_never(), - Some(Some(VectorDiff::Append { values })) => { - assert_eq!(values, entries!( $( $entries )+ )); + $entries.next(), + Some(&VectorDiff::Append { ref values }) => { + assert_eq!(values, &entries!( $( $entry )+ )); } ); ] @@ -129,17 +129,17 @@ macro_rules! assert_entries_stream { }; // `set [$nth] [$entry]` - ( @_ [ $stream:ident ] [ set [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [ set [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { assert_entries_stream!( @_ - [ $stream ] + [ $entries ] [ $( $rest )* ] [ $( $accumulator )* assert_matches!( - $stream.next().now_or_never(), - Some(Some(VectorDiff::Set { index: $index, value })) => { - assert_eq!(value, entries!( $( $entry )+ )[0]); + $entries.next(), + Some(&VectorDiff::Set { index: $index, ref value }) => { + assert_eq!(value, &entries!( $( $entry )+ )[0]); } ); ] @@ -147,33 +147,33 @@ macro_rules! assert_entries_stream { }; // `remove [$nth]` - ( @_ [ $stream:ident ] [ remove [ $index:literal ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [ remove [ $index:literal ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { assert_entries_stream!( @_ - [ $stream ] + [ $entries ] [ $( $rest )* ] [ $( $accumulator )* assert_eq!( - $stream.next().now_or_never(), - Some(Some(VectorDiff::Remove { index: $index })), + $entries.next(), + Some(&VectorDiff::Remove { index: $index }), ); ] ) }; // `insert [$nth] [ $entry ]` - ( @_ [ $stream:ident ] [ insert [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [ insert [ $index:literal ] [ $( $entry:tt )+ ] ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { assert_entries_stream!( @_ - [ $stream ] + [ $entries ] [ $( $rest )* ] [ $( $accumulator )* assert_matches!( - $stream.next().now_or_never(), - Some(Some(VectorDiff::Insert { index: $index, value })) => { - assert_eq!(value, entries!( $( $entry )+ )[0]); + $entries.next(), + Some(&VectorDiff::Insert { index: $index, ref value }) => { + assert_eq!(value, &entries!( $( $entry )+ )[0]); } ); ] @@ -181,24 +181,36 @@ macro_rules! assert_entries_stream { }; // `pending` - ( @_ [ $stream:ident ] [ pending ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [ pending ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => { assert_entries_stream!( @_ - [ $stream ] + [ $entries ] [ $( $rest )* ] [ $( $accumulator )* - assert_eq!($stream.next().now_or_never(), None); + assert_eq!($entries.next(), None); + // assert_eq!($stream.next().now_or_never(), None); ] ) }; - ( @_ [ $stream:ident ] [] [ $( $accumulator:tt )* ] ) => { + ( @_ [ $entries:ident ] [] [ $( $accumulator:tt )* ] ) => { $( $accumulator )* }; ( [ $stream:ident ] $( $all:tt )* ) => { - assert_entries_stream!( @_ [ $stream ] [ $( $all )* ] [] ) + // Wait on Tokio to run all the tasks. It won't happen in the main app. + yield_now().await; + + let entries = $stream + .next() + .now_or_never() + .expect("failed to read from the stream") + .expect("failed to read from the stream (bis)"); + + let mut entries = entries.iter(); + + assert_entries_stream!( @_ [ entries ] [ $( $all )* ] [] ) }; } @@ -1593,11 +1605,6 @@ async fn test_invites_stream() -> Result<(), Error> { assert_eq!(room_id, room_id_0); }); - assert_entries_stream! { - [invites_stream] - pending; - }; - sync_then_assert_request_and_fake_response! { [server, room_list, sync] states = Running => Running, From d39cbd865b8796e922c576b18c718996dadfc7ac Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 24 Jul 2023 16:39:13 +0200 Subject: [PATCH 4/8] feat(ffi): `RoomListEntriesListener::on_update` takes a `Vec<_>` now. Since `RoomList::entries` returns a batch stream, the listener now receives a `Vec>` instead of a `VectorDiff`. --- bindings/matrix-sdk-ffi/src/room_list.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room_list.rs b/bindings/matrix-sdk-ffi/src/room_list.rs index a063d95ef..89c9f3c97 100644 --- a/bindings/matrix-sdk-ffi/src/room_list.rs +++ b/bindings/matrix-sdk-ffi/src/room_list.rs @@ -149,7 +149,7 @@ impl RoomList { pin_mut!(entries_stream); while let Some(diff) = entries_stream.next().await { - listener.on_update(diff.into()); + listener.on_update(diff.into_iter().map(Into::into).collect()); } }))), }) @@ -263,7 +263,7 @@ impl From> for RoomListEntriesUpdate { #[uniffi::export(callback_interface)] pub trait RoomListEntriesListener: Send + Sync + Debug { - fn on_update(&self, room_entries_update: RoomListEntriesUpdate); + fn on_update(&self, room_entries_update: Vec); } #[derive(uniffi::Object)] From d60a65f82b458af51fd155bb8b1982f60fadf4f7 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 24 Jul 2023 16:59:46 +0200 Subject: [PATCH 5/8] chore: Make Clippy happy. --- crates/matrix-sdk-ui/src/room_list_service/room_list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index 27b9f7fd6..fbe9f1469 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -80,7 +80,7 @@ impl RoomList { pin_mut!(room_list_service_state); // As soon as the state changed, drain the entries stream. - while let Some(_) = room_list_service_state.next().await { + while room_list_service_state.next().await.is_some() { // Since a broadcast channel is used, if there is no receiver, it won't // break. We are not interested by the result of the drainer send, we just // drain. From 2302a7b377ee2266c475afe9d2dc1a53b1ac7e83 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 26 Jul 2023 14:07:46 +0200 Subject: [PATCH 6/8] feat(ui): Use `Subscriber` to simply drain the batch subscriber for entries. This patch brings a nice code simplification. Instead of creating a new `Stream` with `tokio` based on `Subscriber`` to drain the batch subscriber for `RoomList::entries` and `::filtered_entries`, we can _simply_ use `Subscriber` directly! It removes one dependency: `tokio- stream`, and remove possible issues with the broadcast channel `tokio::sync::broadcast`. The code is much simpler and straighforward. --- Cargo.lock | 1 - Cargo.toml | 1 - crates/matrix-sdk-ui/Cargo.toml | 1 - .../src/room_list_service/room_list.rs | 44 ++----------------- 4 files changed, 4 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 030c0c048..eb8f4396f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2926,7 +2926,6 @@ dependencies = [ "stream_assert", "thiserror", "tokio", - "tokio-stream", "tracing", "tracing-subscriber", "wiremock", diff --git a/Cargo.toml b/Cargo.toml index d455f8b6f..b7e745372 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,6 @@ serde_html_form = "0.2.0" serde_json = "1.0.91" thiserror = "1.0.38" tokio = { version = "1.24", default-features = false, features = ["sync"] } -tokio-stream = "0.1.14" tracing = { version = "0.1.36", default-features = false, features = ["std"] } tracing-core = "0.1.30" uniffi = { git = "https://github.com/mozilla/uniffi-rs", rev = "8565b7f941e7967778efd39c5ab27551dfa23ec6" } diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index 9e66091e5..f6b400ad5 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -45,7 +45,6 @@ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } tracing = { workspace = true, features = ["attributes"] } [dev-dependencies] diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index fbe9f1469..98b48d90e 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -22,8 +22,6 @@ use matrix_sdk::{ executor::{spawn, JoinHandle}, RoomListEntry, SlidingSync, SlidingSyncList, }; -use tokio::sync::broadcast::{channel, Sender}; -use tokio_stream::wrappers::BroadcastStream; use super::{Error, State}; @@ -32,15 +30,13 @@ use super::{Error, State}; #[derive(Debug)] pub struct RoomList { sliding_sync_list: SlidingSyncList, - entries_batch_drainer: Sender<()>, - entries_batch_task: JoinHandle<()>, + room_list_service_state: Subscriber, loading_state: SharedObservable, loading_state_task: JoinHandle<()>, } impl Drop for RoomList { fn drop(&mut self) { - self.entries_batch_task.abort(); self.loading_state_task.abort(); } } @@ -56,39 +52,11 @@ impl RoomList { .await .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?; - // The channel capacity should theoretically be 2, as only one entries stream - // and one entries filtered stream should exist at a time, but we never - // know what the user may do. Let's assume we allow up to 128, as it - // seems a safe value. - // - // Why using a broadcast? Because if there is no receiver (if the entries stream - // is not used), we don't want the sender to fail and disconnect. And we want - // new receiver to have a fresh state: in a broadcast, newly created receiver - // doesn't receive queued messages. - let (entries_batch_drainer, _) = channel(128); - let loading_state = SharedObservable::new(RoomListLoadingState::NotLoaded); Ok(Self { sliding_sync_list: sliding_sync_list.clone(), - - entries_batch_drainer: entries_batch_drainer.clone(), - entries_batch_task: { - let room_list_service_state = room_list_service_state.clone(); - - spawn(async move { - pin_mut!(room_list_service_state); - - // As soon as the state changed, drain the entries stream. - while room_list_service_state.next().await.is_some() { - // Since a broadcast channel is used, if there is no receiver, it won't - // break. We are not interested by the result of the drainer send, we just - // drain. - let _ = entries_batch_drainer.send(()); - } - }) - }, - + room_list_service_state: room_list_service_state.clone(), loading_state: loading_state.clone(), loading_state_task: spawn(async move { pin_mut!(room_list_service_state); @@ -139,9 +107,7 @@ impl RoomList { entries, // Batch the entries stream. Batch is drained every time the `room_list_service_state` // is changed. - entries_stream.batch_with( - BroadcastStream::new(self.entries_batch_drainer.subscribe()).map(|_| ()), - ), + entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())), ) } @@ -160,9 +126,7 @@ impl RoomList { entries, // Batch the entries stream. Batch is drained every time the `room_list_service_state` // is changed. - entries_stream.batch_with( - BroadcastStream::new(self.entries_batch_drainer.subscribe()).map(|_| ()), - ), + entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())), ) } } From 8a21a8a6da32b22437dc5ee92ccb5ae9e20cdd71 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 26 Jul 2023 14:19:17 +0200 Subject: [PATCH 7/8] chore(ui): Address some feedbacks. --- .../tests/integration/room_list_service.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 8478f62d5..737ffde99 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -189,7 +189,6 @@ macro_rules! assert_entries_stream { [ $( $accumulator )* assert_eq!($entries.next(), None); - // assert_eq!($stream.next().now_or_never(), None); ] ) }; @@ -199,14 +198,14 @@ macro_rules! assert_entries_stream { }; ( [ $stream:ident ] $( $all:tt )* ) => { - // Wait on Tokio to run all the tasks. It won't happen in the main app. + // Wait on Tokio to run all the tasks. Necessary only when testing. yield_now().await; let entries = $stream .next() .now_or_never() - .expect("failed to read from the stream") - .expect("failed to read from the stream (bis)"); + .expect("stream entry wasn't in the ready state") + .expect("stream was stopped"); let mut entries = entries.iter(); @@ -1163,7 +1162,7 @@ async fn test_loading_states() -> Result<(), Error> { }, }; - // Wait on Tokio to run all the tasks. It won't happen in the main app. + // Wait on Tokio to run all the tasks. Necessary only when testing. yield_now().await; // There is a loading state update, it's loaded now! @@ -1195,7 +1194,7 @@ async fn test_loading_states() -> Result<(), Error> { }, }; - // Wait on Tokio to run all the tasks. It won't happen in the main app. + // Wait on Tokio to run all the tasks. Necessary only when testing. yield_now().await; // There is a loading state update because the number of rooms has been updated. @@ -1227,7 +1226,7 @@ async fn test_loading_states() -> Result<(), Error> { }, }; - // Wait on Tokio to run all the tasks. It won't happen in the main app. + // Wait on Tokio to run all the tasks. Necessary only when testing. yield_now().await; // No loading state update. From efd1d1e9d29b9447abeaf21313b5f3efa8b0c881 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 26 Jul 2023 14:25:50 +0200 Subject: [PATCH 8/8] chore(cargo): Use latest version of `async-rx`. --- Cargo.lock | 5 +++-- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb8f4396f..7eaf1bef7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,8 +348,9 @@ dependencies = [ [[package]] name = "async-rx" -version = "0.1.0" -source = "git+https://codeberg.org/jplatte/async-rx?rev=4ba058835e947a316d859f3b46e0b86c41754df7#4ba058835e947a316d859f3b46e0b86c41754df7" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b7ceb0aba3bb819c2870ff3a8548d924872f52e1f89d3c79d51303d9437577" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index b7e745372..909a2456c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ rust-version = "1.70" [workspace.dependencies] anyhow = "1.0.68" assert_matches = "1.5.0" -async-rx = { git = "https://codeberg.org/jplatte/async-rx", rev = "4ba058835e947a316d859f3b46e0b86c41754df7" } +async-rx = "0.1.1" async-stream = "0.3.3" async-trait = "0.1.60" base64 = "0.21.0"