feat(sdk): Implement SlidingSync::stop_sync

feat(sdk): Implement `SlidingSync::stop_sync`
This commit is contained in:
Ivan Enderlin
2023-05-24 11:52:19 +02:00
committed by GitHub
7 changed files with 77 additions and 35 deletions

View File

@@ -783,7 +783,7 @@ impl SlidingSync {
let observer = self.observer.clone();
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
let stream = inner.stream();
let stream = inner.sync();
pin_mut!(stream);
loop {
@@ -800,7 +800,7 @@ impl SlidingSync {
}
None => {
warn!("Inner streaming loop ended unexpectedly");
warn!("SlidingSync sync-loop ended");
break;
}
};
@@ -811,6 +811,10 @@ impl SlidingSync {
}
})))
}
pub fn stop_sync(&self) {
RUNTIME.block_on(async move { self.inner.stop_sync().await.unwrap() });
}
}
#[derive(Clone, uniffi::Object)]

View File

@@ -223,7 +223,7 @@ async fn test_timeline_basic() -> Result<()> {
.set_range(0..=10)])
.await?;
let stream = sliding_sync.stream();
let stream = sliding_sync.sync();
pin_mut!(stream);
let room_id = room_id!("!foo:bar.org");
@@ -270,7 +270,7 @@ async fn test_timeline_duplicated_events() -> Result<()> {
.set_range(0..=10)])
.await?;
let stream = sliding_sync.stream();
let stream = sliding_sync.sync();
pin_mut!(stream);
let room_id = room_id!("!foo:bar.org");

View File

@@ -240,7 +240,7 @@ does after.
This is modelled as a [async `Stream`][`futures_core::stream::Stream`] in
our API, that one basically wants to continue polling. Once one has made its
setup ready and build its sliding sync sessions, one wants to acquire its
[`.stream()`](`SlidingSync::stream`) and continuously poll it.
[`.sync()`](`SlidingSync::sync`) and continuously poll it.
While the async stream API allows for streams to end (by returning `None`)
Sliding Sync streams items `Result<UpdateSummary, Error>`. For every
@@ -276,7 +276,7 @@ let sliding_sync = client
.build()
.await?;
let stream = sliding_sync.stream();
let stream = sliding_sync.sync();
// continuously poll for updates
pin_mut!(stream);
@@ -321,7 +321,7 @@ the [`SlidingSync`][] will only process new data and skip the processing
even across restarts.
To support this, in practice, one can spawn a `Future` that runs
[`SlidingSync::stream`]. The spawned `Future` can be cancelled safely. If
[`SlidingSync::sync`]. The spawned `Future` can be cancelled safely. If
the client was waiting on a response, it's cancelled without any issue. If
a response was just received, it
will be fully handled by `SlidingSync`. This _response is always
@@ -470,7 +470,7 @@ tokio::spawn(async move {
}
});
let stream = sliding_sync.stream();
let stream = sliding_sync.sync();
// continuously poll for updates
pin_mut!(stream);

View File

