From 67b1de613cfe35f76be424ffb0c7d7a3cc6fe040 Mon Sep 17 00:00:00 2001 From: razvp Date: Sun, 9 Nov 2025 19:33:05 +0200 Subject: [PATCH] feat(state-stores): Add `StateStore::upsert_thread_subscriptions()` method for bulk upsert --- crates/matrix-sdk-base/CHANGELOG.md | 2 + .../src/store/integration_tests.rs | 185 ++++++++++++++++++ .../matrix-sdk-base/src/store/memory_store.rs | 27 +++ crates/matrix-sdk-base/src/store/traits.rs | 23 ++- crates/matrix-sdk-indexeddb/CHANGELOG.md | 2 + .../src/state_store/mod.rs | 41 ++++ crates/matrix-sdk-sqlite/CHANGELOG.md | 2 + crates/matrix-sdk-sqlite/src/state_store.rs | 51 +++++ 8 files changed, 332 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-base/CHANGELOG.md b/crates/matrix-sdk-base/CHANGELOG.md index 45f960d79..2ac9014b7 100644 --- a/crates/matrix-sdk-base/CHANGELOG.md +++ b/crates/matrix-sdk-base/CHANGELOG.md @@ -50,6 +50,8 @@ All notable changes to this project will be documented in this file. ([#5817](https://github.com/matrix-org/matrix-rust-sdk/pull/5817)) - `ComposerDraft` can now store attachments alongside text messages. ([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794)) +- Add `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ## [0.14.1] - 2025-09-10 diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index b3dd03a0d..ca2f1652f 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -111,6 +111,8 @@ pub trait StateStoreIntegrationTests { async fn test_thread_subscriptions(&self) -> TestResult; /// Test thread subscription bumpstamp semantics. async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult; + /// Test thread subscriptions bulk upsert, including bumpstamp semantics. + async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult; } impl StateStoreIntegrationTests for DynStateStore { @@ -2004,6 +2006,183 @@ impl StateStoreIntegrationTests for DynStateStore { Ok(()) } + + async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult { + let threads = [ + event_id!("$t1"), + event_id!("$t2"), + event_id!("$t3"), + event_id!("$t4"), + event_id!("$t5"), + event_id!("$t6"), + ]; + // Helper for building the input for `upsert_thread_subscriptions()`, + // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)> + let build_subscription_updates = |subs: &[StoredThreadSubscription]| { + threads + .iter() + .zip(subs) + .map(|(&event_id, &sub)| (room_id(), event_id, sub)) + .collect::>() + }; + + // Test bump_stamp logic + let initial_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(14), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(210), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(5), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(100), + }, + ]); + + let update_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1101), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(222), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(100), + }, + ]); + + let expected_subscriptions = build_subscription_updates(&[ + // Status should be updated, because prev and new bump_stamp are both None + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + // Status should be updated, but keep initial bump_stamp (new is None) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(14), + }, + // Status should be updated and also bump_stamp should be updated (initial was None) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1101), + }, + // Status should be updated and also bump_stamp should be updated (initial was lower) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(222), + }, + // Status shouldn't change, as new bump_stamp is lower + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(5), + }, + // Status shouldn't change, as bump_stamp is equal to the previous one + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(100), + }, + ]); + + // Set the initial subscriptions + self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?; + + // Assert the subscriptions have been added + for (room_id, thread_id, expected_sub) in &initial_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + // Update subscriptions + self.upsert_thread_subscriptions(update_subscriptions).await?; + + // Assert the expected subscriptions and bump_stamps + for (room_id, thread_id, expected_sub) in &expected_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + // Test just state changes, but first remove previous subscriptions + for (room_id, thread_id, _) in &expected_subscriptions { + self.remove_thread_subscription(room_id, thread_id).await?; + } + + let initial_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: false }, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1), + }, + ]); + + self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?; + + for (room_id, thread_id, expected_sub) in &initial_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + let update_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(2), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(2), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: false }, + bump_stamp: Some(2), + }, + ]); + + self.upsert_thread_subscriptions(update_subscriptions.clone()).await?; + + for (room_id, thread_id, expected_sub) in &update_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + Ok(()) + } } /// Macro building to allow your StateStore implementation to run the entire @@ -2196,6 +2375,12 @@ macro_rules! statestore_integration_tests { let store = get_store().await?.into_state_store(); store.test_thread_subscriptions_bumpstamps().await } + + #[async_test] + async fn test_thread_subscriptions_bulk_upsert() -> TestResult { + let store = get_store().await?.into_state_store(); + store.test_thread_subscriptions_bulk_upsert().await + } } }; } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index bf8d65803..0763e8b33 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -1017,6 +1017,33 @@ impl StateStore for MemoryStore { Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + let mut inner = self.inner.write().unwrap(); + + for (room_id, thread_id, mut new) in updates { + let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default(); + + if let Some(previous) = room_subs.get(thread_id) { + if *previous == new { + continue; + } + if !compare_thread_subscription_bump_stamps( + previous.bump_stamp, + &mut new.bump_stamp, + ) { + continue; + } + } + + room_subs.insert(thread_id.to_owned(), new); + } + + Ok(()) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index bbdc3a8f2..4c65f4442 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -487,7 +487,7 @@ pub trait StateStore: AsyncTraitDeps { /// bumpstamp is kept. /// /// If the new thread subscription has a bumpstamp that's lower than or - /// equal to a previously one, the existing subscription is kept, i.e. + /// equal to a previous one, the existing subscription is kept, i.e. /// this method must have no effect. async fn upsert_thread_subscription( &self, @@ -496,6 +496,20 @@ pub trait StateStore: AsyncTraitDeps { subscription: StoredThreadSubscription, ) -> Result<(), Self::Error>; + /// Inserts or updates multiple thread subscriptions. + /// + /// If the new thread subscription hasn't set a bumpstamp, and there was a + /// previous subscription in the database with a bumpstamp, the existing + /// bumpstamp is kept. + /// + /// If the new thread subscription has a bumpstamp that's lower than or + /// equal to a previous one, the existing subscription is kept, i.e. + /// this method must have no effect. + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error>; + /// Remove a previous thread subscription for a given room and thread. /// /// Note: removing an unknown thread subscription is a no-op. @@ -828,6 +842,13 @@ impl StateStore for EraseStateStoreError { self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-indexeddb/CHANGELOG.md b/crates/matrix-sdk-indexeddb/CHANGELOG.md index 3c749c8e4..edf409fa2 100644 --- a/crates/matrix-sdk-indexeddb/CHANGELOG.md +++ b/crates/matrix-sdk-indexeddb/CHANGELOG.md @@ -19,6 +19,8 @@ All notable changes to this project will be documented in this file. ([#5819](https://github.com/matrix-org/matrix-rust-sdk/pull/5819)) - [**breaking**] `IndexeddbCryptoStore::get_withheld_info` now returns `Result, ...>`. ([#5737](https://github.com/matrix-org/matrix-rust-sdk/pull/5737)) +- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Performance diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index f26cbcc84..a90a3cc8b 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -1978,6 +1978,47 @@ impl_state_store!({ Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<()> { + let tx = self + .inner + .transaction(keys::THREAD_SUBSCRIPTIONS) + .with_mode(TransactionMode::Readwrite) + .build()?; + let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?; + + for (room_id, thread_id, subscription) in updates { + let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id)); + let mut new = PersistedThreadSubscription::from(subscription); + + // See if there's a previous subscription. + if let Some(previous_value) = obj.get(&encoded_key).await? { + let previous: PersistedThreadSubscription = + self.deserialize_value(&previous_value)?; + + // If the previous status is the same as the new one, don't do anything. + if new == previous { + continue; + } + if !compare_thread_subscription_bump_stamps( + previous.bump_stamp, + &mut new.bump_stamp, + ) { + continue; + } + } + + let serialized_value = self.serialize_value(&new); + obj.put(&serialized_value?).with_key(encoded_key).build()?; + } + + tx.commit().await?; + + Ok(()) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-sqlite/CHANGELOG.md b/crates/matrix-sdk-sqlite/CHANGELOG.md index 7ba3bea05..409f6cb4f 100644 --- a/crates/matrix-sdk-sqlite/CHANGELOG.md +++ b/crates/matrix-sdk-sqlite/CHANGELOG.md @@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file. - Implement a new constructor that allows to open `SqliteCryptoStore` with a cryptographic key ([#5472](https://github.com/matrix-org/matrix-rust-sdk/pull/5472)) +- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Refactor - [breaking] Change the logic for opening a store so as to use a `Secret` enum in the function `open_with_pool` instead of a `passphrase` diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index fc3303995..5dbc235b9 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -2230,6 +2230,57 @@ impl StateStore for SqliteStateStore { Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + let values: Vec<_> = updates + .into_iter() + .map(|(room_id, thread_id, subscription)| { + ( + self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id), + self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id), + subscription.status.as_str(), + subscription.bump_stamp, + ) + }) + .collect(); + + self.write() + .await + .with_transaction(move |txn| { + let mut txn = txn.prepare_cached( + "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp) + VALUES (?, ?, ?, ?) + ON CONFLICT (room_id, event_id) DO UPDATE + SET + status = + CASE + WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status + WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status + WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status + ELSE thread_subscriptions.status + END, + bump_stamp = + CASE + WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp + WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp + WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp + ELSE thread_subscriptions.bump_stamp + END", + )?; + + for value in values { + txn.execute(value)?; + } + + Result::<_, Error>::Ok(()) + }) + .await?; + + Ok(()) + } + async fn load_thread_subscription( &self, room_id: &RoomId,