diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index ac05abd9d..51d5814ce 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -783,7 +783,7 @@ impl SlidingSync { let observer = self.observer.clone(); Arc::new(TaskHandle::new(RUNTIME.spawn(async move { - let stream = inner.stream(); + let stream = inner.sync(); pin_mut!(stream); loop { @@ -800,7 +800,7 @@ impl SlidingSync { } None => { - warn!("Inner streaming loop ended unexpectedly"); + warn!("SlidingSync sync-loop ended"); break; } }; @@ -811,6 +811,10 @@ impl SlidingSync { } }))) } + + pub fn stop_sync(&self) { + RUNTIME.block_on(async move { self.inner.stop_sync().await.unwrap() }); + } } #[derive(Clone, uniffi::Object)] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs index 33ac20786..245418e9e 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/sliding_sync.rs @@ -223,7 +223,7 @@ async fn test_timeline_basic() -> Result<()> { .set_range(0..=10)]) .await?; - let stream = sliding_sync.stream(); + let stream = sliding_sync.sync(); pin_mut!(stream); let room_id = room_id!("!foo:bar.org"); @@ -270,7 +270,7 @@ async fn test_timeline_duplicated_events() -> Result<()> { .set_range(0..=10)]) .await?; - let stream = sliding_sync.stream(); + let stream = sliding_sync.sync(); pin_mut!(stream); let room_id = room_id!("!foo:bar.org"); diff --git a/crates/matrix-sdk/src/sliding_sync/README.md b/crates/matrix-sdk/src/sliding_sync/README.md index 534a5d614..c2b53715c 100644 --- a/crates/matrix-sdk/src/sliding_sync/README.md +++ b/crates/matrix-sdk/src/sliding_sync/README.md @@ -240,7 +240,7 @@ does after. This is modelled as a [async `Stream`][`futures_core::stream::Stream`] in our API, that one basically wants to continue polling. Once one has made its setup ready and build its sliding sync sessions, one wants to acquire its -[`.stream()`](`SlidingSync::stream`) and continuously poll it. +[`.sync()`](`SlidingSync::sync`) and continuously poll it. While the async stream API allows for streams to end (by returning `None`) Sliding Sync streams items `Result`. For every @@ -276,7 +276,7 @@ let sliding_sync = client .build() .await?; -let stream = sliding_sync.stream(); +let stream = sliding_sync.sync(); // continuously poll for updates pin_mut!(stream); @@ -321,7 +321,7 @@ the [`SlidingSync`][] will only process new data and skip the processing even across restarts. To support this, in practice, one can spawn a `Future` that runs -[`SlidingSync::stream`]. The spawned `Future` can be cancelled safely. If +[`SlidingSync::sync`]. The spawned `Future` can be cancelled safely. If the client was waiting on a response, it's cancelled without any issue. If a response was just received, it will be fully handled by `SlidingSync`. This _response is always @@ -470,7 +470,7 @@ tokio::spawn(async move { } }); -let stream = sliding_sync.stream(); +let stream = sliding_sync.sync(); // continuously poll for updates pin_mut!(stream); diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 9206c0661..6776edc4f 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -230,7 +230,7 @@ impl SlidingSyncBuilder { let mut delta_token = None; - let (internal_channel_sender, internal_channel_receiver) = channel(256); + let (internal_channel_sender, internal_channel_receiver) = channel(8); let mut lists = BTreeMap::new(); diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index 0e18627ed..c1f8d0626 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -661,6 +661,7 @@ impl SlidingSyncListInner { } /// Send a message over the internal channel. + #[instrument] fn internal_channel_blocking_send( &self, message: SlidingSyncInternalMessage, diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index ea26914eb..7c2a17d37 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -571,21 +571,21 @@ impl SlidingSync { spawn(future.instrument(Span::current())).await.unwrap() } - /// Create a _new_ Sliding Sync stream. + /// Create a _new_ Sliding Sync sync-loop. /// - /// This stream will send requests and will handle responses automatically, - /// hence updating the lists. + /// This method returns a `Stream`, which will send requests and will handle + /// responses automatically. Lists and rooms are updated automatically. #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro #[instrument(name = "sync_stream", skip_all)] - pub fn stream(&self) -> impl Stream> + '_ { - debug!(?self.inner.extensions, "About to run the sync stream"); + pub fn sync(&self) -> impl Stream> + '_ { + debug!(?self.inner.extensions, "About to run the sync-loop"); - let sync_stream_span = Span::current(); + let sync_span = Span::current(); stream! { loop { - sync_stream_span.in_scope(|| { - debug!(?self.inner.extensions, "Sync stream loop is running"); + sync_span.in_scope(|| { + debug!(?self.inner.extensions, "Sync-loop is running"); }); let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await; @@ -596,6 +596,10 @@ impl SlidingSync { internal_message = internal_channel_receiver_lock.recv() => { use SlidingSyncInternalMessage::*; + sync_span.in_scope(|| { + debug!(?internal_message, "Sync-loop has received an internal message"); + }); + match internal_message { None | Some(SyncLoopStop) => { break; @@ -607,7 +611,7 @@ impl SlidingSync { } } - update_summary = self.sync_once().instrument(sync_stream_span.clone()) => { + update_summary = self.sync_once().instrument(sync_span.clone()) => { match update_summary { Ok(Some(updates)) => { self.inner.reset_counter.store(0, Ordering::SeqCst); @@ -627,7 +631,7 @@ impl SlidingSync { if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION { - sync_stream_span.in_scope(|| { + sync_span.in_scope(|| { error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"); }); @@ -638,7 +642,7 @@ impl SlidingSync { } // Let's reset the Sliding Sync session. - sync_stream_span.in_scope(|| { + sync_span.in_scope(|| { warn!("Session expired. Restarting Sliding Sync."); // To “restart” a Sliding Sync session, we set `pos` to its initial value. @@ -661,10 +665,22 @@ impl SlidingSync { } } - debug!("Sync stream loop has exited."); + debug!("Sync-loop has exited."); } } + /// Force to stop the sync-loop ([`Self::sync`]) if it's running. + /// + /// Usually, dropping the `Stream` returned by [`Self::sync`] should be + /// enough to “stop” it, but depending of how this `Stream` is used, it + /// might not be obvious to drop it immediately (thinking of using this API + /// over FFI; the foreign-language might not be able to drop a value + /// immediately). Thus, calling this method will ensure that the sync-loop + /// stops gracefully and as soon as it returns. + pub async fn stop_sync(&self) -> Result<(), Error> { + self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop).await + } + /// Resets the lists. pub fn reset_lists(&self) -> Result<(), Error> { let lists = self.inner.lists.read().unwrap(); @@ -679,20 +695,18 @@ impl SlidingSync { impl SlidingSyncInner { /// Send a message over the internal channel. - async fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<()> { - Ok(self - .internal_channel - .0 - .send(message) - .await - .map_err(|_| Error::InternalChannelIsBroken)?) + #[instrument] + async fn internal_channel_send( + &self, + message: SlidingSyncInternalMessage, + ) -> Result<(), Error> { + self.internal_channel.0.send(message).await.map_err(|_| Error::InternalChannelIsBroken) } } #[derive(Debug)] enum SlidingSyncInternalMessage { /// Instruct the sync loop to stop. - #[allow(unused)] // temporary SyncLoopStop, /// Instruct the sync loop to skip over any remaining work in its iteration, @@ -747,7 +761,7 @@ impl From<&SlidingSync> for FrozenSlidingSync { } /// A summary of the updates received after a sync (like in -/// [`SlidingSync::stream`]). +/// [`SlidingSync::sync`]). #[derive(Debug, Clone)] pub struct UpdateSummary { /// The names of the lists that have seen an update. @@ -757,9 +771,9 @@ pub struct UpdateSummary { } #[cfg(test)] -mod test { +mod tests { use assert_matches::assert_matches; - use futures_util::pin_mut; + use futures_util::{pin_mut, StreamExt}; use ruma::{ api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig}, room_id, @@ -833,7 +847,7 @@ mod test { .set_range(0..=10)]) .await?; - let _stream = sliding_sync.stream(); + let _stream = sliding_sync.sync(); pin_mut!(_stream); let room0 = room_id!("!r0:bar.org").to_owned(); @@ -881,7 +895,7 @@ mod test { .set_range(0..=10)]) .await?; - let _stream = sliding_sync.stream(); + let _stream = sliding_sync.sync(); pin_mut!(_stream); sliding_sync @@ -901,4 +915,27 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_stop_sync_loop() -> Result<()> { + let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") + .sync_mode(SlidingSyncMode::Selective) + .set_range(0..=10)]) + .await?; + + let stream = sliding_sync.sync(); + pin_mut!(stream); + + for _ in 0..3 { + assert!(stream.next().await.is_some()); + } + + sliding_sync.stop_sync().await?; + + for _ in 0..3 { + assert!(stream.next().await.is_none()); + } + + Ok(()) + } } diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 0bde59c22..00669f8b5 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -49,7 +49,7 @@ async fn it_works_smoke_test() -> anyhow::Result<()> { ) .build() .await?; - let stream = sync_proxy.stream(); + let stream = sync_proxy.sync(); pin_mut!(stream); let room_summary = stream.next().await.context("No room summary found, loop ended unsuccessfully")?;