diff --git a/Cargo.toml b/Cargo.toml index 64839198c..c1ac2d989 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,8 @@ ruma = { path = "/home/work/code/ruma/crates/ruma", features = [ "unstable-msc4222", "unstable-msc4278", "unstable-msc4286", - "unstable-msc4306" + "unstable-msc4306", + "unstable-msc4308" ] } sentry = "0.36.0" sentry-tracing = "0.36.0" diff --git a/crates/matrix-sdk-base/src/room/call.rs b/crates/matrix-sdk-base/src/room/call.rs index 34d789e63..ccf075daf 100644 --- a/crates/matrix-sdk-base/src/room/call.rs +++ b/crates/matrix-sdk-base/src/room/call.rs @@ -144,6 +144,7 @@ mod tests { focus_active, foci_preferred, Some(timestamp(minutes_ago)), + None, ), CallMemberStateKey::new(user_id.to_owned(), Some(member_id), false), ) diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 71a66492e..975a765f3 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file. ### Features +- `Client::fetch_thread_subscriptions` implements support for the companion endpoint of the + experimental MSC4308, allowing to fetch thread subscriptions for a given range, as specified by + the MSC. + ([#XXXX](https://github.com/matrix-org/matrix-rust-sdk/pull/XXXX)) - `Room::enable_encryption` and `Room::enable_encryption_with_state_event_encryption` will poll the encryption state for up to 3 seconds, rather than checking once after a single sync has completed. diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 0337ea01b..7275d28ba 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -61,6 +61,7 @@ use ruma::{ room::create_room, session::login::v3::DiscoveryInfo, sync::sync_events, + threads::get_thread_subscriptions_changes, uiaa, user_directory::search_users, }, @@ -2885,6 +2886,25 @@ impl Client { ThreadingSupport::Disabled => false, } } + + /// Fetch thread subscriptions changes between `from` and up to `to`. + /// + /// The `limit` optional parameter can be used to limit the number of + /// entries in a response. It can also be overridden by the server, if + /// it's deemed too large. + pub async fn fetch_thread_subscriptions( + &self, + from: Option, + to: Option, + limit: Option, + ) -> Result { + let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), { + from, + to, + limit, + }); + Ok(self.send(request).await?) + } } #[cfg(any(feature = "testing", test))] diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index fccdb4d3b..e877dd176 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -30,7 +30,13 @@ use matrix_sdk_test::{ }; use percent_encoding::{AsciiSet, CONTROLS}; use ruma::{ - api::client::{receipt::create_receipt::v3::ReceiptType, room::Visibility}, + api::client::{ + receipt::create_receipt::v3::ReceiptType, + room::Visibility, + threads::get_thread_subscriptions_changes::unstable::{ + ThreadSubscription, ThreadUnsubscription, + }, + }, device_id, directory::PublicRoomsChunk, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, @@ -1346,8 +1352,8 @@ impl MatrixMockServer { self.mock_endpoint(mock, AuthedMediaThumbnailEndpoint).expect_default_access_token() } - /// Create a prebuilt mock for the endpoint used to get a thread - /// subscription in a given room. + /// Create a prebuilt mock for the endpoint used to get a single thread + /// subscription status in a given room. pub fn mock_get_thread_subscription(&self) -> MockEndpoint<'_, GetThreadSubscriptionEndpoint> { let mock = Mock::given(method("GET")); self.mock_endpoint(mock, GetThreadSubscriptionEndpoint::default()) @@ -1427,6 +1433,17 @@ impl MatrixMockServer { let mock = Mock::given(method("GET")).and(path("/_matrix/federation/v1/version")); self.mock_endpoint(mock, FederationVersionEndpoint) } + + /// Create a prebuilt mock for the endpoint used to get all thread + /// subscriptions across all rooms. + pub fn mock_get_thread_subscriptions( + &self, + ) -> MockEndpoint<'_, GetThreadSubscriptionsEndpoint> { + let mock = Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/unstable/io.element.msc4308/thread_subscriptions$")); + self.mock_endpoint(mock, GetThreadSubscriptionsEndpoint::default()) + .expect_default_access_token() + } } /// A specification for a push rule ID. @@ -4136,3 +4153,56 @@ impl<'a> MockEndpoint<'a, FederationVersionEndpoint> { self.respond_with(ResponseTemplate::new(200).set_body_json(response_body)) } } + +/// A prebuilt mock for `GET ^/_matrix/client/v3/thread_subscriptions`. +#[derive(Default)] +pub struct GetThreadSubscriptionsEndpoint { + /// New thread subscriptions per (room id, thread root event id). + subscribed: BTreeMap>, + /// New thread unsubscriptions per (room id, thread root event id). + unsubscribed: BTreeMap>, +} + +impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> { + /// Add a single thread subscription to the response. + pub fn add_subscription( + mut self, + room_id: OwnedRoomId, + thread_root: OwnedEventId, + subscription: ThreadSubscription, + ) -> Self { + self.endpoint.subscribed.entry(room_id).or_default().insert(thread_root, subscription); + self + } + + /// Add a single thread unsubscription to the response. + pub fn add_unsubcription( + mut self, + room_id: OwnedRoomId, + thread_root: OwnedEventId, + unsubscription: ThreadUnsubscription, + ) -> Self { + self.endpoint.unsubscribed.entry(room_id).or_default().insert(thread_root, unsubscription); + self + } + + /// Match the `from` query parameter to a given value. + pub fn match_from(self, from: &str) -> Self { + Self { mock: self.mock.and(query_param("from", from)), ..self } + } + /// Match the `to` query parameter to a given value. + pub fn match_to(self, to: &str) -> Self { + Self { mock: self.mock.and(query_param("to", to)), ..self } + } + + /// Returns a successful response with the given thread subscriptions, and + /// "end" parameter to be used in the next query. + pub fn ok(self, end: Option) -> MatrixMock<'a> { + let response_body = json!({ + "subscribed": self.endpoint.subscribed, + "unsubscribed": self.endpoint.unsubscribed, + "end": end, + }); + self.respond_with(ResponseTemplate::new(200).set_body_json(response_body)) + } +} diff --git a/crates/matrix-sdk/tests/integration/client.rs b/crates/matrix-sdk/tests/integration/client.rs index 621cae0ad..ebe5a2756 100644 --- a/crates/matrix-sdk/tests/integration/client.rs +++ b/crates/matrix-sdk/tests/integration/client.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, time::Duration}; +use std::{collections::BTreeMap, ops::Not as _, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use eyeball_im::VectorDiff; @@ -46,10 +46,11 @@ use ruma::{ room::{history_visibility::HistoryVisibility, member::MembershipState}, AnyInitialStateEvent, }, + owned_event_id, owned_room_id, room::JoinRule, room_id, serde::Raw, - user_id, OwnedUserId, + uint, user_id, OwnedUserId, }; use serde_json::{json, Value as JsonValue}; use stream_assert::{assert_next_matches, assert_pending}; @@ -1516,3 +1517,66 @@ async fn test_server_vendor_info_with_missing_fields() { assert_eq!(server_info.server_name, "unknown"); assert_eq!(server_info.version, "unknown"); } + +#[async_test] +async fn test_fetch_thread_subscriptions() { + use ruma::api::client::threads::get_thread_subscriptions_changes::unstable::{ + ThreadSubscription, ThreadUnsubscription, + }; + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let room1 = owned_room_id!("!room1:example.com"); + let room2 = owned_room_id!("!room2:example.com"); + let room3 = owned_room_id!("!room3:example.com"); + + let thread1 = owned_event_id!("$thread1:example.com"); + let thread2 = owned_event_id!("$thread2:example.com"); + let thread3 = owned_event_id!("$thread3:example.com"); + + server + .mock_get_thread_subscriptions() + .match_from("from") + .match_to("to") + .add_subscription( + room1.clone(), + thread1.clone(), + ThreadSubscription { automatic: true, bump_stamp: uint!(42) }, + ) + .add_subscription( + room2.clone(), + thread2.clone(), + ThreadSubscription { automatic: false, bump_stamp: uint!(7) }, + ) + .add_unsubcription( + room3.clone(), + thread3.clone(), + ThreadUnsubscription { bump_stamp: uint!(13) }, + ) + .ok(Some("next_batch_token".to_owned())) + .mount() + .await; + + let response = client + .fetch_thread_subscriptions(Some("from".to_owned()), Some("to".to_owned()), None) + .await + .unwrap(); + + assert_eq!(response.end.as_deref(), Some("next_batch_token")); + + assert_eq!(response.subscribed.len(), 2); + + let s1 = &response.subscribed[&room1][&thread1]; + assert!(s1.automatic); + assert_eq!(s1.bump_stamp, uint!(42)); + + let s2 = &response.subscribed[&room2][&thread2]; + assert!(s2.automatic.not()); + assert_eq!(s2.bump_stamp, uint!(7)); + + assert_eq!(response.unsubscribed.len(), 1); + + let u = &response.unsubscribed[&room3][&thread3]; + assert_eq!(u.bump_stamp, uint!(13)); +}