From 5dff50aff508b608fe4092daaeef0a640b2c7110 Mon Sep 17 00:00:00 2001 From: Michael Goldenberg Date: Fri, 17 Apr 2026 14:07:31 -0400 Subject: [PATCH] feat(base): use save locked state store in base state store Signed-off-by: Michael Goldenberg --- crates/matrix-sdk-base/src/client.rs | 4 +-- .../account_data/global.rs | 2 +- .../src/response_processors/changes.rs | 2 +- .../response_processors/e2ee/tracked_users.rs | 2 +- crates/matrix-sdk-base/src/sliding_sync.rs | 2 +- crates/matrix-sdk-base/src/store/mod.rs | 29 ++++++++----------- crates/matrix-sdk-base/src/store/traits.rs | 11 +++++-- 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 872505daa..d29e9f3ca 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -55,7 +55,7 @@ use tracing::{Level, debug, enabled, info, instrument, warn}; #[cfg(feature = "e2e-encryption")] use crate::RoomMemberships; use crate::{ - RoomStateFilter, SessionMeta, + RoomStateFilter, SessionMeta, StateStore, deserialized_responses::DisplayName, error::{Error, Result}, event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState}, @@ -652,7 +652,7 @@ impl BaseClient { }) .collect(); - let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone()); + let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.store().clone()); let global_account_data_processor = processors::account_data::global(&response.account_data.events); diff --git a/crates/matrix-sdk-base/src/response_processors/account_data/global.rs b/crates/matrix-sdk-base/src/response_processors/account_data/global.rs index 6ee86c913..60d9e4bda 100644 --- a/crates/matrix-sdk-base/src/response_processors/account_data/global.rs +++ b/crates/matrix-sdk-base/src/response_processors/account_data/global.rs @@ -28,7 +28,7 @@ use ruma::{ use tracing::{debug, instrument, trace, warn}; use super::super::Context; -use crate::{RoomInfo, StateChanges, store::BaseStateStore}; +use crate::{RoomInfo, StateChanges, StateStore, store::BaseStateStore}; /// Create the [`Global`] account data processor. pub fn global(events: &[Raw]) -> Global { diff --git a/crates/matrix-sdk-base/src/response_processors/changes.rs b/crates/matrix-sdk-base/src/response_processors/changes.rs index f6bc0162b..e70446bfd 100644 --- a/crates/matrix-sdk-base/src/response_processors/changes.rs +++ b/crates/matrix-sdk-base/src/response_processors/changes.rs @@ -22,7 +22,7 @@ use tracing::{error, instrument, trace}; use super::Context; use crate::{ - Result, + Result, StateStore, store::{BaseStateStore, StateStoreExt as _}, }; diff --git a/crates/matrix-sdk-base/src/response_processors/e2ee/tracked_users.rs b/crates/matrix-sdk-base/src/response_processors/e2ee/tracked_users.rs index 4749f215a..a20beee55 100644 --- a/crates/matrix-sdk-base/src/response_processors/e2ee/tracked_users.rs +++ b/crates/matrix-sdk-base/src/response_processors/e2ee/tracked_users.rs @@ -18,7 +18,7 @@ use matrix_sdk_common::timer; use matrix_sdk_crypto::OlmMachine; use ruma::{OwnedUserId, RoomId}; -use crate::{EncryptionState, Result, RoomMemberships, store::BaseStateStore}; +use crate::{EncryptionState, Result, RoomMemberships, StateStore, store::BaseStateStore}; /// Update tracked users, if the room is encrypted. pub async fn update( diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index a2e7a6517..f295cfd18 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -115,7 +115,7 @@ impl BaseClient { let mut context = processors::Context::default(); let state_store = self.state_store.clone(); - let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.clone()); + let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.store().clone()); let global_account_data_processor = processors::account_data::global(&extensions.account_data.global); diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index e8308ccdc..158e35c5f 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -94,9 +94,9 @@ pub use self::{ }, traits::{ ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail, - DynStateStore, IntoStateStore, StateStore, StateStoreDataKey, StateStoreDataValue, - StateStoreExt, SupportedVersionsResponse, ThreadSubscriptionCatchupToken, - WellKnownResponse, + DynStateStore, IntoStateStore, SaveLockedStateStore, StateStore, StateStoreDataKey, + StateStoreDataValue, StateStoreExt, SupportedVersionsResponse, + ThreadSubscriptionCatchupToken, WellKnownResponse, }, }; @@ -176,7 +176,7 @@ pub type Result = std::result::Result; /// `StateStore` implementation. #[derive(Clone)] pub(crate) struct BaseStateStore { - pub(super) inner: Arc, + pub(super) inner: SaveLockedStateStore, session_meta: Arc>, room_load_settings: Arc>, @@ -190,10 +190,6 @@ pub(crate) struct BaseStateStore { /// All rooms the store knows about. rooms: Arc>>, - /// A lock to synchronize access to the store, such that data by the sync is - /// never overwritten. - lock: Arc>, - /// Which rooms have already logged a log line about missing room info, in /// the context of response processors? pub(crate) already_logged_missing_room: Arc>>, @@ -215,20 +211,19 @@ impl BaseStateStore { broadcast::channel(500); Self { - inner, + inner: SaveLockedStateStore::new(inner), session_meta: Default::default(), room_load_settings: Default::default(), room_info_notable_update_sender, sync_token: Default::default(), rooms: Arc::new(StdRwLock::new(ObservableMap::new())), - lock: Default::default(), already_logged_missing_room: Default::default(), } } /// Get access to the syncing lock. pub fn lock(&self) -> &Mutex<()> { - &self.lock + self.inner.lock() } /// Set the [`SessionMeta`] into [`BaseStateStore::session_meta`]. @@ -256,7 +251,7 @@ impl BaseStateStore { for room_info in room_infos { let new_room = Room::restore( user_id, - self.inner.clone(), + self.inner.store().clone(), room_info, self.room_info_notable_update_sender.clone(), ); @@ -278,7 +273,7 @@ impl BaseStateStore { let mut migrated_room_infos = Vec::with_capacity(room_infos.len()); for room_info in room_infos.iter_mut() { - if room_info.apply_migrations(self.inner.clone()).await { + if room_info.apply_migrations(self.inner.store().clone()).await { migrated_room_infos.push(room_info.clone()); } } @@ -378,7 +373,7 @@ impl BaseStateStore { .get_or_create(room_id, || { Room::new( user_id, - self.inner.clone(), + self.inner.store().clone(), room_id, room_state, self.room_info_notable_update_sender.clone(), @@ -412,10 +407,10 @@ impl fmt::Debug for BaseStateStore { } impl Deref for BaseStateStore { - type Target = DynStateStore; + type Target = SaveLockedStateStore; fn deref(&self) -> &Self::Target { - self.inner.deref() + &self.inner } } @@ -874,7 +869,7 @@ mod tests { use ruma::{owned_device_id, owned_user_id, room_id, user_id}; use super::{BaseStateStore, MemoryStore, RoomLoadSettings}; - use crate::{RoomInfo, RoomState, SessionMeta, StateChanges}; + use crate::{RoomInfo, RoomState, SessionMeta, StateChanges, StateStore}; #[async_test] async fn test_set_session_meta() { diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 51f9b89a6..d3acfdda3 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -1456,7 +1456,6 @@ impl StateStore for EraseStateStoreError { /// A wrapper around a [`StateStore`] that supports synchronizing calls to /// [`StateStore::save_changes`]. -#[allow(unused)] #[derive(Debug, Clone)] pub struct SaveLockedStateStore> { store: T, @@ -1479,17 +1478,23 @@ impl From for StoreError { impl SaveLockedStateStore { /// Creates a new [`SaveLockedStateStore`] with the provided store. - #[allow(unused)] pub fn new(store: T) -> Self { Self { store, lock: Arc::new(Mutex::new(())) } } /// Returns a reference to the underlying [`Mutex`] used to synchronize /// calls to [`StateStore::save_changes`]. - #[allow(unused)] pub fn lock(&self) -> &Mutex<()> { self.lock.as_ref() } + + // This function is added temporarily to allow for making incremental + // commits when types are being transitioned to use a [`SaveLockedStateStore`]. + // This should be deleted once the transition is complete, as the underlying + // store should not be accessible outside of [`SaveLockedStateStore`]. + pub(crate) fn store(&self) -> &T { + &self.store + } } impl SaveLockedStateStore {