From 6c0ef2db51f7229980f609477df7ca3ea6372231 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Mon, 16 Jan 2023 16:12:53 +0000 Subject: [PATCH] refactor(ffi): refactor stoppable spawn to be more token-oriented --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 89 +++++++++------------ 1 file changed, 39 insertions(+), 50 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 6d4d89ebc..09e78148e 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -29,16 +29,12 @@ use crate::{ type StoppableSpawnCallback = Box; pub struct StoppableSpawn { - handle: Arc>>>, - callback: Arc>>, + handle: JoinHandle<()>, + callback: RwLock>, } impl StoppableSpawn { fn with_handle(handle: JoinHandle<()>) -> StoppableSpawn { - StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))), callback: Default::default() } - } - - fn with_handle_ref(handle: Arc>>>) -> StoppableSpawn { StoppableSpawn { handle, callback: Default::default() } } @@ -47,18 +43,23 @@ impl StoppableSpawn { } } +impl From> for StoppableSpawn { + fn from(value: JoinHandle<()>) -> Self { + StoppableSpawn::with_handle(value) + } +} + #[uniffi::export] impl StoppableSpawn { pub fn cancel(&self) { - if let Some(handle) = self.handle.write().unwrap().take() { - handle.abort(); - } + debug!("stoppable.cancel() called"); + self.handle.abort(); if let Some(callback) = self.callback.write().unwrap().take() { callback(); } } - pub fn is_cancelled(&self) -> bool { - self.handle.read().unwrap().is_none() + pub fn is_finished(&self) -> bool { + self.handle.is_finished() } } @@ -569,12 +570,11 @@ pub struct SlidingSync { inner: matrix_sdk::SlidingSync, client: Client, observer: Arc>>>, - sync_handle: Arc>>>, } impl SlidingSync { fn new(inner: matrix_sdk::SlidingSync, client: Client) -> Self { - SlidingSync { inner, client, observer: Default::default(), sync_handle: Default::default() } + SlidingSync { inner, client, observer: Default::default() } } pub fn set_observer(&self, observer: Option>) { @@ -650,48 +650,37 @@ impl SlidingSync { pub fn sync(&self) -> Arc { let inner = self.inner.clone(); + let client = self.client.clone(); let observer = self.observer.clone(); - let spawn = Arc::new(StoppableSpawn::with_handle_ref(self.sync_handle.clone())); - let inner_spawn = spawn.clone(); - { - let mut sync_handle = self.sync_handle.write().unwrap(); - let client = self.client.clone(); - if let Some(handle) = sync_handle.take() { - handle.abort(); - } - - *sync_handle = Some(RUNTIME.spawn(async move { - let stream = inner.stream().await.unwrap(); - pin_mut!(stream); - loop { - let update = match stream.next().await { - Some(Ok(u)) => u, - Some(Err(e)) => { - if client.process_sync_error(e) == LoopCtrl::Break { - break; - } else { - continue; + Arc::new( + RUNTIME + .spawn(async move { + let stream = inner.stream().await.unwrap(); + pin_mut!(stream); + loop { + let update = match stream.next().await { + Some(Ok(u)) => u, + Some(Err(e)) => { + if client.process_sync_error(e) == LoopCtrl::Break { + warn!("loop was stopped by client error processing"); + break; + } else { + continue; + } } + None => { + warn!("Inner streaming loop ended unexpectedly"); + break; + } + }; + if let Some(ref observer) = *observer.read().unwrap() { + observer.did_receive_sync_update(update.into()); } - None => { - debug!("No update from loop, cancelled"); - break; - } - }; - if let Some(ref observer) = *observer.read().unwrap() { - observer.did_receive_sync_update(update.into()); - } else { - // when the observer has been removed - // we cancel the loop - inner_spawn.cancel(); - break; } - } - })); - } - - spawn + }) + .into(), + ) } }