feat(state-stores): Add StateStore::upsert_thread_subscriptions() method for bulk upsert

This commit is contained in:
razvp
2025-11-09 19:33:05 +02:00
committed by Ivan Enderlin
parent 1af22a70b7
commit 67b1de613c
8 changed files with 332 additions and 1 deletions

View File

@@ -50,6 +50,8 @@ All notable changes to this project will be documented in this file.
([#5817](https://github.com/matrix-org/matrix-rust-sdk/pull/5817))
- `ComposerDraft` can now store attachments alongside text messages.
([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794))
- Add `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
## [0.14.1] - 2025-09-10

View File

@@ -111,6 +111,8 @@ pub trait StateStoreIntegrationTests {
async fn test_thread_subscriptions(&self) -> TestResult;
/// Test thread subscription bumpstamp semantics.
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
/// Test thread subscriptions bulk upsert, including bumpstamp semantics.
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
}
impl StateStoreIntegrationTests for DynStateStore {
@@ -2004,6 +2006,183 @@ impl StateStoreIntegrationTests for DynStateStore {
Ok(())
}
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
let threads = [
event_id!("$t1"),
event_id!("$t2"),
event_id!("$t3"),
event_id!("$t4"),
event_id!("$t5"),
event_id!("$t6"),
];
// Helper for building the input for `upsert_thread_subscriptions()`,
// which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
threads
.iter()
.zip(subs)
.map(|(&event_id, &sub)| (room_id(), event_id, sub))
.collect::<Vec<_>>()
};
// Test bump_stamp logic
let initial_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(14),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(210),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(5),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(100),
},
]);
let update_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1101),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(222),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(100),
},
]);
let expected_subscriptions = build_subscription_updates(&[
// Status should be updated, because prev and new bump_stamp are both None
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
// Status should be updated, but keep initial bump_stamp (new is None)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(14),
},
// Status should be updated and also bump_stamp should be updated (initial was None)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1101),
},
// Status should be updated and also bump_stamp should be updated (initial was lower)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(222),
},
// Status shouldn't change, as new bump_stamp is lower
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(5),
},
// Status shouldn't change, as bump_stamp is equal to the previous one
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(100),
},
]);
// Set the initial subscriptions
self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
// Assert the subscriptions have been added
for (room_id, thread_id, expected_sub) in &initial_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}
// Update subscriptions
self.upsert_thread_subscriptions(update_subscriptions).await?;
// Assert the expected subscriptions and bump_stamps
for (room_id, thread_id, expected_sub) in &expected_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}
// Test just state changes, but first remove previous subscriptions
for (room_id, thread_id, _) in &expected_subscriptions {
self.remove_thread_subscription(room_id, thread_id).await?;
}
let initial_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1),
},
]);
self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
for (room_id, thread_id, expected_sub) in &initial_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}
let update_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(2),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(2),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(2),
},
]);
self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
for (room_id, thread_id, expected_sub) in &update_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}
Ok(())
}
}
/// Macro building to allow your StateStore implementation to run the entire
@@ -2196,6 +2375,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bumpstamps().await
}
#[async_test]
async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bulk_upsert().await
}
}
};
}

View File

@@ -1017,6 +1017,33 @@ impl StateStore for MemoryStore {
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
let mut inner = self.inner.write().unwrap();
for (room_id, thread_id, mut new) in updates {
let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default();
if let Some(previous) = room_subs.get(thread_id) {
if *previous == new {
continue;
}
if !compare_thread_subscription_bump_stamps(
previous.bump_stamp,
&mut new.bump_stamp,
) {
continue;
}
}
room_subs.insert(thread_id.to_owned(), new);
}
Ok(())
}
async fn load_thread_subscription(
&self,
room: &RoomId,

View File

@@ -487,7 +487,7 @@ pub trait StateStore: AsyncTraitDeps {
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previously one, the existing subscription is kept, i.e.
/// equal to a previous one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscription(
&self,
@@ -496,6 +496,20 @@ pub trait StateStore: AsyncTraitDeps {
subscription: StoredThreadSubscription,
) -> Result<(), Self::Error>;
/// Inserts or updates multiple thread subscriptions.
///
/// If the new thread subscription hasn't set a bumpstamp, and there was a
/// previous subscription in the database with a bumpstamp, the existing
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previous one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error>;
/// Remove a previous thread subscription for a given room and thread.
///
/// Note: removing an unknown thread subscription is a no-op.
@@ -828,6 +842,13 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
}
async fn load_thread_subscription(
&self,
room: &RoomId,

View File

@@ -19,6 +19,8 @@ All notable changes to this project will be documented in this file.
([#5819](https://github.com/matrix-org/matrix-rust-sdk/pull/5819))
- [**breaking**] `IndexeddbCryptoStore::get_withheld_info` now returns `Result<Option<RoomKeyWithheldEntry>, ...>`.
([#5737](https://github.com/matrix-org/matrix-rust-sdk/pull/5737))
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
### Performance

View File

@@ -1978,6 +1978,47 @@ impl_state_store!({
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<()> {
let tx = self
.inner
.transaction(keys::THREAD_SUBSCRIPTIONS)
.with_mode(TransactionMode::Readwrite)
.build()?;
let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
for (room_id, thread_id, subscription) in updates {
let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
let mut new = PersistedThreadSubscription::from(subscription);
// See if there's a previous subscription.
if let Some(previous_value) = obj.get(&encoded_key).await? {
let previous: PersistedThreadSubscription =
self.deserialize_value(&previous_value)?;
// If the previous status is the same as the new one, don't do anything.
if new == previous {
continue;
}
if !compare_thread_subscription_bump_stamps(
previous.bump_stamp,
&mut new.bump_stamp,
) {
continue;
}
}
let serialized_value = self.serialize_value(&new);
obj.put(&serialized_value?).with_key(encoded_key).build()?;
}
tx.commit().await?;
Ok(())
}
async fn load_thread_subscription(
&self,
room: &RoomId,

View File

@@ -17,6 +17,8 @@ All notable changes to this project will be documented in this file.
- Implement a new constructor that allows to open `SqliteCryptoStore` with a cryptographic key
([#5472](https://github.com/matrix-org/matrix-rust-sdk/pull/5472))
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
### Refactor
- [breaking] Change the logic for opening a store so as to use a `Secret` enum in the function `open_with_pool` instead of a `passphrase`

View File

@@ -2230,6 +2230,57 @@ impl StateStore for SqliteStateStore {
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
let values: Vec<_> = updates
.into_iter()
.map(|(room_id, thread_id, subscription)| {
(
self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
subscription.status.as_str(),
subscription.bump_stamp,
)
})
.collect();
self.write()
.await
.with_transaction(move |txn| {
let mut txn = txn.prepare_cached(
"INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
VALUES (?, ?, ?, ?)
ON CONFLICT (room_id, event_id) DO UPDATE
SET
status =
CASE
WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
ELSE thread_subscriptions.status
END,
bump_stamp =
CASE
WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
ELSE thread_subscriptions.bump_stamp
END",
)?;
for value in values {
txn.execute(value)?;
}
Result::<_, Error>::Ok(())
})
.await?;
Ok(())
}
async fn load_thread_subscription(
&self,
room_id: &RoomId,