feat(sdk): SlidingSync::add_list has an immediate effect.

`SlidingSync::add_list` sends `ContinueSyncLoop` on the internal channel.
This commit is contained in:
Ivan Enderlin
2023-05-17 11:27:11 +02:00
parent 501b990248
commit 53c20fcf1a

View File

@@ -140,7 +140,7 @@ impl SlidingSync {
}
/// Subscribe to a given room.
pub fn subscribe_to_room(
pub async fn subscribe_to_room(
&self,
room_id: OwnedRoomId,
settings: Option<v4::RoomSubscription>,
@@ -150,28 +150,17 @@ impl SlidingSync {
.write()
.unwrap()
.insert(room_id, settings.unwrap_or_default());
self.inner
.internal_channel
.0
.blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop)
.map_err(|_| Error::InternalChannelIsBroken)?;
self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?;
Ok(())
}
/// Unsubscribe from a given room.
pub fn unsubscribe_from_room(&self, room_id: OwnedRoomId) -> Result<()> {
pub async fn unsubscribe_from_room(&self, room_id: OwnedRoomId) -> Result<()> {
// If removing the subscription was successful…
if self.inner.room_subscriptions.write().unwrap().remove(&room_id).is_some() {
// … then keep the unsubscription for the next request.
self.inner.room_unsubscriptions.write().unwrap().insert(room_id);
self.inner
.internal_channel
.0
.blocking_send(SlidingSyncInternalMessage::ContinueSyncLoop)
.map_err(|_| Error::InternalChannelIsBroken)?;
self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?;
}
Ok(())
@@ -233,11 +222,12 @@ impl SlidingSync {
/// As lists need to have a unique `.name`, if a list with the same name
/// is found the new list will replace the old one and the return it or
/// `None`.
pub fn add_list(
pub async fn add_list(
&self,
list_builder: SlidingSyncListBuilder,
) -> Result<Option<SlidingSyncList>> {
let list = list_builder.build(self.inner.internal_channel.0.clone());
self.inner.internal_channel_send(SlidingSyncInternalMessage::ContinueSyncLoop).await?;
Ok(self.inner.lists.write().unwrap().insert(list.name().to_owned(), list))
}
@@ -269,7 +259,7 @@ impl SlidingSync {
}
}
self.add_list(list_builder)
self.add_list(list_builder).await
}
/// Lookup a set of rooms
@@ -676,6 +666,18 @@ impl SlidingSync {
}
}
impl SlidingSyncInner {
/// Send a message over the internal channel.
async fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<()> {
Ok(self
.internal_channel
.0
.send(message)
.await
.map_err(|_| Error::InternalChannelIsBroken)?)
}
}
#[derive(Debug)]
enum SlidingSyncInternalMessage {
#[allow(unused)] // temporary
@@ -742,8 +744,12 @@ pub struct UpdateSummary {
#[cfg(test)]
mod test {
use assert_matches::assert_matches;
use ruma::api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig};
use wiremock::MockServer;
use futures::pin_mut;
use ruma::{
api::client::sync::sync_events::v4::{E2EEConfig, ToDeviceConfig},
room_id,
};
use wiremock::{http::Method, Match, MockServer, Request};
use super::*;
use crate::test_utils::logged_in_client;
@@ -787,4 +793,106 @@ mod test {
Ok(())
}
struct SlidingSyncMatcher;
impl Match for SlidingSyncMatcher {
fn matches(&self, request: &Request) -> bool {
request.url.path() == "/_matrix/client/unstable/org.matrix.msc3575/sync"
&& request.method == Method::Post
}
}
async fn new_sliding_sync(
lists: Vec<SlidingSyncListBuilder>,
) -> Result<(MockServer, SlidingSync)> {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;
let mut sliding_sync_builder = client.sliding_sync().await;
for list in lists {
sliding_sync_builder = sliding_sync_builder.add_list(list);
}
let sliding_sync = sliding_sync_builder.build().await?;
Ok((server, sliding_sync))
}
#[tokio::test]
async fn test_subscribe_to_room() -> Result<()> {
let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0..=10)])
.await?;
let _stream = sliding_sync.stream();
pin_mut!(_stream);
let room0 = room_id!("!r0:bar.org").to_owned();
let room1 = room_id!("!r1:bar.org").to_owned();
let room2 = room_id!("!r2:bar.org").to_owned();
sliding_sync.subscribe_to_room(room0.clone(), None).await?;
sliding_sync.subscribe_to_room(room1.clone(), None).await?;
{
let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
assert!(room_subscriptions.contains_key(&room0));
assert!(room_subscriptions.contains_key(&room1));
assert!(!room_subscriptions.contains_key(&room2));
}
sliding_sync.unsubscribe_from_room(room0.clone()).await?;
sliding_sync.unsubscribe_from_room(room2.clone()).await?;
{
let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
assert!(!room_subscriptions.contains_key(&room0));
assert!(room_subscriptions.contains_key(&room1));
assert!(!room_subscriptions.contains_key(&room2));
let room_unsubscriptions = sliding_sync.inner.room_unsubscriptions.read().unwrap();
assert!(room_unsubscriptions.contains(&room0));
assert!(!room_unsubscriptions.contains(&room1));
assert!(!room_unsubscriptions.contains(&room2));
}
// this test also ensures that Tokio is not panicking when calling
// `subscribe_to_room` and `unsubscribe_from_room`.
Ok(())
}
#[tokio::test]
async fn test_add_list() -> Result<()> {
let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0..=10)])
.await?;
let _stream = sliding_sync.stream();
pin_mut!(_stream);
sliding_sync
.add_list(
SlidingSyncList::builder("bar")
.sync_mode(SlidingSyncMode::Selective)
.set_range(50..=60),
)
.await?;
let lists = sliding_sync.inner.lists.read().unwrap();
assert!(lists.contains_key("foo"));
assert!(lists.contains_key("bar"));
// this test also ensures that Tokio is not panicking when calling `add_list`.
Ok(())
}
}