diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index f385af1a8..8cd61ac51 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -219,10 +219,10 @@ impl SlidingSyncRoom { let (items, mut stoppable_spawn) = self.add_timeline_listener_inner(listener)?; let room_id = self.inner.room_id().clone(); - self.runner.subscribe(room_id.clone(), settings.map(Into::into)); + self.runner.subscribe(room_id.clone(), settings.map(Into::into))?; let runner = self.runner.clone(); - stoppable_spawn.set_finalizer(Box::new(move || runner.unsubscribe(room_id))); + stoppable_spawn.set_finalizer(Box::new(move || runner.unsubscribe(room_id).unwrap())); Ok(SlidingSyncSubscribeResult { items, task_handle: Arc::new(stoppable_spawn) }) } @@ -547,6 +547,7 @@ impl SlidingSyncListBuilder { pub fn once_built(self: Arc, callback: Box) -> Arc { let mut builder = unwrap_or_clone_arc(self); + builder.inner = builder.inner.once_built( move |list: matrix_sdk::SlidingSyncList| -> matrix_sdk::SlidingSyncList { let list = callback.update_list(Arc::new(list.into())); @@ -695,12 +696,14 @@ impl SlidingSync { room_id: String, settings: Option, ) -> Result<(), ClientError> { - self.inner.subscribe(room_id.try_into()?, settings.map(Into::into)); + self.inner.subscribe(room_id.try_into()?, settings.map(Into::into))?; + Ok(()) } pub fn unsubscribe(&self, room_id: String) -> Result<(), ClientError> { - self.inner.unsubscribe(room_id.try_into()?); + self.inner.unsubscribe(room_id.try_into()?)?; + Ok(()) } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 17287d099..c6f59795c 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -139,23 +139,35 @@ impl SlidingSync { } /// Subscribe to a given room. - /// - /// Note: this does not cancel any pending request, so make sure to only - /// poll the stream after you've altered this. If you do that during, it - /// might take one round trip to take effect. - pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option) { + pub fn subscribe( + &self, + room_id: OwnedRoomId, + settings: Option, + ) -> Result<()> { self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default()); + + self.inner + .internal_channel + .0 + .blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop) + .map_err(|_| Error::InternalChannelIsBroken)?; + + Ok(()) } /// Unsubscribe from a given room. - /// - /// Note: this does not cancel any pending request, so make sure to only - /// poll the stream after you've altered this. If you do that during, it - /// might take one round trip to take effect. - pub fn unsubscribe(&self, room_id: OwnedRoomId) { + pub fn unsubscribe(&self, room_id: OwnedRoomId) -> Result<()> { if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() { self.inner.unsubscribe.write().unwrap().push(room_id); + + self.inner + .internal_channel + .0 + .blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop) + .map_err(|_| Error::InternalChannelIsBroken)?; } + + Ok(()) } /// Add the common extensions if not already configured.