refactor(threads): adapt to Ruma API changes for thread subscriptions

This commit is contained in:
Benjamin Bouvier
2025-08-04 17:51:01 +02:00
parent 8d3b1d3c7e
commit ebb7059d55
5 changed files with 77 additions and 21 deletions

View File

@@ -1112,7 +1112,7 @@ impl Room {
let thread_root = EventId::parse(thread_root_event_id)?;
if subscribed {
// This is a manual subscription.
let automatic = false;
let automatic = None;
self.inner.subscribe_thread(thread_root, automatic).await?;
} else {
self.inner.unsubscribe_thread(thread_root).await?;

View File

@@ -3655,35 +3655,63 @@ impl Room {
///
/// - `thread_root`: The ID of the thread root event to subscribe to.
/// - `automatic`: Whether the subscription was made automatically by a
/// client, not by manual user choice. If there was a previous automatic
/// subscription, and that's set to `true` (i.e. we're now subscribing
/// manually), the subscription will be overridden to a manual one
/// instead.
/// client, not by manual user choice. If set, must include the latest
/// event ID that's known in the thread and that is causing the automatic
/// subscription. If unset (i.e. we're now subscribing manually) and there
/// was a previous automatic subscription, the subscription will be
/// overridden to a manual one instead.
///
/// # Returns
///
/// - A 404 error if the event isn't known, or isn't a thread root.
/// - An `Ok` result if the subscription was successful.
pub async fn subscribe_thread(&self, thread_root: OwnedEventId, automatic: bool) -> Result<()> {
self.client
/// - An `Ok` result if the subscription was successful, or if the server
/// skipped an automatic subscription (as the user unsubscribed from the
/// thread after the event causing the automatic subscription).
pub async fn subscribe_thread(
&self,
thread_root: OwnedEventId,
automatic: Option<OwnedEventId>,
) -> Result<()> {
let is_automatic = automatic.is_some();
match self
.client
.send(subscribe_thread::unstable::Request::new(
self.room_id().to_owned(),
thread_root.clone(),
automatic,
))
.await?;
.await
{
Ok(_response) => {
trace!("Server acknowledged the thread subscription; saving in db");
// Immediately save the result into the database.
self.client
.state_store()
.upsert_thread_subscription(
self.room_id(),
&thread_root,
ThreadStatus::Subscribed { automatic: is_automatic },
)
.await?;
// Immediately save the result into the database.
self.client
.state_store()
.upsert_thread_subscription(
self.room_id(),
&thread_root,
ThreadStatus::Subscribed { automatic },
)
.await?;
Ok(())
}
Ok(())
Err(err) => {
if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
// In this case: the server indicates that the user unsubscribed *after* the
// event ID we've used in an automatic subscription; don't
// save the subscription state in the database, as the
// previous one should be more correct.
trace!("Thread subscription skipped: {err}");
Ok(())
} else {
// Forward the error to the caller.
Err(err.into())
}
}
}
}
/// Unsubscribe from a given thread in this room.

View File

@@ -3879,6 +3879,17 @@ impl<'a> MockEndpoint<'a, PutThreadSubscriptionEndpoint> {
self.respond_with(ResponseTemplate::new(200))
}
/// Returns that the server skipped an automated thread subscription,
/// because the user unsubscribed to the thread after the event id passed in
/// the automatic subscription.
pub fn conflicting_unsubscription(mut self) -> MatrixMock<'a> {
self.mock = self.mock.and(path_regex(self.endpoint.matchers.endpoint_regexp_uri()));
self.respond_with(ResponseTemplate::new(409).set_body_json(json!({
"errcode": "IO.ELEMENT.MSC4306.M_CONFLICTING_UNSUBSCRIPTION",
"error": "the user unsubscribed after the subscription event id"
})))
}
/// Match the request parameter against a specific room id.
pub fn match_room_id(mut self, room_id: OwnedRoomId) -> Self {
self.endpoint.matchers = self.endpoint.matchers.match_room_id(room_id);

View File

@@ -23,7 +23,7 @@ async fn test_subscribe_thread() {
.await;
// I can subscribe to a thread.
room.subscribe_thread(root_id.clone(), true).await.unwrap();
room.subscribe_thread(root_id.clone(), Some(root_id.clone())).await.unwrap();
server
.mock_get_thread_subscription()
@@ -58,6 +58,23 @@ async fn test_subscribe_thread() {
// Now, if I retry to get the subscription status for this thread, it's
// unsubscribed.
let subscription = room.fetch_thread_subscription(root_id.clone()).await.unwrap();
assert_matches!(subscription, Some(ThreadStatus::Unsubscribed));
// Subscribing automatically to the thread may also return a `M_SKIPPED` error
// that should be non-fatal.
server
.mock_put_thread_subscription()
.match_room_id(room_id.to_owned())
.match_thread_id(root_id.clone())
.conflicting_unsubscription()
.mock_once()
.mount()
.await;
room.subscribe_thread(root_id.clone(), Some(root_id.clone())).await.unwrap();
// And in this case, the thread is still unsubscribed.
let subscription = room.fetch_thread_subscription(root_id).await.unwrap();
assert_matches!(subscription, Some(ThreadStatus::Unsubscribed));
}

View File

@@ -491,7 +491,7 @@ impl RoomView {
async fn subscribe_thread(&mut self) {
if let TimelineKind::Thread { thread_root, .. } = &self.kind {
self.call_with_room(async |room, status_handle| {
if let Err(err) = room.subscribe_thread(thread_root.clone(), false).await {
if let Err(err) = room.subscribe_thread(thread_root.clone(), None).await {
status_handle.set_message(format!("error when subscribing to a thread: {err}"));
} else {
status_handle.set_message("Subscribed to thread!".to_owned());