feat(sdk): add support for msc4308 accompanying endpoint (fetching thread subscriptions)

This commit is contained in:
Benjamin Bouvier
2025-08-25 17:25:53 +02:00
parent 80fc6c20f8
commit b6aed4e10e
6 changed files with 166 additions and 6 deletions

View File

@@ -78,7 +78,8 @@ ruma = { path = "/home/work/code/ruma/crates/ruma", features = [
"unstable-msc4222",
"unstable-msc4278",
"unstable-msc4286",
"unstable-msc4306"
"unstable-msc4306",
"unstable-msc4308"
] }
sentry = "0.36.0"
sentry-tracing = "0.36.0"

View File

@@ -144,6 +144,7 @@ mod tests {
focus_active,
foci_preferred,
Some(timestamp(minutes_ago)),
None,
),
CallMemberStateKey::new(user_id.to_owned(), Some(member_id), false),
)

View File

@@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file.
### Features
- `Client::fetch_thread_subscriptions` implements support for the companion endpoint of the
experimental MSC4308, allowing to fetch thread subscriptions for a given range, as specified by
the MSC.
([#XXXX](https://github.com/matrix-org/matrix-rust-sdk/pull/XXXX))
- `Room::enable_encryption` and `Room::enable_encryption_with_state_event_encryption` will poll
the encryption state for up to 3 seconds, rather than checking once after a single sync has
completed.

View File

@@ -61,6 +61,7 @@ use ruma::{
room::create_room,
session::login::v3::DiscoveryInfo,
sync::sync_events,
threads::get_thread_subscriptions_changes,
uiaa,
user_directory::search_users,
},
@@ -2885,6 +2886,25 @@ impl Client {
ThreadingSupport::Disabled => false,
}
}
/// Fetch thread subscriptions changes between `from` and up to `to`.
///
/// The `limit` optional parameter can be used to limit the number of
/// entries in a response. It can also be overridden by the server, if
/// it's deemed too large.
pub async fn fetch_thread_subscriptions(
&self,
from: Option<String>,
to: Option<String>,
limit: Option<UInt>,
) -> Result<get_thread_subscriptions_changes::unstable::Response> {
let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
from,
to,
limit,
});
Ok(self.send(request).await?)
}
}
#[cfg(any(feature = "testing", test))]

View File

@@ -30,7 +30,13 @@ use matrix_sdk_test::{
};
use percent_encoding::{AsciiSet, CONTROLS};
use ruma::{
api::client::{receipt::create_receipt::v3::ReceiptType, room::Visibility},
api::client::{
receipt::create_receipt::v3::ReceiptType,
room::Visibility,
threads::get_thread_subscriptions_changes::unstable::{
ThreadSubscription, ThreadUnsubscription,
},
},
device_id,
directory::PublicRoomsChunk,
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
@@ -1346,8 +1352,8 @@ impl MatrixMockServer {
self.mock_endpoint(mock, AuthedMediaThumbnailEndpoint).expect_default_access_token()
}
/// Create a prebuilt mock for the endpoint used to get a thread
/// subscription in a given room.
/// Create a prebuilt mock for the endpoint used to get a single thread
/// subscription status in a given room.
pub fn mock_get_thread_subscription(&self) -> MockEndpoint<'_, GetThreadSubscriptionEndpoint> {
let mock = Mock::given(method("GET"));
self.mock_endpoint(mock, GetThreadSubscriptionEndpoint::default())
@@ -1427,6 +1433,17 @@ impl MatrixMockServer {
let mock = Mock::given(method("GET")).and(path("/_matrix/federation/v1/version"));
self.mock_endpoint(mock, FederationVersionEndpoint)
}
/// Create a prebuilt mock for the endpoint used to get all thread
/// subscriptions across all rooms.
pub fn mock_get_thread_subscriptions(
&self,
) -> MockEndpoint<'_, GetThreadSubscriptionsEndpoint> {
let mock = Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/unstable/io.element.msc4308/thread_subscriptions$"));
self.mock_endpoint(mock, GetThreadSubscriptionsEndpoint::default())
.expect_default_access_token()
}
}
/// A specification for a push rule ID.
@@ -4136,3 +4153,56 @@ impl<'a> MockEndpoint<'a, FederationVersionEndpoint> {
self.respond_with(ResponseTemplate::new(200).set_body_json(response_body))
}
}
/// A prebuilt mock for `GET ^/_matrix/client/v3/thread_subscriptions`.
#[derive(Default)]
pub struct GetThreadSubscriptionsEndpoint {
/// New thread subscriptions per (room id, thread root event id).
subscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
/// New thread unsubscriptions per (room id, thread root event id).
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
}
impl<'a> MockEndpoint<'a, GetThreadSubscriptionsEndpoint> {
/// Add a single thread subscription to the response.
pub fn add_subscription(
mut self,
room_id: OwnedRoomId,
thread_root: OwnedEventId,
subscription: ThreadSubscription,
) -> Self {
self.endpoint.subscribed.entry(room_id).or_default().insert(thread_root, subscription);
self
}
/// Add a single thread unsubscription to the response.
pub fn add_unsubcription(
mut self,
room_id: OwnedRoomId,
thread_root: OwnedEventId,
unsubscription: ThreadUnsubscription,
) -> Self {
self.endpoint.unsubscribed.entry(room_id).or_default().insert(thread_root, unsubscription);
self
}
/// Match the `from` query parameter to a given value.
pub fn match_from(self, from: &str) -> Self {
Self { mock: self.mock.and(query_param("from", from)), ..self }
}
/// Match the `to` query parameter to a given value.
pub fn match_to(self, to: &str) -> Self {
Self { mock: self.mock.and(query_param("to", to)), ..self }
}
/// Returns a successful response with the given thread subscriptions, and
/// "end" parameter to be used in the next query.
pub fn ok(self, end: Option<String>) -> MatrixMock<'a> {
let response_body = json!({
"subscribed": self.endpoint.subscribed,
"unsubscribed": self.endpoint.unsubscribed,
"end": end,
});
self.respond_with(ResponseTemplate::new(200).set_body_json(response_body))
}
}

