From 5b8ff8a76d13bc1d2cdbea627c683c613034e032 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 15 Jan 2026 15:37:05 +0100 Subject: [PATCH] refactor(state store): remove `StateStore::upsert_thread_subscription` There is `StateStore::upsert_thread_subscriptions` as a proper replacement these days. --- crates/matrix-sdk-base/CHANGELOG.md | 2 + .../src/store/integration_tests.rs | 90 ++----------------- .../matrix-sdk-base/src/store/memory_store.rs | 24 ----- crates/matrix-sdk-base/src/store/traits.rs | 25 ------ .../src/state_store/mod.rs | 38 -------- crates/matrix-sdk-sqlite/src/state_store.rs | 42 +-------- crates/matrix-sdk/src/room/mod.rs | 12 +-- 7 files changed, 16 insertions(+), 217 deletions(-) diff --git a/crates/matrix-sdk-base/CHANGELOG.md b/crates/matrix-sdk-base/CHANGELOG.md index 762d2edaa..d4214694a 100644 --- a/crates/matrix-sdk-base/CHANGELOG.md +++ b/crates/matrix-sdk-base/CHANGELOG.md @@ -22,6 +22,8 @@ All notable changes to this project will be documented in this file. ### Refactor +- [**breaking**] The `StateStore::upsert_thread_subscription` method has been removed in favor of a + bulk method `StateStore::upsert_thread_subscriptions`. - [**breaking**] The `message-ids` feature has been removed. It was already a no-op and has now been eliminated entirely. ([#5963](https://github.com/matrix-org/matrix-rust-sdk/pull/5963)) diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index ca2f1652f..bdbcff068 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -109,8 +109,6 @@ pub trait StateStoreIntegrationTests { async fn test_get_room_infos(&self) -> TestResult; /// Test loading thread subscriptions. async fn test_thread_subscriptions(&self) -> TestResult; - /// Test thread subscription bumpstamp semantics. - async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult; /// Test thread subscriptions bulk upsert, including bumpstamp semantics. async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult; } @@ -1844,24 +1842,24 @@ impl StateStoreIntegrationTests for DynStateStore { assert!(maybe_sub.is_none()); // Setting the thread subscription works. - self.upsert_thread_subscription( + self.upsert_thread_subscriptions(vec![( room_id(), first_thread, StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: true }, bump_stamp: None, }, - ) + )]) .await?; - self.upsert_thread_subscription( + self.upsert_thread_subscriptions(vec![( room_id(), second_thread, StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: false }, bump_stamp: None, }, - ) + )]) .await?; // Now, reading the thread subscription returns the expected status. @@ -1884,14 +1882,14 @@ impl StateStoreIntegrationTests for DynStateStore { ); // We can override the thread subscription status. - self.upsert_thread_subscription( + self.upsert_thread_subscriptions(vec![( room_id(), first_thread, StoredThreadSubscription { status: ThreadSubscriptionStatus::Unsubscribed, bump_stamp: None, }, - ) + )]) .await?; // And it's correctly reflected. @@ -1937,76 +1935,6 @@ impl StateStoreIntegrationTests for DynStateStore { Ok(()) } - async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult { - let thread = event_id!("$fred"); - - // At first, there is no thread subscription. - let sub = self.load_thread_subscription(room_id(), thread).await?; - assert!(sub.is_none()); - - // Setting the thread subscription with some bumpstamp works. - self.upsert_thread_subscription( - room_id(), - thread, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { automatic: true }, - bump_stamp: Some(42), - }, - ) - .await?; - - let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); - assert_eq!( - sub, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { automatic: true }, - bump_stamp: Some(42), - } - ); - - // Storing a subscription with an older bumpstamp has no effect. - self.upsert_thread_subscription( - room_id(), - thread, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { automatic: false }, - bump_stamp: Some(41), - }, - ) - .await?; - - let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); - assert_eq!( - sub, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { automatic: true }, - bump_stamp: Some(42), - } - ); - - // Storing with no bumpstamps keeps the previous one. - self.upsert_thread_subscription( - room_id(), - thread, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Unsubscribed, - bump_stamp: None, - }, - ) - .await?; - - let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); - assert_eq!( - sub, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Unsubscribed, - bump_stamp: Some(42), - } - ); - - Ok(()) - } - async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult { let threads = [ event_id!("$t1"), @@ -2370,12 +2298,6 @@ macro_rules! statestore_integration_tests { store.test_thread_subscriptions().await } - #[async_test] - async fn test_thread_subscriptions_bumpstamps() -> TestResult { - let store = get_store().await?.into_state_store(); - store.test_thread_subscriptions_bumpstamps().await - } - #[async_test] async fn test_thread_subscriptions_bulk_upsert() -> TestResult { let store = get_store().await?.into_state_store(); diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 0763e8b33..1b3bb3152 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -993,30 +993,6 @@ impl StateStore for MemoryStore { .unwrap_or_default()) } - async fn upsert_thread_subscription( - &self, - room: &RoomId, - thread_id: &EventId, - mut new: StoredThreadSubscription, - ) -> Result<(), Self::Error> { - let mut inner = self.inner.write().unwrap(); - let room_subs = inner.thread_subscriptions.entry(room.to_owned()).or_default(); - - if let Some(previous) = room_subs.get(thread_id) { - // Nothing to do. - if *previous == new { - return Ok(()); - } - if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) { - return Ok(()); - } - } - - room_subs.insert(thread_id.to_owned(), new); - - Ok(()) - } - async fn upsert_thread_subscriptions( &self, updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 4c65f4442..956347792 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -480,22 +480,6 @@ pub trait StateStore: AsyncTraitDeps { room: &RoomId, ) -> Result, Self::Error>; - /// Insert or update a thread subscription for a given room and thread. - /// - /// If the new thread subscription hasn't set a bumpstamp, and there was a - /// previous subscription in the database with a bumpstamp, the existing - /// bumpstamp is kept. - /// - /// If the new thread subscription has a bumpstamp that's lower than or - /// equal to a previous one, the existing subscription is kept, i.e. - /// this method must have no effect. - async fn upsert_thread_subscription( - &self, - room: &RoomId, - thread_id: &EventId, - subscription: StoredThreadSubscription, - ) -> Result<(), Self::Error>; - /// Inserts or updates multiple thread subscriptions. /// /// If the new thread subscription hasn't set a bumpstamp, and there was a @@ -833,15 +817,6 @@ impl StateStore for EraseStateStoreError { .map_err(Into::into) } - async fn upsert_thread_subscription( - &self, - room: &RoomId, - thread_id: &EventId, - subscription: StoredThreadSubscription, - ) -> Result<(), Self::Error> { - self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into) - } - async fn upsert_thread_subscriptions( &self, updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index ce3a3f30c..e66b83b76 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -1946,44 +1946,6 @@ impl_state_store!({ ) } - async fn upsert_thread_subscription( - &self, - room: &RoomId, - thread_id: &EventId, - subscription: StoredThreadSubscription, - ) -> Result<()> { - let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id)); - - let tx = self - .inner - .transaction(keys::THREAD_SUBSCRIPTIONS) - .with_mode(TransactionMode::Readwrite) - .build()?; - let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?; - - let mut new = PersistedThreadSubscription::from(subscription); - - // See if there's a previous subscription. - if let Some(previous_value) = obj.get(&encoded_key).await? { - let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?; - - // If the previous status is the same as the new one, don't do anything. - if new == previous { - return Ok(()); - } - if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) { - return Ok(()); - } - } - - let serialized_value = self.serialize_value(&new); - obj.put(&serialized_value?).with_key(encoded_key).build()?; - - tx.commit().await?; - - Ok(()) - } - async fn upsert_thread_subscriptions( &self, updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index a760781d6..d6ba5c650 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -15,8 +15,7 @@ use matrix_sdk_base::{ store::{ ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey, - StoredThreadSubscription, ThreadSubscriptionStatus, - compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, + StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1, }, }; use matrix_sdk_store_encryption::StoreCipher; @@ -42,7 +41,7 @@ use tokio::{ fs, sync::{Mutex, OwnedMutexGuard}, }; -use tracing::{debug, instrument, trace, warn}; +use tracing::{debug, instrument, warn}; use crate::{ OpenStoreError, Secret, SqliteStoreConfig, @@ -2195,43 +2194,6 @@ impl StateStore for SqliteStateStore { Ok(dependent_events) } - async fn upsert_thread_subscription( - &self, - room_id: &RoomId, - thread_id: &EventId, - mut new: StoredThreadSubscription, - ) -> Result<(), Self::Error> { - if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? { - if previous == new { - // No need to update anything. - trace!("not saving thread subscription because the subscription is the same"); - return Ok(()); - } - - if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) { - trace!("not saving thread subscription because we have a newer bump stamp"); - return Ok(()); - } - } - - let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id); - let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id); - let status = new.status.as_str(); - - self.write() - .await - .with_transaction(move |txn| { - // Try to find a previous value. - txn.prepare_cached( - "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp) - VALUES (?, ?, ?, ?)", - )? - .execute((room_id, thread_id, status, new.bump_stamp)) - }) - .await?; - Ok(()) - } - async fn upsert_thread_subscriptions( &self, updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index dbd3413fb..d6d19c62d 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -4199,7 +4199,7 @@ impl Room { // Immediately save the result into the database. self.client .state_store() - .upsert_thread_subscription( + .upsert_thread_subscriptions(vec![( self.room_id(), &thread_root, StoredThreadSubscription { @@ -4208,7 +4208,7 @@ impl Room { }, bump_stamp: None, }, - ) + )]) .await?; Ok(()) @@ -4277,14 +4277,14 @@ impl Room { // Immediately save the result into the database. self.client .state_store() - .upsert_thread_subscription( + .upsert_thread_subscriptions(vec![( self.room_id(), &thread_root, StoredThreadSubscription { status: ThreadSubscriptionStatus::Unsubscribed, bump_stamp: None, }, - ) + )]) .await?; Ok(()) @@ -4331,14 +4331,14 @@ impl Room { if let Some(sub) = &subscription { self.client .state_store() - .upsert_thread_subscription( + .upsert_thread_subscriptions(vec![( self.room_id(), &thread_root, StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic }, bump_stamp: None, }, - ) + )]) .await?; } else { // If the subscription was not found, remove it from the database.