mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-24 16:48:52 -04:00
refactor(ffi): refactor stoppable spawn to be more token-oriented
This commit is contained in:
committed by
Benjamin Kampmann
parent
938a03867b
commit
6c0ef2db51
@@ -29,16 +29,12 @@ use crate::{
|
||||
type StoppableSpawnCallback = Box<dyn FnOnce() + Send + Sync>;
|
||||
|
||||
pub struct StoppableSpawn {
|
||||
handle: Arc<RwLock<Option<JoinHandle<()>>>>,
|
||||
callback: Arc<RwLock<Option<StoppableSpawnCallback>>>,
|
||||
handle: JoinHandle<()>,
|
||||
callback: RwLock<Option<StoppableSpawnCallback>>,
|
||||
}
|
||||
|
||||
impl StoppableSpawn {
|
||||
fn with_handle(handle: JoinHandle<()>) -> StoppableSpawn {
|
||||
StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))), callback: Default::default() }
|
||||
}
|
||||
|
||||
fn with_handle_ref(handle: Arc<RwLock<Option<JoinHandle<()>>>>) -> StoppableSpawn {
|
||||
StoppableSpawn { handle, callback: Default::default() }
|
||||
}
|
||||
|
||||
@@ -47,18 +43,23 @@ impl StoppableSpawn {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JoinHandle<()>> for StoppableSpawn {
|
||||
fn from(value: JoinHandle<()>) -> Self {
|
||||
StoppableSpawn::with_handle(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[uniffi::export]
|
||||
impl StoppableSpawn {
|
||||
pub fn cancel(&self) {
|
||||
if let Some(handle) = self.handle.write().unwrap().take() {
|
||||
handle.abort();
|
||||
}
|
||||
debug!("stoppable.cancel() called");
|
||||
self.handle.abort();
|
||||
if let Some(callback) = self.callback.write().unwrap().take() {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
self.handle.read().unwrap().is_none()
|
||||
pub fn is_finished(&self) -> bool {
|
||||
self.handle.is_finished()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -569,12 +570,11 @@ pub struct SlidingSync {
|
||||
inner: matrix_sdk::SlidingSync,
|
||||
client: Client,
|
||||
observer: Arc<RwLock<Option<Box<dyn SlidingSyncObserver>>>>,
|
||||
sync_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
impl SlidingSync {
|
||||
fn new(inner: matrix_sdk::SlidingSync, client: Client) -> Self {
|
||||
SlidingSync { inner, client, observer: Default::default(), sync_handle: Default::default() }
|
||||
SlidingSync { inner, client, observer: Default::default() }
|
||||
}
|
||||
|
||||
pub fn set_observer(&self, observer: Option<Box<dyn SlidingSyncObserver>>) {
|
||||
@@ -650,48 +650,37 @@ impl SlidingSync {
|
||||
|
||||
pub fn sync(&self) -> Arc<StoppableSpawn> {
|
||||
let inner = self.inner.clone();
|
||||
let client = self.client.clone();
|
||||
let observer = self.observer.clone();
|
||||
let spawn = Arc::new(StoppableSpawn::with_handle_ref(self.sync_handle.clone()));
|
||||
let inner_spawn = spawn.clone();
|
||||
{
|
||||
let mut sync_handle = self.sync_handle.write().unwrap();
|
||||
let client = self.client.clone();
|
||||
|
||||
if let Some(handle) = sync_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
*sync_handle = Some(RUNTIME.spawn(async move {
|
||||
let stream = inner.stream().await.unwrap();
|
||||
pin_mut!(stream);
|
||||
loop {
|
||||
let update = match stream.next().await {
|
||||
Some(Ok(u)) => u,
|
||||
Some(Err(e)) => {
|
||||
if client.process_sync_error(e) == LoopCtrl::Break {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
Arc::new(
|
||||
RUNTIME
|
||||
.spawn(async move {
|
||||
let stream = inner.stream().await.unwrap();
|
||||
pin_mut!(stream);
|
||||
loop {
|
||||
let update = match stream.next().await {
|
||||
Some(Ok(u)) => u,
|
||||
Some(Err(e)) => {
|
||||
if client.process_sync_error(e) == LoopCtrl::Break {
|
||||
warn!("loop was stopped by client error processing");
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("Inner streaming loop ended unexpectedly");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if let Some(ref observer) = *observer.read().unwrap() {
|
||||
observer.did_receive_sync_update(update.into());
|
||||
}
|
||||
None => {
|
||||
debug!("No update from loop, cancelled");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if let Some(ref observer) = *observer.read().unwrap() {
|
||||
observer.did_receive_sync_update(update.into());
|
||||
} else {
|
||||
// when the observer has been removed
|
||||
// we cancel the loop
|
||||
inner_spawn.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
spawn
|
||||
})
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user