From 9da1a2f48b4433cb780a5490c65ea96950b3dbaa Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 31 May 2023 14:45:00 +0200 Subject: [PATCH] feat(sdk): Ensure `SlidingSync::sync` drains its internal channel. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Something weird is happening with ElementX iOS. When `SlidingSync::stop_sync` is called, the internal message `SyncLoopStop` has time to be sent in the internal channel, but iOS decides to suspend the code before the sync-loop processes it. When ElementX decides to start the sync-loop again, it immediately processes the `SyncLoopStop` message, and… stops the sync-loop. This patch ensures that `SlidingSync::sync` drains the internal channel before starting the sync-loop for real. `tokio::sync::mpsc::Receiver` type has no `drain` method, so this patch implements its own logic by calling `try_recv` in a loop, until it returns `Err(TryRecvError::Empty)`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 83fc3a0b4..63acc3be4 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -52,7 +52,7 @@ use serde::{Deserialize, Serialize}; use tokio::{ select, spawn, sync::{ - mpsc::{Receiver, Sender}, + mpsc::{error::TryRecvError, Receiver, Sender}, Mutex as AsyncMutex, RwLock as AsyncRwLock, }, }; @@ -536,6 +536,24 @@ impl SlidingSync { let sync_span = Span::current(); stream! { + // Drain the internal channel, in case some messages were still present in it. + // It's unlikely to happen in a closed Rust world, but it can happen with async + // operations over FFI. Let's make sure the internal channel is properly drained + // before starting the sync-loop. + { + let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await; + + // *Gurgle, gurgle, gurgle* + while internal_channel_receiver_lock.try_recv().is_ok() {} + + // At this point, the receiver must be empty. + assert_eq!( + internal_channel_receiver_lock.try_recv(), + Err(TryRecvError::Empty), + "Sliding Sync internal channel is not empty" + ); + } + loop { sync_span.in_scope(|| { debug!(?self.inner.extensions, ?self.inner.position,"Sync-loop is running"); @@ -645,7 +663,7 @@ impl SlidingSyncInner { } } -#[derive(Debug)] +#[derive(Debug, PartialEq)] enum SlidingSyncInternalMessage { /// Instruct the sync loop to stop. SyncLoopStop,