From b8580b76f7b121a7e45aa15bfcf0d8ef73cda24b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 08:20:29 +0200 Subject: [PATCH 1/9] feat(sdk): Rename `SlidingSync::stream` to `::sync`. Because it doesn't start a stream, but a sync-loop. --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 2 +- .../integration/timeline/sliding_sync.rs | 4 +-- crates/matrix-sdk/src/sliding_sync/README.md | 8 ++--- crates/matrix-sdk/src/sliding_sync/mod.rs | 30 +++++++++---------- .../sliding-sync-integration-test/src/lib.rs | 2 +- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 8d17affe9..6b7c2f02d 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -777,7 +777,7 @@ impl SlidingSync { let observer = self.observer.clone(); Arc::new(TaskHandle::new(RUNTIME.spawn(async move { - let stream = inner.stream(); + let stream = inner.sync(); pin_mut!(stream); loop { diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs index 33ac20786..245418e9e 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs @@ -223,7 +223,7 @@ async fn test_timeline_basic() -> Result<()> { .set_range(0..=10)]) .await?; - let stream = sliding_sync.stream(); + let stream = sliding_sync.sync(); pin_mut!(stream); let room_id = room_id!("!foo:bar.org"); @@ -270,7 +270,7 @@ async fn test_timeline_duplicated_events() -> Result<()> { .set_range(0..=10)]) .await?; - let stream = sliding_sync.stream(); + let stream = sliding_sync.sync(); pin_mut!(stream); let room_id = room_id!("!foo:bar.org"); diff --git a/crates/matrix-sdk/src/sliding_sync/README.md b/crates/matrix-sdk/src/sliding_sync/README.md index 534a5d614..c2b53715c 100644 --- a/crates/matrix-sdk/src/sliding_sync/README.md +++ b/crates/matrix-sdk/src/sliding_sync/README.md @@ -240,7 +240,7 @@ does after. This is modelled as a [async `Stream`][`futures_core::stream::Stream`] in our API, that one basically wants to continue polling. Once one has made its setup ready and build its sliding sync sessions, one wants to acquire its -[`.stream()`](`SlidingSync::stream`) and continuously poll it. +[`.sync()`](`SlidingSync::sync`) and continuously poll it. While the async stream API allows for streams to end (by returning `None`) Sliding Sync streams items `Result`. For every @@ -276,7 +276,7 @@ let sliding_sync = client .build() .await?; -let stream = sliding_sync.stream(); +let stream = sliding_sync.sync(); // continuously poll for updates pin_mut!(stream); @@ -321,7 +321,7 @@ the [`SlidingSync`][] will only process new data and skip the processing even across restarts. To support this, in practice, one can spawn a `Future` that runs -[`SlidingSync::stream`]. The spawned `Future` can be cancelled safely. If +[`SlidingSync::sync`]. The spawned `Future` can be cancelled safely. If the client was waiting on a response, it's cancelled without any issue. If a response was just received, it will be fully handled by `SlidingSync`. This _response is always @@ -470,7 +470,7 @@ tokio::spawn(async move { } }); -let stream = sliding_sync.stream(); +let stream = sliding_sync.sync(); // continuously poll for updates pin_mut!(stream); diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index ea26914eb..36f9c75ba 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -571,21 +571,21 @@ impl SlidingSync { spawn(future.instrument(Span::current())).await.unwrap() } - /// Create a _new_ Sliding Sync stream. + /// Create a _new_ Sliding Sync sync-loop. /// - /// This stream will send requests and will handle responses automatically, - /// hence updating the lists. + /// This method returns a `Stream`, which will send requests and will handle + /// responses automatically. Lists and rooms are updated automatically. #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro #[instrument(name = "sync_stream", skip_all)] - pub fn stream(&self) -> impl Stream> + '_ { - debug!(?self.inner.extensions, "About to run the sync stream"); + pub fn sync(&self) -> impl Stream> + '_ { + debug!(?self.inner.extensions, "About to run the sync-loop"); - let sync_stream_span = Span::current(); + let sync_span = Span::current(); stream! { loop { - sync_stream_span.in_scope(|| { - debug!(?self.inner.extensions, "Sync stream loop is running"); + sync_span.in_scope(|| { + debug!(?self.inner.extensions, "Sync-loop is running"); }); let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await; @@ -607,7 +607,7 @@ impl SlidingSync { } } - update_summary = self.sync_once().instrument(sync_stream_span.clone()) => { + update_summary = self.sync_once().instrument(sync_span.clone()) => { match update_summary { Ok(Some(updates)) => { self.inner.reset_counter.store(0, Ordering::SeqCst); @@ -627,7 +627,7 @@ impl SlidingSync { if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { - sync_stream_span.in_scope(|| { + sync_span.in_scope(|| { error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"); }); @@ -638,7 +638,7 @@ impl SlidingSync { } // Let's reset the Sliding Sync session. - sync_stream_span.in_scope(|| { + sync_span.in_scope(|| { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. @@ -661,7 +661,7 @@ impl SlidingSync { } } - debug!("Sync stream loop has exited."); + debug!("Sync-loop has exited."); } } @@ -747,7 +747,7 @@ impl From<&SlidingSync> for FrozenSlidingSync { } /// A summary of the updates received after a sync (like in -/// [`SlidingSync::stream`]). +/// [`SlidingSync::sync`]). #[derive(Debug, Clone)] pub struct UpdateSummary { /// The names of the lists that have seen an update. @@ -833,7 +833,7 @@ mod test { .set_range(0..=10)]) .await?; - let _stream = sliding_sync.stream(); + let _stream = sliding_sync.sync(); pin_mut!(_stream); let room0 = room_id!("!r0:bar.org").to_owned(); @@ -881,7 +881,7 @@ mod test { .set_range(0..=10)]) .await?; - let _stream = sliding_sync.stream(); + let _stream = sliding_sync.sync(); pin_mut!(_stream); sliding_sync diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 0bde59c22..00669f8b5 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -49,7 +49,7 @@ async fn it_works_smoke_test() -> anyhow::Result<()> { ) .build() .await?; - let stream = sync_proxy.stream(); + let stream = sync_proxy.sync(); pin_mut!(stream); let room_summary = stream.next().await.context("No room summary found, loop ended unsuccessfully")?; From 99bbf2a42bf50183598f6f34e384c8008ada8f27 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 08:41:16 +0200 Subject: [PATCH 2/9] feat(sdk): Implement `SlidingSync::stop_sync`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In case it's not obvious to drop the `Stream` returned by `SlidingSync::sync` immediately to “stop” the sync-loop, one can use the new `stop_sync` method to do achieve the same result. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 36f9c75ba..67430ea26 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -665,6 +665,18 @@ impl SlidingSync { } } + /// Force to stop the sync-loop ([`Self::sync`]) if it's running. + /// + /// Usually, dropping the `Stream` returned by [`Self::sync`] should be + /// enough to “stop” it, but depending of how this `Stream` is used, it + /// might not be obvious to drop it immediately (thinking of using this API + /// over FFI; the foreign-language might not be able to drop a value + /// immediately). Thus, calling this method will ensure that the sync-loop + /// stops gracefully and as soon as it returns. + pub async fn stop_sync(&self) -> Result<(), Error> { + self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop).await + } + /// Resets the lists. pub fn reset_lists(&self) -> Result<(), Error> { let lists = self.inner.lists.read().unwrap(); @@ -679,7 +691,10 @@ impl SlidingSync { impl SlidingSyncInner { /// Send a message over the internal channel. - async fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<()> { + async fn internal_channel_send( + &self, + message: SlidingSyncInternalMessage, + ) -> Result<(), Error> { Ok(self .internal_channel .0 From 7bde2cfd4aa1912b14312333f86db0639c7877f3 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 08:44:51 +0200 Subject: [PATCH 3/9] feat(sdk): Add log in `SlidingSync` when an internal message is received. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 67430ea26..f916179d6 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -596,6 +596,10 @@ impl SlidingSync { internal_message = internal_channel_receiver_lock.recv() => { use SlidingSyncInternalMessage::*; + sync_span.in_scope(|| { + debug!(?internal_message, "Sync-loop has received an internal message"); + }); + match internal_message { None | Some(SyncLoopStop) => { break; From 47922f7f500085561f703a57ca4be314d62e450a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 08:59:28 +0200 Subject: [PATCH 4/9] test(sdk): Test `SlidingSync::stop_sync_loop`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 27 +++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index f916179d6..c3d9eb1d6 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -776,9 +776,9 @@ pub struct UpdateSummary { } #[cfg(test)] -mod test { +mod tests { use assert_matches::assert_matches; - use futures_util::pin_mut; + use futures_util::{pin_mut, StreamExt}; use ruma::{ api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig}, room_id, @@ -920,4 +920,27 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_stop_sync_loop() -> Result<()> { + let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") + .sync_mode(SlidingSyncMode::Selective) + .set_range(0..=10)]) + .await?; + + let stream = sliding_sync.sync(); + pin_mut!(stream); + + for _ in 0..3 { + assert!(stream.next().await.is_some()); + } + + sliding_sync.stop_sync().await?; + + for _ in 0..3 { + assert!(stream.next().await.is_none()); + } + + Ok(()) + } } From 415778d44d2d432cb1d2839cf896033777484b65 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 09:03:03 +0200 Subject: [PATCH 5/9] feat(ffi): Implement `SlidingSync::stop_sync`. --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 6b7c2f02d..2f7ef2616 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -794,7 +794,7 @@ impl SlidingSync { } None => { - warn!("Inner streaming loop ended unexpectedly"); + warn!("SlidingSync sync-loop ended"); break; } }; @@ -805,6 +805,10 @@ impl SlidingSync { } }))) } + + pub fn stop_sync(&self) { + RUNTIME.block_on(async move { self.inner.stop_sync().await.unwrap() }); + } } #[derive(Clone, uniffi::Object)] From 849f83adb7c0a46e78181b83259c6e45235cc7de Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 09:06:32 +0200 Subject: [PATCH 6/9] chore(sdk): Remove an `allow(unused)`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index c3d9eb1d6..6d428fc1a 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -711,7 +711,6 @@ impl SlidingSyncInner { #[derive(Debug)] enum SlidingSyncInternalMessage { /// Instruct the sync loop to stop. - #[allow(unused)] // temporary SyncLoopStop, /// Instruct the sync loop to skip over any remaining work in its iteration, From e9bbf366ba2ff06def1f992176f69b8e86c11712 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 09:19:12 +0200 Subject: [PATCH 7/9] chore(sdk): Simplify code. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 6d428fc1a..54c91ea17 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -699,12 +699,7 @@ impl SlidingSyncInner { &self, message: SlidingSyncInternalMessage, ) -> Result<(), Error> { - Ok(self - .internal_channel - .0 - .send(message) - .await - .map_err(|_| Error::InternalChannelIsBroken)?) + self.internal_channel.0.send(message).await.map_err(|_| Error::InternalChannelIsBroken) } } From 512a5e77cda010e5023fbd84fb02c9ca51daf241 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 11:01:42 +0200 Subject: [PATCH 8/9] feat(sdk): Log when messages are sent internally by SlidingSync. --- crates/matrix-sdk/src/sliding_sync/list/mod.rs | 1 + crates/matrix-sdk/src/sliding_sync/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index 0e18627ed..c1f8d0626 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -661,6 +661,7 @@ impl SlidingSyncListInner { } /// Send a message over the internal channel. + #[instrument] fn internal_channel_blocking_send( &self, message: SlidingSyncInternalMessage, diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 54c91ea17..7c2a17d37 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -695,6 +695,7 @@ impl SlidingSync { impl SlidingSyncInner { /// Send a message over the internal channel. + #[instrument] async fn internal_channel_send( &self, message: SlidingSyncInternalMessage, From 728cd5db86168a548acabbc13ad0c0808903a2ba Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 24 May 2023 11:02:06 +0200 Subject: [PATCH 9/9] fix(sdk): Restore the size of the SS channel. Because it doesn't solve any problem, it just postpones it, making the whole thing more difficult to debug. --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 9206c0661..6776edc4f 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -230,7 +230,7 @@ impl SlidingSyncBuilder { let mut delta_token = None; - let (internal_channel_sender, internal_channel_receiver) = channel(256); + let (internal_channel_sender, internal_channel_receiver) = channel(8); let mut lists = BTreeMap::new();