From 53c20fcf1aef9cdf22bf0c164d79bd90a6c47724 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 17 May 2023 11:27:11 +0200 Subject: [PATCH] feat(sdk): `SlidingSync::add_list` has an immediate effect. `SlidingSync::add_list` sends `ContinueSyncLoop` on the internal channel. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 146 +++++++++++++++++++--- 1 file changed, 127 insertions(+), 19 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 70210e751..313d099a0 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -140,7 +140,7 @@ impl SlidingSync { } /// Subscribe to a given room. - pub fn subscribe_to_room( + pub async fn subscribe_to_room( &self, room_id: OwnedRoomId, settings: Option, @@ -150,28 +150,17 @@ impl SlidingSync { .write() .unwrap() .insert(room_id, settings.unwrap_or_default()); - - self.inner - .internal_channel - .0 - .blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop) - .map_err(|_| Error::InternalChannelIsBroken)?; + self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?; Ok(()) } /// Unsubscribe from a given room. - pub fn unsubscribe_from_room(&self, room_id: OwnedRoomId) -> Result<()> { + pub async fn unsubscribe_from_room(&self, room_id: OwnedRoomId) -> Result<()> { // If removing the subscription was successful… if self.inner.room_subscriptions.write().unwrap().remove(&room_id).is_some() { - // … then keep the unsubscription for the next request. self.inner.room_unsubscriptions.write().unwrap().insert(room_id); - - self.inner - .internal_channel - .0 - .blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop) - .map_err(|_| Error::InternalChannelIsBroken)?; + self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?; } Ok(()) @@ -233,11 +222,12 @@ impl SlidingSync { /// As lists need to have a unique `.name`, if a list with the same name /// is found the new list will replace the old one and the return it or /// `None`. - pub fn add_list( + pub async fn add_list( &self, list_builder: SlidingSyncListBuilder, ) -> Result> { let list = list_builder.build(self.inner.internal_channel.0.clone()); + self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?; Ok(self.inner.lists.write().unwrap().insert(list.name().to_owned(), list)) } @@ -269,7 +259,7 @@ impl SlidingSync { } } - self.add_list(list_builder) + self.add_list(list_builder).await } /// Lookup a set of rooms @@ -676,6 +666,18 @@ impl SlidingSync { } } +impl SlidingSyncInner { + /// Send a message over the internal channel. + async fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<()> { + Ok(self + .internal_channel + .0 + .send(message) + .await + .map_err(|_| Error::InternalChannelIsBroken)?) + } +} + #[derive(Debug)] enum SlidingSyncInternalMessage { #[allow(unused)] // temporary @@ -742,8 +744,12 @@ pub struct UpdateSummary { #[cfg(test)] mod test { use assert_matches::assert_matches; - use ruma::api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig}; - use wiremock::MockServer; + use futures::pin_mut; + use ruma::{ + api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig}, + room_id, + }; + use wiremock::{http::Method, Match, MockServer, Request}; use super::*; use crate::test_utils::logged_in_client; @@ -787,4 +793,106 @@ mod test { Ok(()) } + + struct SlidingSyncMatcher; + + impl Match for SlidingSyncMatcher { + fn matches(&self, request: &Request) -> bool { + request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync" + && request.method == Method::Post + } + } + + async fn new_sliding_sync( + lists: Vec, + ) -> Result<(MockServer, SlidingSync)> { + let server = MockServer::start().await; + let client = logged_in_client(Some(server.uri())).await; + + let mut sliding_sync_builder = client.sliding_sync().await; + + for list in lists { + sliding_sync_builder = sliding_sync_builder.add_list(list); + } + + let sliding_sync = sliding_sync_builder.build().await?; + + Ok((server, sliding_sync)) + } + + #[tokio::test] + async fn test_subscribe_to_room() -> Result<()> { + let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") + .sync_mode(SlidingSyncMode::Selective) + .set_range(0..=10)]) + .await?; + + let _stream = sliding_sync.stream(); + pin_mut!(_stream); + + let room0 = room_id!("!r0:bar.org").to_owned(); + let room1 = room_id!("!r1:bar.org").to_owned(); + let room2 = room_id!("!r2:bar.org").to_owned(); + + sliding_sync.subscribe_to_room(room0.clone(), None).await?; + sliding_sync.subscribe_to_room(room1.clone(), None).await?; + + { + let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap(); + + assert!(room_subscriptions.contains_key(&room0)); + assert!(room_subscriptions.contains_key(&room1)); + assert!(!room_subscriptions.contains_key(&room2)); + } + + sliding_sync.unsubscribe_from_room(room0.clone()).await?; + sliding_sync.unsubscribe_from_room(room2.clone()).await?; + + { + let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap(); + + assert!(!room_subscriptions.contains_key(&room0)); + assert!(room_subscriptions.contains_key(&room1)); + assert!(!room_subscriptions.contains_key(&room2)); + + let room_unsubscriptions = sliding_sync.inner.room_unsubscriptions.read().unwrap(); + + assert!(room_unsubscriptions.contains(&room0)); + assert!(!room_unsubscriptions.contains(&room1)); + assert!(!room_unsubscriptions.contains(&room2)); + } + + // this test also ensures that Tokio is not panicking when calling + // `subscribe_to_room` and `unsubscribe_from_room`. + + Ok(()) + } + + #[tokio::test] + async fn test_add_list() -> Result<()> { + let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") + .sync_mode(SlidingSyncMode::Selective) + .set_range(0..=10)]) + .await?; + + let _stream = sliding_sync.stream(); + pin_mut!(_stream); + + sliding_sync + .add_list( + SlidingSyncList::builder("bar") + .sync_mode(SlidingSyncMode::Selective) + .set_range(50..=60), + ) + .await?; + + let lists = sliding_sync.inner.lists.read().unwrap(); + + assert!(lists.contains_key("foo")); + assert!(lists.contains_key("bar")); + + // this test also ensures that Tokio is not panicking when calling `add_list`. + + Ok(()) + } }