diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 25aae4132..c0d9c1c1f 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -82,6 +82,9 @@ All notable changes to this project will be documented in this file. ([#5678](https://github.com/matrix-org/matrix-rust-sdk/pull/5678). - Add new API to decline calls ([MSC4310](https://github.com/matrix-org/matrix-spec-proposals/pull/4310)): `Room::make_decline_call_event` and `Room::subscribe_to_call_decline_events` ([#5614](https://github.com/matrix-org/matrix-rust-sdk/pull/5614)) +- Use `StateStore::upsert_thread_subscriptions()` to bulk process thread subscription updates received + via the sync response or from the MSC4308 companion endpoint. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Refactor diff --git a/crates/matrix-sdk/src/client/thread_subscriptions.rs b/crates/matrix-sdk/src/client/thread_subscriptions.rs index 45a9afe16..fc359da8e 100644 --- a/crates/matrix-sdk/src/client/thread_subscriptions.rs +++ b/crates/matrix-sdk/src/client/thread_subscriptions.rs @@ -28,7 +28,7 @@ use matrix_sdk_base::{ use matrix_sdk_common::executor::spawn; use once_cell::sync::OnceCell; use ruma::{ - OwnedEventId, OwnedRoomId, + EventId, OwnedEventId, OwnedRoomId, RoomId, api::client::threads::get_thread_subscriptions_changes::unstable::{ ThreadSubscription, ThreadUnsubscription, }, @@ -179,70 +179,21 @@ impl ThreadSubscriptionCatchup { unsubscribed: BTreeMap>, token: Option, ) -> Result<()> { + // Precompute the updates so we don't hold the guard for too long. + let updates = build_subscription_updates(&subscribed, &unsubscribed); let Some(guard) = self.lock().await else { // Client is shutting down. return Ok(()); }; self.save_catchup_token(&guard, token).await?; - self.store_subscriptions(&guard, subscribed, unsubscribed).await?; - Ok(()) - } - - async fn store_subscriptions( - &self, - guard: &GuardedStoreAccess, - subscribed: BTreeMap>, - unsubscribed: BTreeMap>, - ) -> Result<()> { - if subscribed.is_empty() && unsubscribed.is_empty() { - // Nothing to do. - return Ok(()); + if !updates.is_empty() { + trace!( + "saving {} new subscriptions and {} unsubscriptions", + subscribed.values().map(|by_room| by_room.len()).sum::(), + unsubscribed.values().map(|by_room| by_room.len()).sum::(), + ); + guard.client.state_store().upsert_thread_subscriptions(updates).await?; } - - trace!( - "saving {} new subscriptions and {} unsubscriptions", - subscribed.values().map(|by_room| by_room.len()).sum::(), - unsubscribed.values().map(|by_room| by_room.len()).sum::(), - ); - - // Take into account the new unsubscriptions. - for (room_id, room_map) in unsubscribed { - for (event_id, thread_sub) in room_map { - guard - .client - .state_store() - .upsert_thread_subscription( - &room_id, - &event_id, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Unsubscribed, - bump_stamp: Some(thread_sub.bump_stamp.into()), - }, - ) - .await?; - } - } - - // Take into account the new subscriptions. - for (room_id, room_map) in subscribed { - for (event_id, thread_sub) in room_map { - guard - .client - .state_store() - .upsert_thread_subscription( - &room_id, - &event_id, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { - automatic: thread_sub.automatic, - }, - bump_stamp: Some(thread_sub.bump_stamp.into()), - }, - ) - .await?; - } - } - Ok(()) } @@ -348,16 +299,27 @@ impl ThreadSubscriptionCatchup { match client.send(req).await { Ok(resp) => { + // Precompute the updates so we don't hold the guard for too long. + let updates = build_subscription_updates(&resp.subscribed, &resp.unsubscribed); + let guard = this .lock() .await .expect("a client instance is alive, so the locking should not fail"); - if let Err(err) = - this.store_subscriptions(&guard, resp.subscribed, resp.unsubscribed).await - { - warn!("Failed to store caught up thread subscriptions: {err}"); - continue; + if !updates.is_empty() { + trace!( + "saving {} new subscriptions and {} unsubscriptions", + resp.subscribed.values().map(|by_room| by_room.len()).sum::(), + resp.unsubscribed.values().map(|by_room| by_room.len()).sum::(), + ); + + if let Err(err) = + guard.client.state_store().upsert_thread_subscriptions(updates).await + { + warn!("Failed to store caught up thread subscriptions: {err}"); + continue; + } } // Refresh the tokens, as the list might have changed while we sent the @@ -398,6 +360,47 @@ impl ThreadSubscriptionCatchup { } } +/// Internal helper for building the thread subscription updates Vec. +fn build_subscription_updates<'a>( + subscribed: &'a BTreeMap>, + unsubscribed: &'a BTreeMap>, +) -> Vec<(&'a RoomId, &'a EventId, StoredThreadSubscription)> { + let mut updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)> = + Vec::with_capacity(unsubscribed.len() + subscribed.len()); + + // Take into account the new unsubscriptions. + for (room_id, room_map) in unsubscribed { + for (event_id, thread_sub) in room_map { + updates.push(( + room_id, + event_id, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(thread_sub.bump_stamp.into()), + }, + )); + } + } + + // Take into account the new subscriptions. + for (room_id, room_map) in subscribed { + for (event_id, thread_sub) in room_map { + updates.push(( + room_id, + event_id, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { + automatic: thread_sub.automatic, + }, + bump_stamp: Some(thread_sub.bump_stamp.into()), + }, + )); + } + } + + updates +} + #[cfg(test)] mod tests { use std::ops::Not as _;