feat(sdk): SlidingSync::*subscribe will cancel in-flight requests.

`SlidingSync::subscribe` and `SlidingSync::unsubscribe` will cancel in-
flight requests, i.e. the `SlidingSyncInternalMessage::ContinueSyncLoop`
will be sent in the internal channel, just like what `SlidingSyncList`s
already do when a parameter is changed.
This commit is contained in:
Ivan Enderlin
2023-05-08 16:44:49 +02:00
parent 748ae86a88
commit bfcedcd49c
2 changed files with 29 additions and 14 deletions

View File

@@ -219,10 +219,10 @@ impl SlidingSyncRoom {
let (items, mut stoppable_spawn) = self.add_timeline_listener_inner(listener)?;
let room_id = self.inner.room_id().clone();
self.runner.subscribe(room_id.clone(), settings.map(Into::into));
self.runner.subscribe(room_id.clone(), settings.map(Into::into))?;
let runner = self.runner.clone();
stoppable_spawn.set_finalizer(Box::new(move || runner.unsubscribe(room_id)));
stoppable_spawn.set_finalizer(Box::new(move || runner.unsubscribe(room_id).unwrap()));
Ok(SlidingSyncSubscribeResult { items, task_handle: Arc::new(stoppable_spawn) })
}
@@ -547,6 +547,7 @@ impl SlidingSyncListBuilder {
pub fn once_built(self: Arc<Self>, callback: Box<dyn SlidingSyncListOnceBuilt>) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.once_built(
move |list: matrix_sdk::SlidingSyncList| -> matrix_sdk::SlidingSyncList {
let list = callback.update_list(Arc::new(list.into()));
@@ -695,12 +696,14 @@ impl SlidingSync {
room_id: String,
settings: Option<RoomSubscription>,
) -> Result<(), ClientError> {
self.inner.subscribe(room_id.try_into()?, settings.map(Into::into));
self.inner.subscribe(room_id.try_into()?, settings.map(Into::into))?;
Ok(())
}
pub fn unsubscribe(&self, room_id: String) -> Result<(), ClientError> {
self.inner.unsubscribe(room_id.try_into()?);
self.inner.unsubscribe(room_id.try_into()?)?;
Ok(())
}

View File

@@ -139,23 +139,35 @@ impl SlidingSync {
}
/// Subscribe to a given room.
///
/// Note: this does not cancel any pending request, so make sure to only
/// poll the stream after you've altered this. If you do that during, it
/// might take one round trip to take effect.
pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option<v4::RoomSubscription>) {
pub fn subscribe(
&self,
room_id: OwnedRoomId,
settings: Option<v4::RoomSubscription>,
) -> Result<()> {
self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default());
self.inner
.internal_channel
.0
.blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop)
.map_err(|_| Error::InternalChannelIsBroken)?;
Ok(())
}
/// Unsubscribe from a given room.
///
/// Note: this does not cancel any pending request, so make sure to only
/// poll the stream after you've altered this. If you do that during, it
/// might take one round trip to take effect.
pub fn unsubscribe(&self, room_id: OwnedRoomId) {
pub fn unsubscribe(&self, room_id: OwnedRoomId) -> Result<()> {
if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() {
self.inner.unsubscribe.write().unwrap().push(room_id);
self.inner
.internal_channel
.0
.blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop)
.map_err(|_| Error::InternalChannelIsBroken)?;
}
Ok(())
}
/// Add the common extensions if not already configured.