mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-06-10 17:34:20 -04:00
feat(base): use save locked state store in base state store
Signed-off-by: Michael Goldenberg <m@mgoldenberg.net>
This commit is contained in:
committed by
Damir Jelić
parent
e7fb820f7e
commit
5dff50aff5
@@ -55,7 +55,7 @@ use tracing::{Level, debug, enabled, info, instrument, warn};
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use crate::RoomMemberships;
|
||||
use crate::{
|
||||
RoomStateFilter, SessionMeta,
|
||||
RoomStateFilter, SessionMeta, StateStore,
|
||||
deserialized_responses::DisplayName,
|
||||
error::{Error, Result},
|
||||
event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
|
||||
@@ -652,7 +652,7 @@ impl BaseClient {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
|
||||
let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.store().clone());
|
||||
|
||||
let global_account_data_processor =
|
||||
processors::account_data::global(&response.account_data.events);
|
||||
|
||||
@@ -28,7 +28,7 @@ use ruma::{
|
||||
use tracing::{debug, instrument, trace, warn};
|
||||
|
||||
use super::super::Context;
|
||||
use crate::{RoomInfo, StateChanges, store::BaseStateStore};
|
||||
use crate::{RoomInfo, StateChanges, StateStore, store::BaseStateStore};
|
||||
|
||||
/// Create the [`Global`] account data processor.
|
||||
pub fn global(events: &[Raw<AnyGlobalAccountDataEvent>]) -> Global {
|
||||
|
||||
@@ -22,7 +22,7 @@ use tracing::{error, instrument, trace};
|
||||
|
||||
use super::Context;
|
||||
use crate::{
|
||||
Result,
|
||||
Result, StateStore,
|
||||
store::{BaseStateStore, StateStoreExt as _},
|
||||
};
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use matrix_sdk_common::timer;
|
||||
use matrix_sdk_crypto::OlmMachine;
|
||||
use ruma::{OwnedUserId, RoomId};
|
||||
|
||||
use crate::{EncryptionState, Result, RoomMemberships, store::BaseStateStore};
|
||||
use crate::{EncryptionState, Result, RoomMemberships, StateStore, store::BaseStateStore};
|
||||
|
||||
/// Update tracked users, if the room is encrypted.
|
||||
pub async fn update(
|
||||
|
||||
@@ -115,7 +115,7 @@ impl BaseClient {
|
||||
let mut context = processors::Context::default();
|
||||
|
||||
let state_store = self.state_store.clone();
|
||||
let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.clone());
|
||||
let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.store().clone());
|
||||
|
||||
let global_account_data_processor =
|
||||
processors::account_data::global(&extensions.account_data.global);
|
||||
|
||||
@@ -94,9 +94,9 @@ pub use self::{
|
||||
},
|
||||
traits::{
|
||||
ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail,
|
||||
DynStateStore, IntoStateStore, StateStore, StateStoreDataKey, StateStoreDataValue,
|
||||
StateStoreExt, SupportedVersionsResponse, ThreadSubscriptionCatchupToken,
|
||||
WellKnownResponse,
|
||||
DynStateStore, IntoStateStore, SaveLockedStateStore, StateStore, StateStoreDataKey,
|
||||
StateStoreDataValue, StateStoreExt, SupportedVersionsResponse,
|
||||
ThreadSubscriptionCatchupToken, WellKnownResponse,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -176,7 +176,7 @@ pub type Result<T, E = StoreError> = std::result::Result<T, E>;
|
||||
/// `StateStore` implementation.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct BaseStateStore {
|
||||
pub(super) inner: Arc<DynStateStore>,
|
||||
pub(super) inner: SaveLockedStateStore,
|
||||
session_meta: Arc<OnceLock<SessionMeta>>,
|
||||
room_load_settings: Arc<RwLock<RoomLoadSettings>>,
|
||||
|
||||
@@ -190,10 +190,6 @@ pub(crate) struct BaseStateStore {
|
||||
/// All rooms the store knows about.
|
||||
rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
|
||||
|
||||
/// A lock to synchronize access to the store, such that data by the sync is
|
||||
/// never overwritten.
|
||||
lock: Arc<Mutex<()>>,
|
||||
|
||||
/// Which rooms have already logged a log line about missing room info, in
|
||||
/// the context of response processors?
|
||||
pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
|
||||
@@ -215,20 +211,19 @@ impl BaseStateStore {
|
||||
broadcast::channel(500);
|
||||
|
||||
Self {
|
||||
inner,
|
||||
inner: SaveLockedStateStore::new(inner),
|
||||
session_meta: Default::default(),
|
||||
room_load_settings: Default::default(),
|
||||
room_info_notable_update_sender,
|
||||
sync_token: Default::default(),
|
||||
rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
|
||||
lock: Default::default(),
|
||||
already_logged_missing_room: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get access to the syncing lock.
|
||||
pub fn lock(&self) -> &Mutex<()> {
|
||||
&self.lock
|
||||
self.inner.lock()
|
||||
}
|
||||
|
||||
/// Set the [`SessionMeta`] into [`BaseStateStore::session_meta`].
|
||||
@@ -256,7 +251,7 @@ impl BaseStateStore {
|
||||
for room_info in room_infos {
|
||||
let new_room = Room::restore(
|
||||
user_id,
|
||||
self.inner.clone(),
|
||||
self.inner.store().clone(),
|
||||
room_info,
|
||||
self.room_info_notable_update_sender.clone(),
|
||||
);
|
||||
@@ -278,7 +273,7 @@ impl BaseStateStore {
|
||||
let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
|
||||
|
||||
for room_info in room_infos.iter_mut() {
|
||||
if room_info.apply_migrations(self.inner.clone()).await {
|
||||
if room_info.apply_migrations(self.inner.store().clone()).await {
|
||||
migrated_room_infos.push(room_info.clone());
|
||||
}
|
||||
}
|
||||
@@ -378,7 +373,7 @@ impl BaseStateStore {
|
||||
.get_or_create(room_id, || {
|
||||
Room::new(
|
||||
user_id,
|
||||
self.inner.clone(),
|
||||
self.inner.store().clone(),
|
||||
room_id,
|
||||
room_state,
|
||||
self.room_info_notable_update_sender.clone(),
|
||||
@@ -412,10 +407,10 @@ impl fmt::Debug for BaseStateStore {
|
||||
}
|
||||
|
||||
impl Deref for BaseStateStore {
|
||||
type Target = DynStateStore;
|
||||
type Target = SaveLockedStateStore;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.deref()
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
@@ -874,7 +869,7 @@ mod tests {
|
||||
use ruma::{owned_device_id, owned_user_id, room_id, user_id};
|
||||
|
||||
use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
|
||||
use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};
|
||||
use crate::{RoomInfo, RoomState, SessionMeta, StateChanges, StateStore};
|
||||
|
||||
#[async_test]
|
||||
async fn test_set_session_meta() {
|
||||
|
||||
@@ -1456,7 +1456,6 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
|
||||
|
||||
/// A wrapper around a [`StateStore`] that supports synchronizing calls to
|
||||
/// [`StateStore::save_changes`].
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
|
||||
store: T,
|
||||
@@ -1479,17 +1478,23 @@ impl From<IncorrectMutexGuardError> for StoreError {
|
||||
|
||||
impl<T> SaveLockedStateStore<T> {
|
||||
/// Creates a new [`SaveLockedStateStore`] with the provided store.
|
||||
#[allow(unused)]
|
||||
pub fn new(store: T) -> Self {
|
||||
Self { store, lock: Arc::new(Mutex::new(())) }
|
||||
}
|
||||
|
||||
/// Returns a reference to the underlying [`Mutex`] used to synchronize
|
||||
/// calls to [`StateStore::save_changes`].
|
||||
#[allow(unused)]
|
||||
pub fn lock(&self) -> &Mutex<()> {
|
||||
self.lock.as_ref()
|
||||
}
|
||||
|
||||
// This function is added temporarily to allow for making incremental
|
||||
// commits when types are being transitioned to use a [`SaveLockedStateStore`].
|
||||
// This should be deleted once the transition is complete, as the underlying
|
||||
// store should not be accessible outside of [`SaveLockedStateStore`].
|
||||
pub(crate) fn store(&self) -> &T {
|
||||
&self.store
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: StateStore> SaveLockedStateStore<T> {
|
||||
|
||||
Reference in New Issue
Block a user