feat(sdk): store the thread subscription bumpstamp and implement the correct upsert semantics

This commit is contained in:
Benjamin Bouvier
2025-08-27 17:40:54 +02:00
parent 8c0a918e6e
commit 7a762035f1
7 changed files with 240 additions and 41 deletions

View File

@@ -107,6 +107,8 @@ pub trait StateStoreIntegrationTests {
async fn test_get_room_infos(&self) -> TestResult;
/// Test loading thread subscriptions.
async fn test_thread_subscriptions(&self) -> TestResult;
/// Test thread subscription bumpstamp semantics.
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
}
impl StateStoreIntegrationTests for DynStateStore {
@@ -1784,11 +1786,11 @@ impl StateStoreIntegrationTests for DynStateStore {
let second_thread = event_id!("$t2");
// At first, there is no thread subscription.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
assert!(maybe_status.is_none());
let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
assert!(maybe_sub.is_none());
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
assert!(maybe_status.is_none());
let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
assert!(maybe_sub.is_none());
// Setting the thread subscription works.
self.upsert_thread_subscription(
@@ -1812,18 +1814,18 @@ impl StateStoreIntegrationTests for DynStateStore {
.await?;
// Now, reading the thread subscription returns the expected status.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
assert_eq!(
maybe_status,
maybe_sub,
Some(StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
})
);
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
assert_eq!(
maybe_status,
maybe_sub,
Some(StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: None,
@@ -1842,9 +1844,9 @@ impl StateStoreIntegrationTests for DynStateStore {
.await?;
// And it's correctly reflected.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
assert_eq!(
maybe_status,
maybe_sub,
Some(StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
@@ -1852,9 +1854,9 @@ impl StateStoreIntegrationTests for DynStateStore {
);
// And the second thread is still subscribed.
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
assert_eq!(
maybe_status,
maybe_sub,
Some(StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: None,
@@ -1865,13 +1867,13 @@ impl StateStoreIntegrationTests for DynStateStore {
self.remove_thread_subscription(room_id(), second_thread).await?;
// And it's correctly reflected.
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
assert_eq!(maybe_status, None);
let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
assert_eq!(maybe_sub, None);
// And the first thread is still unsubscribed.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
assert_eq!(
maybe_status,
maybe_sub,
Some(StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
@@ -1883,6 +1885,76 @@ impl StateStoreIntegrationTests for DynStateStore {
Ok(())
}
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult {
let thread = event_id!("$fred");
// At first, there is no thread subscription.
let sub = self.load_thread_subscription(room_id(), thread).await?;
assert!(sub.is_none());
// Setting the thread subscription with some bumpstamp works.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
}
);
// Storing a subscription with an older bumpstamp has no effect.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(41),
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
}
);
// Storing with no bumpstamps keeps the previous one.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(42),
}
);
Ok(())
}
}
/// Macro building to allow your StateStore implementation to run the entire
@@ -2063,6 +2135,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions().await
}
#[async_test]
async fn test_thread_subscriptions_bumpstamps() -> TestResult {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bumpstamps().await
}
}
};
}

View File

@@ -968,15 +968,37 @@ impl StateStore for MemoryStore {
&self,
room: &RoomId,
thread_id: &EventId,
subscription: StoredThreadSubscription,
mut new: StoredThreadSubscription,
) -> Result<(), Self::Error> {
self.inner
.write()
.unwrap()
.thread_subscriptions
.entry(room.to_owned())
.or_default()
.insert(thread_id.to_owned(), subscription);
let mut inner = self.inner.write().unwrap();
let room_subs = inner.thread_subscriptions.entry(room.to_owned()).or_default();
if let Some(previous) = room_subs.get(thread_id) {
// Nothing to do.
if *previous == new {
return Ok(());
}
match (previous.bump_stamp, new.bump_stamp) {
// If the previous subscription had a bump stamp, and the new one
// doesn't, keep the previous one.
(Some(prev_bump), None) => {
new.bump_stamp = Some(prev_bump);
}
// If the previous bump stamp is newer than the new one, don't store the value at
// all.
(Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => {
return Ok(());
}
// In all other cases, keep the new bumpstamp.
_ => {}
}
}
room_subs.insert(thread_id.to_owned(), new);
Ok(())
}

View File

@@ -482,8 +482,13 @@ pub trait StateStore: AsyncTraitDeps {
/// Insert or update a thread subscription for a given room and thread.
///
/// If the new thread subscription hasn't set a bumpstamp, and there was one
/// in the database with a bumpstamp, the existing bumpstamp is kept.
/// If the new thread subscription hasn't set a bumpstamp, and there was a
/// previous subscription in the database with a bumpstamp, the existing
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previously one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscription(
&self,
room: &RoomId,

View File

@@ -46,7 +46,7 @@ use super::{
};
use crate::IndexeddbStateStoreError;
const CURRENT_DB_VERSION: u32 = 13;
const CURRENT_DB_VERSION: u32 = 14;
const CURRENT_META_DB_VERSION: u32 = 2;
/// Sometimes Migrations can't proceed without having to drop existing
@@ -240,6 +240,9 @@ pub async fn upgrade_inner_db(
if old_version < 13 {
db = migrate_to_v13(db).await?;
}
if old_version < 14 {
db = migrate_to_v14(db).await?;
}
}
db.close();
@@ -806,6 +809,18 @@ async fn migrate_to_v13(db: IdbDatabase) -> Result<IdbDatabase> {
apply_migration(db, 13, migration).await
}
/// Empty the thread subscriptions table, because the serialized format has
/// changed (from storing only the subscription to storing the
/// `StoredThreadSubscription`).
async fn migrate_to_v14(db: IdbDatabase) -> Result<IdbDatabase> {
let migration = OngoingMigration {
drop_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(),
create_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(),
data: Default::default(),
};
apply_migration(db, 14, migration).await
}
#[cfg(all(test, target_family = "wasm"))]
mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

View File

@@ -502,6 +502,18 @@ impl PersistedQueuedRequest {
}
}
#[derive(Serialize, Deserialize, PartialEq)]
struct PersistedThreadSubscription {
status: String,
bump_stamp: Option<u64>,
}
impl From<StoredThreadSubscription> for PersistedThreadSubscription {
fn from(value: StoredThreadSubscription) -> Self {
Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
}
}
// Small hack to have the following macro invocation act as the appropriate
// trait impl block on wasm, but still be compiled on non-wasm as a regular
// impl block otherwise.
@@ -1810,8 +1822,36 @@ impl_state_store!({
)?;
let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
// TODO: store the bump stamp as well.
let serialized_value = self.serialize_value(&subscription.status.as_str().to_owned());
let mut new = PersistedThreadSubscription::from(subscription);
// See if there's a previous subscription.
if let Some(previous_value) = obj.get(&encoded_key)?.await? {
let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
// If the previous status is the same as the new one, don't do anything.
if new == previous {
return Ok(());
}
match (previous.bump_stamp, new.bump_stamp) {
// If the previous subscription had a bump stamp, and the new one
// doesn't, keep the previous one.
(Some(prev_bump), None) => {
new.bump_stamp = Some(prev_bump);
}
// If the previous bump stamp is newer than the new one, don't store the value at
// all.
(Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => {
return Ok(());
}
// In all other cases, keep the new bumpstamp.
_ => {}
}
}
let serialized_value = self.serialize_value(&new);
obj.put_key_val(&encoded_key, &serialized_value?)?;
tx.await.into_result()?;

View File

@@ -0,0 +1,3 @@
-- Add a column bump_stamp (number) to the table thread_subscriptions.
ALTER TABLE "thread_subscriptions"
ADD COLUMN "bump_stamp" INTEGER;

View File

@@ -74,7 +74,7 @@ pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 13;
const DATABASE_VERSION: u8 = 14;
/// An SQLite-based state store.
#[derive(Clone)]
@@ -369,6 +369,17 @@ impl SqliteStateStore {
.await?;
}
if from < 14 && to >= 14 {
conn.with_transaction(move |txn| {
// Run the migration.
txn.execute_batch(include_str!(
"../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
))?;
txn.set_db_version(14)
})
.await?;
}
Ok(())
}
@@ -2108,20 +2119,45 @@ impl StateStore for SqliteStateStore {
&self,
room_id: &RoomId,
thread_id: &EventId,
subscription: StoredThreadSubscription,
mut new: StoredThreadSubscription,
) -> Result<(), Self::Error> {
if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
if previous == new {
// No need to update anything.
return Ok(());
}
match (previous.bump_stamp, new.bump_stamp) {
// If the previous subscription had a bump stamp, and the new one
// doesn't, keep the previous one.
(Some(prev_bump), None) => {
new.bump_stamp = Some(prev_bump);
}
// If the previous bump stamp is newer than the new one, don't store the value at
// all.
(Some(prev_bump), Some(new_bump)) if new_bump <= prev_bump => {
return Ok(());
}
// In all other cases, keep the new bumpstamp.
_ => {}
}
}
let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
let status = subscription.status.as_str();
let status = new.status.as_str();
self.acquire()
.await?
.with_transaction(move |txn| {
// Try to find a previous value.
txn.prepare_cached(
"INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status)
VALUES (?, ?, ?)",
"INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
VALUES (?, ?, ?, ?)",
)?
.execute((room_id, thread_id, status))
.execute((room_id, thread_id, status, new.bump_stamp))
})
.await?;
Ok(())
@@ -2139,17 +2175,17 @@ impl StateStore for SqliteStateStore {
.acquire()
.await?
.query_row(
"SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
"SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
(room_id, thread_id),
|row| row.get::<_, String>(0),
|row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
)
.await
.optional()?
.map(|data| -> Result<_, Self::Error> {
let status = ThreadSubscriptionStatus::from_str(&data).map_err(|_| {
Error::InvalidData { details: format!("Invalid thread status: {data}") }
.map(|(status, bump_stamp)| -> Result<_, Self::Error> {
let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
Error::InvalidData { details: format!("Invalid thread status: {status}") }
})?;
Ok(StoredThreadSubscription { status, bump_stamp: None })
Ok(StoredThreadSubscription { status, bump_stamp })
})
.transpose()?)
}