@@ -230,7 +230,7 @@ impl SlidingSyncBuilder {
let mut delta_token = None;
let (internal_channel_sender, internal_channel_receiver) = channel(256);
let (internal_channel_sender, internal_channel_receiver) = channel(8);
let mut lists = BTreeMap::new();

View File

@@ -661,6 +661,7 @@ impl SlidingSyncListInner {
}
/// Send a message over the internal channel.
#[instrument]
fn internal_channel_blocking_send(
&self,
message: SlidingSyncInternalMessage,

View File

@@ -571,21 +571,21 @@ impl SlidingSync {
spawn(future.instrument(Span::current())).await.unwrap()
}
/// Create a _new_ Sliding Sync stream.
/// Create a _new_ Sliding Sync sync-loop.
///
/// This stream will send requests and will handle responses automatically,
/// hence updating the lists.
/// This method returns a `Stream`, which will send requests and will handle
/// responses automatically. Lists and rooms are updated automatically.
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(name = "sync_stream", skip_all)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
debug!(?self.inner.extensions, "About to run the sync stream");
pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
debug!(?self.inner.extensions, "About to run the sync-loop");
let sync_stream_span = Span::current();
let sync_span = Span::current();
stream! {
loop {
sync_stream_span.in_scope(|| {
debug!(?self.inner.extensions, "Sync stream loop is running");
sync_span.in_scope(|| {
debug!(?self.inner.extensions, "Sync-loop is running");
});
let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await;
@@ -596,6 +596,10 @@ impl SlidingSync {
internal_message = internal_channel_receiver_lock.recv() => {
use SlidingSyncInternalMessage::*;
sync_span.in_scope(|| {
debug!(?internal_message, "Sync-loop has received an internal message");
});
match internal_message {
None | Some(SyncLoopStop) => {
break;
@@ -607,7 +611,7 @@ impl SlidingSync {
}
}
update_summary = self.sync_once().instrument(sync_stream_span.clone()) => {
update_summary = self.sync_once().instrument(sync_span.clone()) => {
match update_summary {
Ok(Some(updates)) => {
self.inner.reset_counter.store(0, Ordering::SeqCst);
@@ -627,7 +631,7 @@ impl SlidingSync {
if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst)
>= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION
{
sync_stream_span.in_scope(|| {
sync_span.in_scope(|| {
error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row");
});
@@ -638,7 +642,7 @@ impl SlidingSync {
}
// Let's reset the Sliding Sync session.
sync_stream_span.in_scope(|| {
sync_span.in_scope(|| {
warn!("Session expired. Restarting Sliding Sync.");
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
@@ -661,10 +665,22 @@ impl SlidingSync {
}
}
debug!("Sync stream loop has exited.");
debug!("Sync-loop has exited.");
}
}
/// Force to stop the sync-loop ([`Self::sync`]) if it's running.
///
/// Usually, dropping the `Stream` returned by [`Self::sync`] should be
/// enough to “stop” it, but depending of how this `Stream` is used, it
/// might not be obvious to drop it immediately (thinking of using this API
/// over FFI; the foreign-language might not be able to drop a value
/// immediately). Thus, calling this method will ensure that the sync-loop
/// stops gracefully and as soon as it returns.
pub async fn stop_sync(&self) -> Result<(), Error> {
self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop).await
}
/// Resets the lists.
pub fn reset_lists(&self) -> Result<(), Error> {
let lists = self.inner.lists.read().unwrap();
@@ -679,20 +695,18 @@ impl SlidingSync {
impl SlidingSyncInner {
/// Send a message over the internal channel.
async fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<()> {
Ok(self
.internal_channel
.0
.send(message)
.await
.map_err(|_| Error::InternalChannelIsBroken)?)
#[instrument]
async fn internal_channel_send(
&self,
message: SlidingSyncInternalMessage,
) -> Result<(), Error> {
self.internal_channel.0.send(message).await.map_err(|_| Error::InternalChannelIsBroken)
}
}
#[derive(Debug)]
enum SlidingSyncInternalMessage {
/// Instruct the sync loop to stop.
#[allow(unused)] // temporary
SyncLoopStop,
/// Instruct the sync loop to skip over any remaining work in its iteration,
@@ -747,7 +761,7 @@ impl From<&SlidingSync> for FrozenSlidingSync {
}
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::stream`]).
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
/// The names of the lists that have seen an update.
@@ -757,9 +771,9 @@ pub struct UpdateSummary {
}
#[cfg(test)]
mod test {
mod tests {
use assert_matches::assert_matches;
use futures_util::pin_mut;
use futures_util::{pin_mut, StreamExt};
use ruma::{
api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig},
room_id,
@@ -833,7 +847,7 @@ mod test {
.set_range(0..=10)])
.await?;
let _stream = sliding_sync.stream();
let _stream = sliding_sync.sync();
pin_mut!(_stream);
let room0 = room_id!("!r0:bar.org").to_owned();
@@ -881,7 +895,7 @@ mod test {
.set_range(0..=10)])
.await?;
let _stream = sliding_sync.stream();
let _stream = sliding_sync.sync();
pin_mut!(_stream);
sliding_sync
@@ -901,4 +915,27 @@ mod test {
Ok(())
}
#[tokio::test]
async fn test_stop_sync_loop() -> Result<()> {
let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0..=10)])
.await?;
let stream = sliding_sync.sync();
pin_mut!(stream);
for _ in 0..3 {
assert!(stream.next().await.is_some());
}
sliding_sync.stop_sync().await?;
for _ in 0..3 {
assert!(stream.next().await.is_none());
}
Ok(())
}
}

View File

@@ -49,7 +49,7 @@ async fn it_works_smoke_test() -> anyhow::Result<()> {
)
.build()
.await?;
let stream = sync_proxy.stream();
let stream = sync_proxy.sync();
pin_mut!(stream);
let room_summary =
stream.next().await.context("No room summary found, loop ended unsuccessfully")?;