feat(sdk): Bulk process thread subscription updates from sync and companion enpoint

This commit is contained in:
razvp
2025-11-10 19:25:16 +02:00
committed by Ivan Enderlin
parent 67b1de613c
commit ea43e3f5a8
2 changed files with 70 additions and 64 deletions

View File

@@ -82,6 +82,9 @@ All notable changes to this project will be documented in this file.
([#5678](https://github.com/matrix-org/matrix-rust-sdk/pull/5678).
- Add new API to decline calls ([MSC4310](https://github.com/matrix-org/matrix-spec-proposals/pull/4310)): `Room::make_decline_call_event` and `Room::subscribe_to_call_decline_events`
([#5614](https://github.com/matrix-org/matrix-rust-sdk/pull/5614))
- Use `StateStore::upsert_thread_subscriptions()` to bulk process thread subscription updates received
via the sync response or from the MSC4308 companion endpoint.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
### Refactor

View File

@@ -28,7 +28,7 @@ use matrix_sdk_base::{
use matrix_sdk_common::executor::spawn;
use once_cell::sync::OnceCell;
use ruma::{
OwnedEventId, OwnedRoomId,
EventId, OwnedEventId, OwnedRoomId, RoomId,
api::client::threads::get_thread_subscriptions_changes::unstable::{
ThreadSubscription, ThreadUnsubscription,
},
@@ -179,70 +179,21 @@ impl ThreadSubscriptionCatchup {
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
token: Option<ThreadSubscriptionCatchupToken>,
) -> Result<()> {
// Precompute the updates so we don't hold the guard for too long.
let updates = build_subscription_updates(&subscribed, &unsubscribed);
let Some(guard) = self.lock().await else {
// Client is shutting down.
return Ok(());
};
self.save_catchup_token(&guard, token).await?;
self.store_subscriptions(&guard, subscribed, unsubscribed).await?;
Ok(())
}
async fn store_subscriptions(
&self,
guard: &GuardedStoreAccess,
subscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
) -> Result<()> {
if subscribed.is_empty() && unsubscribed.is_empty() {
// Nothing to do.
return Ok(());
if !updates.is_empty() {
trace!(
"saving {} new subscriptions and {} unsubscriptions",
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
);
guard.client.state_store().upsert_thread_subscriptions(updates).await?;
}
trace!(
"saving {} new subscriptions and {} unsubscriptions",
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
);
// Take into account the new unsubscriptions.
for (room_id, room_map) in unsubscribed {
for (event_id, thread_sub) in room_map {
guard
.client
.state_store()
.upsert_thread_subscription(
&room_id,
&event_id,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(thread_sub.bump_stamp.into()),
},
)
.await?;
}
}
// Take into account the new subscriptions.
for (room_id, room_map) in subscribed {
for (event_id, thread_sub) in room_map {
guard
.client
.state_store()
.upsert_thread_subscription(
&room_id,
&event_id,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed {
automatic: thread_sub.automatic,
},
bump_stamp: Some(thread_sub.bump_stamp.into()),
},
)
.await?;
}
}
Ok(())
}
@@ -348,16 +299,27 @@ impl ThreadSubscriptionCatchup {
match client.send(req).await {
Ok(resp) => {
// Precompute the updates so we don't hold the guard for too long.
let updates = build_subscription_updates(&resp.subscribed, &resp.unsubscribed);
let guard = this
.lock()
.await
.expect("a client instance is alive, so the locking should not fail");
if let Err(err) =
this.store_subscriptions(&guard, resp.subscribed, resp.unsubscribed).await
{
warn!("Failed to store caught up thread subscriptions: {err}");
continue;
if !updates.is_empty() {
trace!(
"saving {} new subscriptions and {} unsubscriptions",
resp.subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
resp.unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
);
if let Err(err) =
guard.client.state_store().upsert_thread_subscriptions(updates).await
{
warn!("Failed to store caught up thread subscriptions: {err}");
continue;
}
}
// Refresh the tokens, as the list might have changed while we sent the
@@ -398,6 +360,47 @@ impl ThreadSubscriptionCatchup {
}
}
/// Internal helper for building the thread subscription updates Vec.
fn build_subscription_updates<'a>(
subscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
unsubscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
) -> Vec<(&'a RoomId, &'a EventId, StoredThreadSubscription)> {
let mut updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)> =
Vec::with_capacity(unsubscribed.len() + subscribed.len());
// Take into account the new unsubscriptions.
for (room_id, room_map) in unsubscribed {
for (event_id, thread_sub) in room_map {
updates.push((
room_id,
event_id,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(thread_sub.bump_stamp.into()),
},
));
}
}
// Take into account the new subscriptions.
for (room_id, room_map) in subscribed {
for (event_id, thread_sub) in room_map {
updates.push((
room_id,
event_id,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed {
automatic: thread_sub.automatic,
},
bump_stamp: Some(thread_sub.bump_stamp.into()),
},
));
}
}
updates
}
#[cfg(test)]
mod tests {
use std::ops::Not as _;