diff --git a/bindings/matrix-sdk-ffi/src/api.udl b/bindings/matrix-sdk-ffi/src/api.udl index 1092ae4ed..d7885c1c9 100644 --- a/bindings/matrix-sdk-ffi/src/api.udl +++ b/bindings/matrix-sdk-ffi/src/api.udl @@ -117,6 +117,7 @@ interface SlidingSyncView { }; interface SlidingSyncRoom { + StoppableSpawn? subscribe_and_add_timeline_listener(TimelineListener listener, RoomSubscription? settings); StoppableSpawn? add_timeline_listener(TimelineListener listener); }; diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index b698e5b6b..1533a44fd 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -26,17 +26,24 @@ use crate::{ TimelineListener, }; +type StoppableSpawnCallback = Box; + pub struct StoppableSpawn { handle: Arc>>>, + callback: Arc>>, } impl StoppableSpawn { fn with_handle(handle: JoinHandle<()>) -> StoppableSpawn { - StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))) } + StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))), callback: Default::default() } } fn with_handle_ref(handle: Arc>>>) -> StoppableSpawn { - StoppableSpawn { handle } + StoppableSpawn { handle, callback: Default::default() } + } + + fn set_callback(&mut self, f: StoppableSpawnCallback) { + *self.callback.write().unwrap() = Some(f) } } @@ -46,12 +53,21 @@ impl StoppableSpawn { if let Some(handle) = self.handle.write().unwrap().take() { handle.abort(); } + if let Some(callback) = self.callback.write().unwrap().take() { + callback(); + } } pub fn is_cancelled(&self) -> bool { self.handle.read().unwrap().is_none() } } +impl Drop for StoppableSpawn { + fn drop(&mut self) { + self.cancel(); + } +} + #[derive(uniffi::Object)] pub struct UnreadNotificationsCount { highlight_count: u32, @@ -89,6 +105,7 @@ impl From for UnreadNotificationsCount { pub struct SlidingSyncRoom { inner: matrix_sdk::SlidingSyncRoom, timeline: TimelineLock, + runner: matrix_sdk::SlidingSync, client: Client, } @@ -139,6 +156,26 @@ impl SlidingSyncRoom { &self, listener: Box, ) -> Option> { + Some(Arc::new(self.add_timeline_listener_inner(listener)?)) + } + + pub fn subscribe_and_add_timeline_listener( + &self, + listener: Box, + settings: Option, + ) -> Option> { + let mut spawner = self.add_timeline_listener_inner(listener)?; + let room_id = self.inner.room_id().clone(); + self.runner.subscribe(room_id.clone(), settings.map(Into::into)); + let runner = self.runner.clone(); + spawner.set_callback(Box::new(move || runner.unsubscribe(room_id))); + Some(Arc::new(spawner)) + } + + fn add_timeline_listener_inner( + &self, + listener: Box, + ) -> Option { let mut timeline_lock = self.timeline.write().unwrap(); let timeline_signal = match &*timeline_lock { Some(timeline) => timeline.signal(), @@ -155,19 +192,17 @@ impl SlidingSyncRoom { }; let listener: Arc = listener.into(); - Some(Arc::new(StoppableSpawn::with_handle(RUNTIME.spawn(timeline_signal.for_each( - move |diff| { - let listener = listener.clone(); - let fut = RUNTIME - .spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff)))); + Some(StoppableSpawn::with_handle(RUNTIME.spawn(timeline_signal.for_each(move |diff| { + let listener = listener.clone(); + let fut = RUNTIME + .spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff)))); - async move { - if let Err(e) = fut.await { - error!("Timeline listener error: {e}"); - } + async move { + if let Err(e) = fut.await { + error!("Timeline listener error: {e}"); } - }, - ))))) + } + })))) } } @@ -188,15 +223,14 @@ pub struct RoomSubscription { pub timeline_limit: Option, } -impl TryInto for RoomSubscription { - type Error = anyhow::Error; - fn try_into(self) -> anyhow::Result { - Ok(assign!(RumaRoomSubscription::default(), { - required_state: self.required_state.map(|r| +impl From for RumaRoomSubscription { + fn from(val: RoomSubscription) -> Self { + assign!(RumaRoomSubscription::default(), { + required_state: val.required_state.map(|r| r.into_iter().map(|s| (s.key.into(), s.value)).collect() ).unwrap_or_default(), - timeline_limit: self.timeline_limit.map(|u| u.into()) - })) + timeline_limit: val.timeline_limit.map(|u| u.into()) + }) } } @@ -549,9 +583,7 @@ impl SlidingSync { room_id: String, settings: Option, ) -> anyhow::Result<()> { - let settings = - if let Some(settings) = settings { Some(settings.try_into()?) } else { None }; - self.inner.subscribe(room_id.try_into()?, settings); + self.inner.subscribe(room_id.try_into()?, settings.map(Into::into)); Ok(()) } @@ -561,9 +593,11 @@ impl SlidingSync { } pub fn get_room(&self, room_id: String) -> anyhow::Result>> { + let runner = self.inner.clone(); Ok(self.inner.get_room(OwnedRoomId::try_from(room_id)?).map(|inner| { Arc::new(SlidingSyncRoom { inner, + runner, client: self.client.clone(), timeline: Default::default(), }) @@ -586,6 +620,7 @@ impl SlidingSync { o.map(|inner| { Arc::new(SlidingSyncRoom { inner, + runner: self.inner.clone(), client: self.client.clone(), timeline: Default::default(), })