fix(sliding-sync): return a stoppable spawn to allow listener cancellation

This commit is contained in:
Benjamin Kampmann
2022-11-30 15:53:05 +01:00
parent 229e6b28a5
commit 0bfa63b31e
3 changed files with 22 additions and 14 deletions

View File

@@ -115,7 +115,7 @@ interface SlidingSyncView {
};
interface SlidingSyncRoom {
void add_timeline_listener(TimelineListener listener);
StoppableSpawn? add_timeline_listener(TimelineListener listener);
};
interface SlidingSync {

View File

@@ -139,24 +139,32 @@ impl SlidingSyncRoom {
}
impl SlidingSyncRoom {
pub fn add_timeline_listener(&self, listener: Box<dyn TimelineListener>) {
let timeline = RUNTIME.block_on(async move { self.inner.timeline().await });
pub fn add_timeline_listener(
&self,
listener: Box<dyn TimelineListener>,
) -> Option<Arc<StoppableSpawn>> {
let Some(timeline) = RUNTIME.block_on(async move { self.inner.timeline().await }) else {
tracing::warn!(room_id=?self.room_id(), "Could set timeline listener: no timeline found.");
return None
};
let timeline_signal =
self.timeline.write().unwrap().get_or_insert_with(|| Arc::new(timeline)).signal();
let listener: Arc<dyn TimelineListener> = listener.into();
RUNTIME.spawn(timeline_signal.for_each(move |diff| {
let listener = listener.clone();
let fut = RUNTIME
.spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff))));
Some(Arc::new(StoppableSpawn::with_handle(RUNTIME.spawn(timeline_signal.for_each(
move |diff| {
let listener = listener.clone();
let fut = RUNTIME
.spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff))));
async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
}
}
}
}));
},
)))))
}
}

View File

@@ -219,8 +219,8 @@ impl SlidingSyncRoom {
/// `Timeline` of this room
#[cfg(feature = "experimental-timeline")]
pub async fn timeline(&self) -> Timeline {
self.timeline_no_fully_read_tracking().unwrap().with_fully_read_tracking().await
pub async fn timeline(&self) -> Option<Timeline> {
Some(self.timeline_no_fully_read_tracking()?.with_fully_read_tracking().await)
}
fn timeline_no_fully_read_tracking(&self) -> Option<Timeline> {