diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 83fc3a0b4..63acc3be4 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -52,7 +52,7 @@ use serde::{Deserialize, Serialize}; use tokio::{ select, spawn, sync::{ - mpsc::{Receiver, Sender}, + mpsc::{error::TryRecvError, Receiver, Sender}, Mutex as AsyncMutex, RwLock as AsyncRwLock, }, }; @@ -536,6 +536,24 @@ impl SlidingSync { let sync_span = Span::current(); stream! { + // Drain the internal channel, in case some messages were still present in it. + // It's unlikely to happen in a closed Rust world, but it can happen with async + // operations over FFI. Let's make sure the internal channel is properly drained + // before starting the sync-loop. + { + let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await; + + // *Gurgle, gurgle, gurgle* + while internal_channel_receiver_lock.try_recv().is_ok() {} + + // At this point, the receiver must be empty. + assert_eq!( + internal_channel_receiver_lock.try_recv(), + Err(TryRecvError::Empty), + "Sliding Sync internal channel is not empty" + ); + } + loop { sync_span.in_scope(|| { debug!(?self.inner.extensions, ?self.inner.position,"Sync-loop is running"); @@ -645,7 +663,7 @@ impl SlidingSyncInner { } } -#[derive(Debug)] +#[derive(Debug, PartialEq)] enum SlidingSyncInternalMessage { /// Instruct the sync loop to stop. SyncLoopStop,