From 7a762035f15b09fce9945b53528ca9ea0188db32 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 27 Aug 2025 17:40:54 +0200 Subject: [PATCH] feat(sdk): store the thread subscription bumpstamp and implement the correct upsert semantics --- .../src/store/integration_tests.rs | 110 +++++++++++++++--- .../matrix-sdk-base/src/store/memory_store.rs | 38 ++++-- crates/matrix-sdk-base/src/store/traits.rs | 9 +- .../src/state_store/migrations.rs | 17 ++- .../src/state_store/mod.rs | 44 ++++++- .../012_thread_subscriptions_bumpstamp.sql | 3 + crates/matrix-sdk-sqlite/src/state_store.rs | 60 ++++++++-- 7 files changed, 240 insertions(+), 41 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/012_thread_subscriptions_bumpstamp.sql diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 6d2f2809d..5e925a2c8 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -107,6 +107,8 @@ pub trait StateStoreIntegrationTests { async fn test_get_room_infos(&self) -> TestResult; /// Test loading thread subscriptions. async fn test_thread_subscriptions(&self) -> TestResult; + /// Test thread subscription bumpstamp semantics. + async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult; } impl StateStoreIntegrationTests for DynStateStore { @@ -1784,11 +1786,11 @@ impl StateStoreIntegrationTests for DynStateStore { let second_thread = event_id!("$t2"); // At first, there is no thread subscription. - let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?; - assert!(maybe_status.is_none()); + let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?; + assert!(maybe_sub.is_none()); - let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?; - assert!(maybe_status.is_none()); + let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?; + assert!(maybe_sub.is_none()); // Setting the thread subscription works. self.upsert_thread_subscription( @@ -1812,18 +1814,18 @@ impl StateStoreIntegrationTests for DynStateStore { .await?; // Now, reading the thread subscription returns the expected status. - let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?; + let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?; assert_eq!( - maybe_status, + maybe_sub, Some(StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: true }, bump_stamp: None, }) ); - let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?; + let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?; assert_eq!( - maybe_status, + maybe_sub, Some(StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: false }, bump_stamp: None, @@ -1842,9 +1844,9 @@ impl StateStoreIntegrationTests for DynStateStore { .await?; // And it's correctly reflected. - let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?; + let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?; assert_eq!( - maybe_status, + maybe_sub, Some(StoredThreadSubscription { status: ThreadSubscriptionStatus::Unsubscribed, bump_stamp: None, @@ -1852,9 +1854,9 @@ impl StateStoreIntegrationTests for DynStateStore { ); // And the second thread is still subscribed. - let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?; + let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?; assert_eq!( - maybe_status, + maybe_sub, Some(StoredThreadSubscription { status: ThreadSubscriptionStatus::Subscribed { automatic: false }, bump_stamp: None, @@ -1865,13 +1867,13 @@ impl StateStoreIntegrationTests for DynStateStore { self.remove_thread_subscription(room_id(), second_thread).await?; // And it's correctly reflected. - let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?; - assert_eq!(maybe_status, None); + let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?; + assert_eq!(maybe_sub, None); // And the first thread is still unsubscribed. - let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?; + let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?; assert_eq!( - maybe_status, + maybe_sub, Some(StoredThreadSubscription { status: ThreadSubscriptionStatus::Unsubscribed, bump_stamp: None, @@ -1883,6 +1885,76 @@ impl StateStoreIntegrationTests for DynStateStore { Ok(()) } + + async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult { + let thread = event_id!("$fred"); + + // At first, there is no thread subscription. + let sub = self.load_thread_subscription(room_id(), thread).await?; + assert!(sub.is_none()); + + // Setting the thread subscription with some bumpstamp works. + self.upsert_thread_subscription( + room_id(), + thread, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(42), + }, + ) + .await?; + + let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); + assert_eq!( + sub, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(42), + } + ); + + // Storing a subscription with an older bumpstamp has no effect. + self.upsert_thread_subscription( + room_id(), + thread, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: false }, + bump_stamp: Some(41), + }, + ) + .await?; + + let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); + assert_eq!( + sub, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(42), + } + ); + + // Storing with no bumpstamps keeps the previous one. + self.upsert_thread_subscription( + room_id(), + thread, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: None, + }, + ) + .await?; + + let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap(); + assert_eq!( + sub, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(42), + } + ); + + Ok(()) + } } /// Macro building to allow your StateStore implementation to run the entire @@ -2063,6 +2135,12 @@ macro_rules! statestore_integration_tests { let store = get_store().await?.into_state_store(); store.test_thread_subscriptions().await } + + #[async_test] + async fn test_thread_subscriptions_bumpstamps() -> TestResult { + let store = get_store().await?.into_state_store(); + store.test_thread_subscriptions_bumpstamps().await + } } }; } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 66b213fd5..e531b6a11 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -968,15 +968,37 @@ impl StateStore for MemoryStore { &self, room: &RoomId, thread_id: &EventId, - subscription: StoredThreadSubscription, + mut new: StoredThreadSubscription, ) -> Result<(), Self::Error> { - self.inner - .write() - .unwrap() - .thread_subscriptions - .entry(room.to_owned()) - .or_default() - .insert(thread_id.to_owned(), subscription); + let mut inner = self.inner.write().unwrap(); + let room_subs = inner.thread_subscriptions.entry(room.to_owned()).or_default(); + + if let Some(previous) = room_subs.get(thread_id) { + // Nothing to do. + if *previous == new { + return Ok(()); + } + + match (previous.bump_stamp, new.bump_stamp) { + // If the previous subscription had a bump stamp, and the new one + // doesn't, keep the previous one. + (Some(prev_bump), None) => { + new.bump_stamp = Some(prev_bump); + } + + // If the previous bump stamp is newer than the new one, don't store the value at + // all. + (Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => { + return Ok(()); + } + + // In all other cases, keep the new bumpstamp. + _ => {} + } + } + + room_subs.insert(thread_id.to_owned(), new); + Ok(()) } diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index df90c846d..b212f03d7 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -482,8 +482,13 @@ pub trait StateStore: AsyncTraitDeps { /// Insert or update a thread subscription for a given room and thread. /// - /// If the new thread subscription hasn't set a bumpstamp, and there was one - /// in the database with a bumpstamp, the existing bumpstamp is kept. + /// If the new thread subscription hasn't set a bumpstamp, and there was a + /// previous subscription in the database with a bumpstamp, the existing + /// bumpstamp is kept. + /// + /// If the new thread subscription has a bumpstamp that's lower than or + /// equal to a previously one, the existing subscription is kept, i.e. + /// this method must have no effect. async fn upsert_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs index 822e18fc6..cce80e7f0 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs @@ -46,7 +46,7 @@ use super::{ }; use crate::IndexeddbStateStoreError; -const CURRENT_DB_VERSION: u32 = 13; +const CURRENT_DB_VERSION: u32 = 14; const CURRENT_META_DB_VERSION: u32 = 2; /// Sometimes Migrations can't proceed without having to drop existing @@ -240,6 +240,9 @@ pub async fn upgrade_inner_db( if old_version < 13 { db = migrate_to_v13(db).await?; } + if old_version < 14 { + db = migrate_to_v14(db).await?; + } } db.close(); @@ -806,6 +809,18 @@ async fn migrate_to_v13(db: IdbDatabase) -> Result { apply_migration(db, 13, migration).await } +/// Empty the thread subscriptions table, because the serialized format has +/// changed (from storing only the subscription to storing the +/// `StoredThreadSubscription`). +async fn migrate_to_v14(db: IdbDatabase) -> Result { + let migration = OngoingMigration { + drop_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(), + create_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(), + data: Default::default(), + }; + apply_migration(db, 14, migration).await +} + #[cfg(all(test, target_family = "wasm"))] mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index dc46caf83..fd1c85916 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -502,6 +502,18 @@ impl PersistedQueuedRequest { } } +#[derive(Serialize, Deserialize, PartialEq)] +struct PersistedThreadSubscription { + status: String, + bump_stamp: Option, +} + +impl From for PersistedThreadSubscription { + fn from(value: StoredThreadSubscription) -> Self { + Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp } + } +} + // Small hack to have the following macro invocation act as the appropriate // trait impl block on wasm, but still be compiled on non-wasm as a regular // impl block otherwise. @@ -1810,8 +1822,36 @@ impl_state_store!({ )?; let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?; - // TODO: store the bump stamp as well. - let serialized_value = self.serialize_value(&subscription.status.as_str().to_owned()); + let mut new = PersistedThreadSubscription::from(subscription); + + // See if there's a previous subscription. + if let Some(previous_value) = obj.get(&encoded_key)?.await? { + let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?; + + // If the previous status is the same as the new one, don't do anything. + if new == previous { + return Ok(()); + } + + match (previous.bump_stamp, new.bump_stamp) { + // If the previous subscription had a bump stamp, and the new one + // doesn't, keep the previous one. + (Some(prev_bump), None) => { + new.bump_stamp = Some(prev_bump); + } + + // If the previous bump stamp is newer than the new one, don't store the value at + // all. + (Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => { + return Ok(()); + } + + // In all other cases, keep the new bumpstamp. + _ => {} + } + } + + let serialized_value = self.serialize_value(&new); obj.put_key_val(&encoded_key, &serialized_value?)?; tx.await.into_result()?; diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/012_thread_subscriptions_bumpstamp.sql b/crates/matrix-sdk-sqlite/migrations/state_store/012_thread_subscriptions_bumpstamp.sql new file mode 100644 index 000000000..0faacb03a --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/012_thread_subscriptions_bumpstamp.sql @@ -0,0 +1,3 @@ +-- Add a column bump_stamp (number) to the table thread_subscriptions. +ALTER TABLE "thread_subscriptions" + ADD COLUMN "bump_stamp" INTEGER; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 3e8d43bae..6bbb6acdc 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -74,7 +74,7 @@ pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function. -const DATABASE_VERSION: u8 = 13; +const DATABASE_VERSION: u8 = 14; /// An SQLite-based state store. #[derive(Clone)] @@ -369,6 +369,17 @@ impl SqliteStateStore { .await?; } + if from < 14 && to >= 14 { + conn.with_transaction(move |txn| { + // Run the migration. + txn.execute_batch(include_str!( + "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql" + ))?; + txn.set_db_version(14) + }) + .await?; + } + Ok(()) } @@ -2108,20 +2119,45 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, thread_id: &EventId, - subscription: StoredThreadSubscription, + mut new: StoredThreadSubscription, ) -> Result<(), Self::Error> { + if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? { + if previous == new { + // No need to update anything. + return Ok(()); + } + + match (previous.bump_stamp, new.bump_stamp) { + // If the previous subscription had a bump stamp, and the new one + // doesn't, keep the previous one. + (Some(prev_bump), None) => { + new.bump_stamp = Some(prev_bump); + } + + // If the previous bump stamp is newer than the new one, don't store the value at + // all. + (Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => { + return Ok(()); + } + + // In all other cases, keep the new bumpstamp. + _ => {} + } + } + let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id); let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id); - let status = subscription.status.as_str(); + let status = new.status.as_str(); self.acquire() .await? .with_transaction(move |txn| { + // Try to find a previous value. txn.prepare_cached( - "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status) - VALUES (?, ?, ?)", + "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp) + VALUES (?, ?, ?, ?)", )? - .execute((room_id, thread_id, status)) + .execute((room_id, thread_id, status, new.bump_stamp)) }) .await?; Ok(()) @@ -2139,17 +2175,17 @@ impl StateStore for SqliteStateStore { .acquire() .await? .query_row( - "SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?", + "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?", (room_id, thread_id), - |row| row.get::<_, String>(0), + |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option>(1)?)) ) .await .optional()? - .map(|data| -> Result<_, Self::Error> { - let status = ThreadSubscriptionStatus::from_str(&data).map_err(|_| { - Error::InvalidData { details: format!("Invalid thread status: {data}") } + .map(|(status, bump_stamp)| -> Result<_, Self::Error> { + let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| { + Error::InvalidData { details: format!("Invalid thread status: {status}") } })?; - Ok(StoredThreadSubscription { status, bump_stamp: None }) + Ok(StoredThreadSubscription { status, bump_stamp }) }) .transpose()?) }