From 685cc2bbc35ab876ba3953c2a7150e704e2b765b Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 5 Sep 2023 17:22:07 +0200 Subject: [PATCH] feat: make the cross-process store locks generic And move the implementation to the common crate. --- Cargo.lock | 3 + crates/matrix-sdk-common/Cargo.toml | 7 +- crates/matrix-sdk-common/src/lib.rs | 3 + .../src/store_locks.rs} | 275 +++++++++++------- crates/matrix-sdk-crypto/src/machine.rs | 21 +- .../src/store/crypto_store_wrapper.rs | 17 +- crates/matrix-sdk-crypto/src/store/error.rs | 7 +- crates/matrix-sdk-crypto/src/store/mod.rs | 39 ++- .../src/encryption_sync_service.rs | 10 +- crates/matrix-sdk/src/client/mod.rs | 10 +- crates/matrix-sdk/src/encryption/mod.rs | 10 +- crates/matrix-sdk/src/error.rs | 6 + crates/matrix-sdk/src/lib.rs | 1 + 13 files changed, 251 insertions(+), 158 deletions(-) rename crates/{matrix-sdk-crypto/src/store/locks.rs => matrix-sdk-common/src/store_locks.rs} (63%) diff --git a/Cargo.lock b/Cargo.lock index d0b57d116..a7d516cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3122,6 +3122,8 @@ dependencies = [ name = "matrix-sdk-common" version = "0.6.0" dependencies = [ + "assert_matches", + "async-trait", "futures-core", "futures-util", "gloo-timers", @@ -3130,6 +3132,7 @@ dependencies = [ "ruma", "serde", "serde_json", + "thiserror", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/matrix-sdk-common/Cargo.toml b/crates/matrix-sdk-common/Cargo.toml index 9fcc5b81b..94416715a 100644 --- a/crates/matrix-sdk-common/Cargo.toml +++ b/crates/matrix-sdk-common/Cargo.toml @@ -19,22 +19,23 @@ targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"] js = ["instant/wasm-bindgen", "instant/inaccurate", "wasm-bindgen-futures"] [dependencies] +async-trait = { workspace = true } futures-core = { workspace = true } instant = "0.1.12" ruma = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +thiserror = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true, features = ["rt", "time", "sync"] } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-util = { workspace = true, features = ["channel"] } wasm-bindgen-futures = { version = "0.4.33", optional = true } gloo-timers = { version = "0.2.6", features = ["futures"] } -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { workspace = true, features = ["rt", "time"] } - [dev-dependencies] +assert_matches = { workspace = true } matrix-sdk-test = { path = "../../testing/matrix-sdk-test/", version= "0.6.0"} wasm-bindgen-test = "0.3.33" tracing-subscriber = "0.3.15" diff --git a/crates/matrix-sdk-common/src/lib.rs b/crates/matrix-sdk-common/src/lib.rs index 5d4109293..e3de14681 100644 --- a/crates/matrix-sdk-common/src/lib.rs +++ b/crates/matrix-sdk-common/src/lib.rs @@ -24,9 +24,12 @@ pub mod debug; pub mod deserialized_responses; pub mod executor; pub mod ring_buffer; +pub mod store_locks; pub mod timeout; pub mod tracing_timer; +pub use store_locks::LEASE_DURATION_MS; + /// Alias for `Send` on non-wasm, empty trait (implemented by everything) on /// wasm. #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/matrix-sdk-crypto/src/store/locks.rs b/crates/matrix-sdk-common/src/store_locks.rs similarity index 63% rename from crates/matrix-sdk-crypto/src/store/locks.rs rename to crates/matrix-sdk-common/src/store_locks.rs index 137bfef63..b6b06a8eb 100644 --- a/crates/matrix-sdk-crypto/src/store/locks.rs +++ b/crates/matrix-sdk-common/src/store_locks.rs @@ -16,13 +16,13 @@ //! //! This is a per-process lock that may be used only for very specific use //! cases, where multiple processes might concurrently write to the same -//! database at the same time; this would invalidate crypto store caches, so +//! database at the same time; this would invalidate store caches, so //! that should be done mindfully. Such a lock can be acquired multiple times by //! the same process, and it remains active as long as there's at least one user //! in a given process. //! //! The lock is implemented using time-based leases to values inserted in a -//! crypto store. The store maintains the lock identifier (key), who's the +//! store. The store maintains the lock identifier (key), who's the //! current holder (value), and an expiration timestamp on the side; see also //! `CryptoStore::try_take_leased_lock` for more details. //! @@ -38,16 +38,36 @@ //! automatically after the duration of the last lease, at most. use std::{ - sync::{atomic::AtomicU32, Arc}, + error::Error, + sync::{ + atomic::{self, AtomicU32}, + Arc, + }, time::Duration, }; -use matrix_sdk_common::executor::JoinHandle; use tokio::{sync::Mutex, time::sleep}; use tracing::instrument; -use super::DynCryptoStore; -use crate::CryptoStoreError; +use crate::{ + executor::{spawn, JoinHandle}, + SendOutsideWasm, +}; + +/// Backing store for a cross-process lock. +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +pub trait BackingStore { + type Error: Error + Send + Sync; + + /// Try to take a lock using the given store. + async fn try_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result; +} /// Small state machine to handle wait times. #[derive(Clone, Debug)] @@ -58,26 +78,28 @@ enum WaitingTime { Stop, } -/// A guard on the crypto store lock. +/// A guard on the store lock. /// /// The lock will be automatically released a short period of time after all the /// guards have dropped. #[derive(Debug)] -pub struct CryptoStoreLockGuard { +pub struct CrossProcessStoreLockGuard { num_holders: Arc, } -impl Drop for CryptoStoreLockGuard { +impl Drop for CrossProcessStoreLockGuard { fn drop(&mut self) { self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst); } } -/// A store-based lock for the `CryptoStore`. +/// A store-based lock for a `Store`. +/// +/// See the doc-comment of this module for more information. #[derive(Clone, Debug)] -pub struct CryptoStoreLock { +pub struct CrossProcessStoreLock { /// The store we're using to lock. - store: Arc, + store: S, /// Number of holders of the lock in this process. /// @@ -105,38 +127,37 @@ pub struct CryptoStoreLock { backoff: Arc>, } -impl CryptoStoreLock { - /// Amount of time a lease of the lock should last, in milliseconds. - pub const LEASE_DURATION_MS: u32 = 500; +/// Amount of time a lease of the lock should last, in milliseconds. +pub const LEASE_DURATION_MS: u32 = 500; - /// Period of time between two attempts to extend the lease. We'll - /// re-request a lease for an entire duration of `LEASE_DURATION_MS` - /// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to - /// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure - /// that we can miss a deadline without compromising the lock. - const EXTEND_LEASE_EVERY_MS: u64 = 50; +/// Period of time between two attempts to extend the lease. We'll +/// re-request a lease for an entire duration of `LEASE_DURATION_MS` +/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to +/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure +/// that we can miss a deadline without compromising the lock. +pub const EXTEND_LEASE_EVERY_MS: u64 = 50; - /// Initial backoff, in milliseconds. This is the time we wait the first - /// time, if taking the lock initially failed. - const INITIAL_BACKOFF_MS: u32 = 10; +/// Initial backoff, in milliseconds. This is the time we wait the first +/// time, if taking the lock initially failed. +const INITIAL_BACKOFF_MS: u32 = 10; - /// Maximal backoff, in milliseconds. This is the maximum amount of time - /// we'll wait for the lock, *between two attempts*. - pub const MAX_BACKOFF_MS: u32 = 1000; +/// Maximal backoff, in milliseconds. This is the maximum amount of time +/// we'll wait for the lock, *between two attempts*. +pub const MAX_BACKOFF_MS: u32 = 1000; - /// Create a new store-based lock implemented as a value in the - /// crypto-store. +impl CrossProcessStoreLock { + /// Create a new store-based lock implemented as a value in the store. /// /// # Parameters /// /// - `lock_key`: key in the key-value store to store the lock's state. /// - `lock_holder`: identify the lock's holder with this given value. - pub fn new(store: Arc, lock_key: String, lock_holder: String) -> Self { + pub fn new(store: S, lock_key: String, lock_holder: String) -> Self { Self { store, lock_key, lock_holder, - backoff: Arc::new(Mutex::new(WaitingTime::Some(Self::INITIAL_BACKOFF_MS))), + backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))), num_holders: Arc::new(0.into()), locking_attempt: Arc::new(Mutex::new(())), renew_task: Default::default(), @@ -145,7 +166,9 @@ impl CryptoStoreLock { /// Try to lock once, returns whether the lock was obtained or not. #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] - pub async fn try_lock_once(&self) -> Result, CryptoStoreError> { + pub async fn try_lock_once( + &self, + ) -> Result, LockStoreError> { // Hold onto the locking attempt mutex for the entire lifetime of this // function, to avoid multiple reentrant calls. let mut _attempt = self.locking_attempt.lock().await; @@ -159,14 +182,15 @@ impl CryptoStoreLock { // taken by at least one thread. tracing::trace!("We already had the lock, incrementing holder count"); self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); - let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() }; + let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() }; return Ok(Some(guard)); } let acquired = self .store - .try_take_leased_lock(Self::LEASE_DURATION_MS, &self.lock_key, &self.lock_holder) - .await?; + .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder) + .await + .map_err(|err| LockStoreError::BackingStoreError(Box::new(err)))?; if !acquired { tracing::trace!("Couldn't acquire the lock immediately."); @@ -179,15 +203,14 @@ impl CryptoStoreLock { // that will renew the lease. // Clone data to be owned by the task. - let this = self.clone(); + let this = (*self).clone(); let mut renew_task = self.renew_task.lock().await; // Cancel the previous task, if any. That's safe to do, because: // - either the task was done, // - or it was still running, but taking a lock in the db has to be an atomic - // operation - // running in a transaction. + // operation running in a transaction. if let Some(_prev) = renew_task.take() { #[cfg(not(target_arch = "wasm32"))] @@ -195,7 +218,7 @@ impl CryptoStoreLock { } // Restart a new one. - *renew_task = Some(matrix_sdk_common::executor::spawn(async move { + *renew_task = Some(spawn(async move { loop { { // First, check if there are still users of this lock. @@ -215,27 +238,18 @@ impl CryptoStoreLock { // Cancel the lease with another 0ms lease. // If we don't get the lock, that's (weird but) fine. - let _ = this - .store - .try_take_leased_lock(0, &this.lock_key, &this.lock_holder) - .await; + let fut = this.store.try_lock(0, &this.lock_key, &this.lock_holder); + let _ = fut.await; // Exit the loop. break; } } - sleep(Duration::from_millis(Self::EXTEND_LEASE_EVERY_MS)).await; + sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await; - if let Err(err) = this - .store - .try_take_leased_lock( - Self::LEASE_DURATION_MS, - &this.lock_key, - &this.lock_holder, - ) - .await - { + let fut = this.store.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder); + if let Err(err) = fut.await { tracing::error!("error when extending lock lease: {err:#}"); // Exit the loop. break; @@ -245,7 +259,7 @@ impl CryptoStoreLock { self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); - let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() }; + let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() }; Ok(Some(guard)) } @@ -256,13 +270,13 @@ impl CryptoStoreLock { /// should be waited for, between two attempts. When that time is /// reached a second time, the lock will stop attempting to get the lock /// and will return a timeout error upon locking. If not provided, - /// will wait for [`Self::MAX_BACKOFF_MS`]. + /// will wait for [`MAX_BACKOFF_MS`]. #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] pub async fn spin_lock( &self, max_backoff: Option, - ) -> Result { - let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS); + ) -> Result { + let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS); // Note: reads/writes to the backoff are racy across threads in theory, but the // lock in `try_lock_once` should sequentialize it all. @@ -270,7 +284,7 @@ impl CryptoStoreLock { loop { if let Some(guard) = self.try_lock_once().await? { // Reset backoff before returning, for the next attempt to lock. - *self.backoff.lock().await = WaitingTime::Some(Self::INITIAL_BACKOFF_MS); + *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS); return Ok(guard); } @@ -289,7 +303,7 @@ impl CryptoStoreLock { } WaitingTime::Stop => { // We've reached the maximum backoff, abandon. - return Err(LockStoreError::LockTimeout.into()); + return Err(LockStoreError::LockTimeout); } }; @@ -305,34 +319,26 @@ impl CryptoStoreLock { } } -/// Error related to the locking API of the crypto store. +/// Error related to the locking API of the store. #[derive(Debug, thiserror::Error)] pub enum LockStoreError { - /// A lock value was to be removed, but it didn't contain the expected lock - /// value. - #[error("a lock value was to be removed, but it didn't contain the expected lock value")] - IncorrectLockValue, - - /// A lock value was to be removed, but it was missing in the database. - #[error("a lock value was to be removed, but it was missing in the database")] - MissingLockValue, - /// Spent too long waiting for a database lock. #[error("a lock timed out")] LockTimeout, - /// The generation counter is missing, and should always be present. - #[error("missing generation counter in the store")] - MissingGeneration, - - /// Unexpected format for the generation counter. Is someone tampering the - /// database? - #[error("invalid format of the generation counter")] - InvalidGenerationFormat, + #[error(transparent)] + BackingStoreError(#[from] Box), } #[cfg(test)] +#[cfg(not(target_arch = "wasm32"))] // These tests require tokio::time, which is not implemented on wasm. mod tests { + use std::{ + collections::HashMap, + sync::{atomic, Arc, Mutex}, + time::Instant, + }; + use assert_matches::assert_matches; use matrix_sdk_test::async_test; use tokio::{ @@ -340,23 +346,81 @@ mod tests { time::{sleep, Duration}, }; - use super::{CryptoStoreLock, CryptoStoreLockGuard, LockStoreError}; - use crate::{ - store::{IntoCryptoStore as _, MemoryStore}, - CryptoStoreError, + use super::{ + BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError, + EXTEND_LEASE_EVERY_MS, }; - async fn release_lock(guard: Option) { - drop(guard); - sleep(Duration::from_millis(CryptoStoreLock::EXTEND_LEASE_EVERY_MS)).await; + #[derive(Clone, Default)] + struct TestStore { + leases: Arc>>, } - #[async_test] - async fn test_simple_lock_unlock() -> Result<(), CryptoStoreError> { - let mem_store = MemoryStore::new(); - let dyn_store = mem_store.into_crypto_store(); + impl TestStore { + fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool { + let now = Instant::now(); + let expiration = now + Duration::from_millis(lease_duration_ms.into()); + let mut leases = self.leases.lock().unwrap(); + if let Some(prev) = leases.get_mut(key) { + if prev.0 == holder { + // We had the lease before, extend it. + prev.1 = expiration; + true + } else { + // We didn't have it. + if prev.1 < now { + // Steal it! + prev.0 = holder.to_owned(); + prev.1 = expiration; + true + } else { + // We tried our best. + false + } + } + } else { + leases.insert( + key.to_owned(), + ( + holder.to_owned(), + Instant::now() + Duration::from_millis(lease_duration_ms.into()), + ), + ); + true + } + } + } - let lock = CryptoStoreLock::new(dyn_store, "key".to_owned(), "first".to_owned()); + #[derive(Debug, thiserror::Error)] + enum DummyError {} + + #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] + #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] + impl BackingStore for TestStore { + type Error = DummyError; + + /// Try to take a lock using the given store. + async fn try_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + Ok(self.try_take_leased_lock(lease_duration_ms, key, holder)) + } + } + + async fn release_lock(guard: Option) { + drop(guard); + sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await; + } + + type TestResult = Result<(), LockStoreError>; + + #[async_test] + async fn test_simple_lock_unlock() -> TestResult { + let store = TestStore::default(); + let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned()); // The lock plain works when used with a single holder. let acquired = lock.try_lock_once().await?; @@ -378,11 +442,9 @@ mod tests { } #[async_test] - async fn test_self_recovery() -> Result<(), CryptoStoreError> { - let mem_store = MemoryStore::new(); - let dyn_store = mem_store.into_crypto_store(); - - let lock = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned()); + async fn test_self_recovery() -> TestResult { + let store = TestStore::default(); + let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned()); // When a lock is acquired... let acquired = lock.try_lock_once().await?; @@ -393,7 +455,7 @@ mod tests { drop(lock); // And when rematerializing the lock with the same key/value... - let lock = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned()); + let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned()); // We still got it. let acquired = lock.try_lock_once().await?; @@ -404,11 +466,9 @@ mod tests { } #[async_test] - async fn test_multiple_holders_same_process() -> Result<(), CryptoStoreError> { - let mem_store = MemoryStore::new(); - let dyn_store = mem_store.into_crypto_store(); - - let lock = CryptoStoreLock::new(dyn_store, "key".to_owned(), "first".to_owned()); + async fn test_multiple_holders_same_process() -> TestResult { + let store = TestStore::default(); + let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned()); // Taking the lock twice... let acquired = lock.try_lock_once().await?; @@ -430,12 +490,10 @@ mod tests { } #[async_test] - async fn test_multiple_processes() -> Result<(), CryptoStoreError> { - let mem_store = MemoryStore::new(); - let dyn_store = mem_store.into_crypto_store(); - - let lock1 = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned()); - let lock2 = CryptoStoreLock::new(dyn_store, "key".to_owned(), "second".to_owned()); + async fn test_multiple_processes() -> TestResult { + let store = TestStore::default(); + let lock1 = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned()); + let lock2 = CrossProcessStoreLock::new(store, "key".to_owned(), "second".to_owned()); // When the first process takes the lock... let acquired1 = lock1.try_lock_once().await?; @@ -459,10 +517,7 @@ mod tests { .expect("lock was obtained after spin-locking"); // Now if lock1 tries to get the lock with a small timeout, it will fail. - assert_matches!( - lock1.spin_lock(Some(200)).await, - Err(CryptoStoreError::Lock(LockStoreError::LockTimeout)) - ); + assert_matches!(lock1.spin_lock(Some(200)).await, Err(LockStoreError::LockTimeout)); Ok(()) } diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 774a57f53..ef2e9fa04 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -69,8 +69,8 @@ use crate::{ requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest}, session_manager::{GroupSessionManager, SessionManager}, store::{ - locks::LockStoreError, Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, - IntoCryptoStore, MemoryStore, Result as StoreResult, RoomKeyInfo, SecretImportError, Store, + Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, IntoCryptoStore, MemoryStore, + Result as StoreResult, RoomKeyInfo, SecretImportError, Store, }, types::{ events::{ @@ -1866,9 +1866,9 @@ impl OlmMachine { Some(val) => { // There was a value in the store. We need to signal that we're a different // process, so we don't just reuse the value but increment it. - u64::from_le_bytes( - val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?, - ) + u64::from_le_bytes(val.try_into().map_err(|_| { + CryptoStoreError::InvalidLockGeneration("invalid format".to_owned()) + })?) .wrapping_add(1) } None => 0, @@ -1911,11 +1911,14 @@ impl OlmMachine { .store .get_custom_value(Self::CURRENT_GENERATION_STORE_KEY) .await? - .ok_or(LockStoreError::MissingGeneration)?; + .ok_or_else(|| { + CryptoStoreError::InvalidLockGeneration("counter missing in store".to_owned()) + })?; - let actual_gen = u64::from_le_bytes( - actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?, - ); + let actual_gen = + u64::from_le_bytes(actual_gen.try_into().map_err(|_| { + CryptoStoreError::InvalidLockGeneration("invalid format".to_owned()) + })?); let expected_gen = match gen_guard.as_ref() { Some(expected_gen) => { diff --git a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs index 1ecf7e3d5..84733bbca 100644 --- a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs +++ b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs @@ -2,15 +2,16 @@ use std::{ops::Deref, sync::Arc}; use futures_core::Stream; use futures_util::StreamExt; +use matrix_sdk_common::store_locks::CrossProcessStoreLock; use ruma::{OwnedUserId, UserId}; use tokio::sync::broadcast; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::warn; -use super::{DeviceChanges, IdentityChanges}; +use super::{DeviceChanges, IdentityChanges, LockableCryptoStore}; use crate::{ store, - store::{locks::CryptoStoreLock, Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo}, + store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo}, GossippedSecret, ReadOnlyOwnUserIdentity, }; @@ -160,10 +161,14 @@ impl CryptoStoreWrapper { }) } - /// Creates a `CryptoStoreLock` for this store, that will contain the given - /// key and value when held. - pub fn create_store_lock(&self, lock_key: String, lock_value: String) -> CryptoStoreLock { - CryptoStoreLock::new(self.store.clone(), lock_key, lock_value) + /// Creates a `CrossProcessStoreLock` for this store, that will contain the + /// given key and value when hold. + pub(crate) fn create_store_lock( + &self, + lock_key: String, + lock_value: String, + ) -> CrossProcessStoreLock { + CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value) } } diff --git a/crates/matrix-sdk-crypto/src/store/error.rs b/crates/matrix-sdk-crypto/src/store/error.rs index 0a9285ba1..e40126a5c 100644 --- a/crates/matrix-sdk-crypto/src/store/error.rs +++ b/crates/matrix-sdk-crypto/src/store/error.rs @@ -18,7 +18,6 @@ use ruma::{IdParseError, OwnedDeviceId, OwnedUserId}; use serde_json::Error as SerdeError; use thiserror::Error; -use super::locks::LockStoreError; use crate::olm::SessionCreationError; /// A `CryptoStore` specific result type. @@ -80,9 +79,9 @@ pub enum CryptoStoreError { #[error(transparent)] Backend(Box), - /// An error due to locking. - #[error(transparent)] - Lock(#[from] LockStoreError), + /// An error due to an invalid generation in a cross-process locking scheme. + #[error("invalid lock generation: {0}")] + InvalidLockGeneration(String), } impl CryptoStoreError { diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 786f9919b..069f1f100 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -79,7 +79,6 @@ use crate::{ pub mod caches; mod crypto_store_wrapper; mod error; -pub mod locks; mod memorystore; mod traits; @@ -91,11 +90,10 @@ pub mod integration_tests; use caches::{SequenceNumber, UsersForKeyQuery}; pub(crate) use crypto_store_wrapper::CryptoStoreWrapper; pub use error::{CryptoStoreError, Result}; -use matrix_sdk_common::timeout::timeout; +use matrix_sdk_common::{store_locks::CrossProcessStoreLock, timeout::timeout}; pub use memorystore::MemoryStore; pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore}; -use self::locks::CryptoStoreLock; pub use crate::gossiping::{GossipRequest, SecretInfo}; /// A wrapper for our CryptoStore trait object. @@ -1167,6 +1165,16 @@ impl Store { }) } + /// Creates a `CrossProcessStoreLock` for this store, that will contain the + /// given key and value when hold. + pub fn create_store_lock( + &self, + lock_key: String, + lock_value: String, + ) -> CrossProcessStoreLock { + self.inner.store.create_store_lock(lock_key, lock_value) + } + /// Receive notifications of gossipped secrets being received and stored in /// the secret inbox as a [`Stream`]. /// @@ -1209,12 +1217,6 @@ impl Store { pub fn secrets_stream(&self) -> impl Stream { self.inner.store.secrets_stream() } - - /// Creates a `CryptoStoreLock` for this store, that will contain the given - /// key and value when hold. - pub fn create_store_lock(&self, lock_key: String, lock_value: String) -> CryptoStoreLock { - self.inner.store.create_store_lock(lock_key, lock_value) - } } impl Deref for Store { @@ -1224,3 +1226,22 @@ impl Deref for Store { self.inner.store.deref().deref() } } + +/// A crypto store that implements primitives for cross-process locking. +#[derive(Clone, Debug)] +pub struct LockableCryptoStore(Arc>); + +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore { + type Error = CryptoStoreError; + + async fn try_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> std::result::Result { + self.0.try_take_leased_lock(lease_duration_ms, key, holder).await + } +} diff --git a/crates/matrix-sdk-ui/src/encryption_sync_service.rs b/crates/matrix-sdk-ui/src/encryption_sync_service.rs index 8fa4373f3..0dbc5de2a 100644 --- a/crates/matrix-sdk-ui/src/encryption_sync_service.rs +++ b/crates/matrix-sdk-ui/src/encryption_sync_service.rs @@ -31,8 +31,7 @@ use std::time::Duration; use async_stream::stream; use futures_core::stream::Stream; use futures_util::{pin_mut, StreamExt}; -use matrix_sdk::{Client, SlidingSync}; -use matrix_sdk_crypto::store::locks::CryptoStoreLock; +use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS}; use ruma::{api::client::sync::sync_events::v4, assign}; use tokio::sync::OwnedMutexGuard; use tracing::{debug, trace}; @@ -168,13 +167,10 @@ impl EncryptionSyncService { // yet. In case it's the latter, wait a bit and retry. tracing::debug!( "Lock was already taken, and we're not the main loop; retrying in {}ms...", - CryptoStoreLock::LEASE_DURATION_MS + LEASE_DURATION_MS ); - tokio::time::sleep(Duration::from_millis( - CryptoStoreLock::LEASE_DURATION_MS.into(), - )) - .await; + tokio::time::sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await; lock_guard = self .client diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 18b78681f..129174f8b 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -29,7 +29,7 @@ use dashmap::DashMap; use eyeball::{Observable, SharedObservable, Subscriber}; use futures_core::Stream; #[cfg(feature = "e2e-encryption")] -use matrix_sdk_base::crypto::store::locks::CryptoStoreLock; +use matrix_sdk_base::crypto::store::LockableCryptoStore; use matrix_sdk_base::{ store::DynStateStore, BaseClient, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta, SyncOutsideWasm, @@ -69,8 +69,6 @@ use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard}; use tracing::{debug, error, info, instrument, trace, Instrument, Span}; use url::Url; -#[cfg(feature = "e2e-encryption")] -use crate::encryption::Encryption; #[cfg(feature = "experimental-oidc")] use crate::oidc::Oidc; use crate::{ @@ -87,6 +85,8 @@ use crate::{ Account, AuthApi, AuthSession, Error, Media, RefreshTokenError, Result, Room, TransmissionProgress, }; +#[cfg(feature = "e2e-encryption")] +use crate::{encryption::Encryption, store_locks::CrossProcessStoreLock}; mod builder; mod futures; @@ -184,7 +184,9 @@ pub(crate) struct ClientInner { pub(crate) sync_beat: event_listener::Event, #[cfg(feature = "e2e-encryption")] - pub(crate) cross_process_crypto_store_lock: OnceCell, + pub(crate) cross_process_crypto_store_lock: + OnceCell>, + /// Latest "generation" of data known by the crypto store. /// /// This is a counter that only increments, set in the database (and can diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index dbf83c585..055707900 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -29,10 +29,7 @@ use futures_util::{ future::try_join, stream::{self, StreamExt}, }; -use matrix_sdk_base::crypto::{ - store::locks::CryptoStoreLockGuard, OlmMachine, OutgoingRequest, RoomMessageRequest, - ToDeviceRequest, -}; +use matrix_sdk_base::crypto::{OlmMachine, OutgoingRequest, RoomMessageRequest, ToDeviceRequest}; use ruma::{ api::client::{ backup::add_backup_keys::v3::Response as KeysBackupResponse, @@ -65,6 +62,7 @@ use crate::{ verification::{SasVerification, Verification, VerificationRequest}, }, error::HttpResult, + store_locks::CrossProcessStoreLockGuard, Client, Error, Result, Room, TransmissionProgress, }; @@ -1011,7 +1009,7 @@ impl Encryption { pub async fn spin_lock_store( &self, max_backoff: Option, - ) -> Result, Error> { + ) -> Result, Error> { if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() { let guard = lock.spin_lock(max_backoff).await?; @@ -1027,7 +1025,7 @@ impl Encryption { /// attempts to lock it once. /// /// Returns a guard to the lock, if it was obtained. - pub async fn try_lock_store_once(&self) -> Result, Error> { + pub async fn try_lock_store_once(&self) -> Result, Error> { if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() { let maybe_guard = lock.try_lock_once().await?; diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index 0f3bf50ed..9ae7a4aa2 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -37,6 +37,8 @@ use serde_json::Error as JsonError; use thiserror::Error; use url::ParseError as UrlParseError; +use crate::store_locks::LockStoreError; + /// Result type of the matrix-sdk. pub type Result = std::result::Result; @@ -200,6 +202,10 @@ pub enum Error { #[error(transparent)] CryptoStoreError(#[from] CryptoStoreError), + /// An error occurred with a cross-process store lock. + #[error(transparent)] + CrossProcessLockError(#[from] LockStoreError), + /// An error occurred during a E2EE operation. #[cfg(feature = "e2e-encryption")] #[error(transparent)] diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 135ecd0d3..b974e33fc 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -45,6 +45,7 @@ pub mod notification_settings; #[cfg(feature = "experimental-oidc")] pub mod oidc; pub mod room; + #[cfg(feature = "experimental-sliding-sync")] pub mod sliding_sync; pub mod sync;