refactor(state store): remove StateStore::upsert_thread_subscription

There is `StateStore::upsert_thread_subscriptions` as a proper
replacement these days.
This commit is contained in:
Benjamin Bouvier
2026-01-15 15:37:05 +01:00
committed by Ivan Enderlin
parent e9f5ed108a
commit 5b8ff8a76d
7 changed files with 16 additions and 217 deletions

View File

@@ -22,6 +22,8 @@ All notable changes to this project will be documented in this file.
### Refactor
- [**breaking**] The `StateStore::upsert_thread_subscription` method has been removed in favor of a
bulk method `StateStore::upsert_thread_subscriptions`.
- [**breaking**] The `message-ids` feature has been removed. It was already a no-op and has now
been eliminated entirely.
([#5963](https://github.com/matrix-org/matrix-rust-sdk/pull/5963))

View File

@@ -109,8 +109,6 @@ pub trait StateStoreIntegrationTests {
async fn test_get_room_infos(&self) -> TestResult;
/// Test loading thread subscriptions.
async fn test_thread_subscriptions(&self) -> TestResult;
/// Test thread subscription bumpstamp semantics.
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
/// Test thread subscriptions bulk upsert, including bumpstamp semantics.
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
}
@@ -1844,24 +1842,24 @@ impl StateStoreIntegrationTests for DynStateStore {
assert!(maybe_sub.is_none());
// Setting the thread subscription works.
self.upsert_thread_subscription(
self.upsert_thread_subscriptions(vec![(
room_id(),
first_thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
)
)])
.await?;
self.upsert_thread_subscription(
self.upsert_thread_subscriptions(vec![(
room_id(),
second_thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: None,
},
)
)])
.await?;
// Now, reading the thread subscription returns the expected status.
@@ -1884,14 +1882,14 @@ impl StateStoreIntegrationTests for DynStateStore {
);
// We can override the thread subscription status.
self.upsert_thread_subscription(
self.upsert_thread_subscriptions(vec![(
room_id(),
first_thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
)
)])
.await?;
// And it's correctly reflected.
@@ -1937,76 +1935,6 @@ impl StateStoreIntegrationTests for DynStateStore {
Ok(())
}
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult {
let thread = event_id!("$fred");
// At first, there is no thread subscription.
let sub = self.load_thread_subscription(room_id(), thread).await?;
assert!(sub.is_none());
// Setting the thread subscription with some bumpstamp works.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
}
);
// Storing a subscription with an older bumpstamp has no effect.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(41),
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(42),
}
);
// Storing with no bumpstamps keeps the previous one.
self.upsert_thread_subscription(
room_id(),
thread,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
)
.await?;
let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
assert_eq!(
sub,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(42),
}
);
Ok(())
}
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
let threads = [
event_id!("$t1"),
@@ -2370,12 +2298,6 @@ macro_rules! statestore_integration_tests {
store.test_thread_subscriptions().await
}
#[async_test]
async fn test_thread_subscriptions_bumpstamps() -> TestResult {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bumpstamps().await
}
#[async_test]
async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
let store = get_store().await?.into_state_store();

View File

@@ -993,30 +993,6 @@ impl StateStore for MemoryStore {
.unwrap_or_default())
}
async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
mut new: StoredThreadSubscription,
) -> Result<(), Self::Error> {
let mut inner = self.inner.write().unwrap();
let room_subs = inner.thread_subscriptions.entry(room.to_owned()).or_default();
if let Some(previous) = room_subs.get(thread_id) {
// Nothing to do.
if *previous == new {
return Ok(());
}
if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
return Ok(());
}
}
room_subs.insert(thread_id.to_owned(), new);
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,

View File

@@ -480,22 +480,6 @@ pub trait StateStore: AsyncTraitDeps {
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
/// Insert or update a thread subscription for a given room and thread.
///
/// If the new thread subscription hasn't set a bumpstamp, and there was a
/// previous subscription in the database with a bumpstamp, the existing
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previous one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
subscription: StoredThreadSubscription,
) -> Result<(), Self::Error>;
/// Inserts or updates multiple thread subscriptions.
///
/// If the new thread subscription hasn't set a bumpstamp, and there was a
@@ -833,15 +817,6 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
.map_err(Into::into)
}
async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
subscription: StoredThreadSubscription,
) -> Result<(), Self::Error> {
self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,

View File

@@ -1946,44 +1946,6 @@ impl_state_store!({
)
}
async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
subscription: StoredThreadSubscription,
) -> Result<()> {
let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
let tx = self
.inner
.transaction(keys::THREAD_SUBSCRIPTIONS)
.with_mode(TransactionMode::Readwrite)
.build()?;
let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
let mut new = PersistedThreadSubscription::from(subscription);
// See if there's a previous subscription.
if let Some(previous_value) = obj.get(&encoded_key).await? {
let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
// If the previous status is the same as the new one, don't do anything.
if new == previous {
return Ok(());
}
if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
return Ok(());
}
}
let serialized_value = self.serialize_value(&new);
obj.put(&serialized_value?).with_key(encoded_key).build()?;
tx.commit().await?;
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,

View File

@@ -15,8 +15,7 @@ use matrix_sdk_base::{
store::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
StoredThreadSubscription, ThreadSubscriptionStatus,
compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1,
StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
},
};
use matrix_sdk_store_encryption::StoreCipher;
@@ -42,7 +41,7 @@ use tokio::{
fs,
sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, instrument, trace, warn};
use tracing::{debug, instrument, warn};
use crate::{
OpenStoreError, Secret, SqliteStoreConfig,
@@ -2195,43 +2194,6 @@ impl StateStore for SqliteStateStore {
Ok(dependent_events)
}
async fn upsert_thread_subscription(
&self,
room_id: &RoomId,
thread_id: &EventId,
mut new: StoredThreadSubscription,
) -> Result<(), Self::Error> {
if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
if previous == new {
// No need to update anything.
trace!("not saving thread subscription because the subscription is the same");
return Ok(());
}
if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
trace!("not saving thread subscription because we have a newer bump stamp");
return Ok(());
}
}
let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
let status = new.status.as_str();
self.write()
.await
.with_transaction(move |txn| {
// Try to find a previous value.
txn.prepare_cached(
"INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
VALUES (?, ?, ?, ?)",
)?
.execute((room_id, thread_id, status, new.bump_stamp))
})
.await?;
Ok(())
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,

View File

@@ -4199,7 +4199,7 @@ impl Room {
// Immediately save the result into the database.
self.client
.state_store()
.upsert_thread_subscription(
.upsert_thread_subscriptions(vec![(
self.room_id(),
&thread_root,
StoredThreadSubscription {
@@ -4208,7 +4208,7 @@ impl Room {
},
bump_stamp: None,
},
)
)])
.await?;
Ok(())
@@ -4277,14 +4277,14 @@ impl Room {
// Immediately save the result into the database.
self.client
.state_store()
.upsert_thread_subscription(
.upsert_thread_subscriptions(vec![(
self.room_id(),
&thread_root,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
)
)])
.await?;
Ok(())
@@ -4331,14 +4331,14 @@ impl Room {
if let Some(sub) = &subscription {
self.client
.state_store()
.upsert_thread_subscription(
.upsert_thread_subscriptions(vec![(
self.room_id(),
&thread_root,
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
bump_stamp: None,
},
)
)])
.await?;
} else {
// If the subscription was not found, remove it from the database.