feat(sdk): Ensure SlidingSync::sync drains its internal channel.

Something weird is happening with ElementX iOS. When
`SlidingSync::stop_sync` is called, the internal message `SyncLoopStop`
has time to be sent in the internal channel, but iOS decides to suspend
the code before the sync-loop processes it. When ElementX decides to
start the sync-loop again, it immediately processes the `SyncLoopStop`
message, and… stops the sync-loop.

This patch ensures that `SlidingSync::sync` drains the internal channel
before starting the sync-loop for real. `tokio::sync::mpsc::Receiver`
type has no `drain` method, so this patch implements its
own logic by calling `try_recv` in a loop, until it returns
`Err(TryRecvError::Empty)`.
This commit is contained in:
Ivan Enderlin
2023-05-31 14:45:00 +02:00
parent 85b37bfcc4
commit 9da1a2f48b

View File

@@ -52,7 +52,7 @@ use serde::{Deserialize, Serialize};
use tokio::{
select, spawn,
sync::{
mpsc::{Receiver, Sender},
mpsc::{error::TryRecvError, Receiver, Sender},
Mutex as AsyncMutex, RwLock as AsyncRwLock,
},
};
@@ -536,6 +536,24 @@ impl SlidingSync {
let sync_span = Span::current();
stream! {
// Drain the internal channel, in case some messages were still present in it.
// It's unlikely to happen in a closed Rust world, but it can happen with async
// operations over FFI. Let's make sure the internal channel is properly drained
// before starting the sync-loop.
{
let mut internal_channel_receiver_lock = self.inner.internal_channel.1.write().await;
// *Gurgle, gurgle, gurgle*
while internal_channel_receiver_lock.try_recv().is_ok() {}
// At this point, the receiver must be empty.
assert_eq!(
internal_channel_receiver_lock.try_recv(),
Err(TryRecvError::Empty),
"Sliding Sync internal channel is not empty"
);
}
loop {
sync_span.in_scope(|| {
debug!(?self.inner.extensions, ?self.inner.position,"Sync-loop is running");
@@ -645,7 +663,7 @@ impl SlidingSyncInner {
}
}
#[derive(Debug)]
#[derive(Debug, PartialEq)]
enum SlidingSyncInternalMessage {
/// Instruct the sync loop to stop.
SyncLoopStop,