View File

@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, ops::Not as _, time::Duration};
use assert_matches2::{assert_let, assert_matches};
use eyeball_im::VectorDiff;
@@ -46,10 +46,11 @@ use ruma::{
room::{history_visibility::HistoryVisibility, member::MembershipState},
AnyInitialStateEvent,
},
owned_event_id, owned_room_id,
room::JoinRule,
room_id,
serde::Raw,
user_id, OwnedUserId,
uint, user_id, OwnedUserId,
};
use serde_json::{json, Value as JsonValue};
use stream_assert::{assert_next_matches, assert_pending};
@@ -1516,3 +1517,66 @@ async fn test_server_vendor_info_with_missing_fields() {
assert_eq!(server_info.server_name, "unknown");
assert_eq!(server_info.version, "unknown");
}
#[async_test]
async fn test_fetch_thread_subscriptions() {
use ruma::api::client::threads::get_thread_subscriptions_changes::unstable::{
ThreadSubscription, ThreadUnsubscription,
};
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room1 = owned_room_id!("!room1:example.com");
let room2 = owned_room_id!("!room2:example.com");
let room3 = owned_room_id!("!room3:example.com");
let thread1 = owned_event_id!("$thread1:example.com");
let thread2 = owned_event_id!("$thread2:example.com");
let thread3 = owned_event_id!("$thread3:example.com");
server
.mock_get_thread_subscriptions()
.match_from("from")
.match_to("to")
.add_subscription(
room1.clone(),
thread1.clone(),
ThreadSubscription { automatic: true, bump_stamp: uint!(42) },
)
.add_subscription(
room2.clone(),
thread2.clone(),
ThreadSubscription { automatic: false, bump_stamp: uint!(7) },
)
.add_unsubcription(
room3.clone(),
thread3.clone(),
ThreadUnsubscription { bump_stamp: uint!(13) },
)
.ok(Some("next_batch_token".to_owned()))
.mount()
.await;
let response = client
.fetch_thread_subscriptions(Some("from".to_owned()), Some("to".to_owned()), None)
.await
.unwrap();
assert_eq!(response.end.as_deref(), Some("next_batch_token"));
assert_eq!(response.subscribed.len(), 2);
let s1 = &response.subscribed[&room1][&thread1];
assert!(s1.automatic);
assert_eq!(s1.bump_stamp, uint!(42));
let s2 = &response.subscribed[&room2][&thread2];
assert!(s2.automatic.not());
assert_eq!(s2.bump_stamp, uint!(7));
assert_eq!(response.unsubscribed.len(), 1);
let u = &response.unsubscribed[&room3][&thread3];
assert_eq!(u.bump_stamp, uint!(13));
}