feat: make the cross-process store locks generic

And move the implementation to the common crate.
This commit is contained in:
Benjamin Bouvier
2023-09-05 17:22:07 +02:00
parent 9802795d8d
commit 685cc2bbc3
13 changed files with 251 additions and 158 deletions

3
Cargo.lock generated
View File

@@ -3122,6 +3122,8 @@ dependencies = [
name = "matrix-sdk-common"
version = "0.6.0"
dependencies = [
"assert_matches",
"async-trait",
"futures-core",
"futures-util",
"gloo-timers",
@@ -3130,6 +3132,7 @@ dependencies = [
"ruma",
"serde",
"serde_json",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",

View File

@@ -19,22 +19,23 @@ targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"]
js = ["instant/wasm-bindgen", "instant/inaccurate", "wasm-bindgen-futures"]
[dependencies]
async-trait = { workspace = true }
futures-core = { workspace = true }
instant = "0.1.12"
ruma = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["rt", "time", "sync"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-util = { workspace = true, features = ["channel"] }
wasm-bindgen-futures = { version = "0.4.33", optional = true }
gloo-timers = { version = "0.2.6", features = ["futures"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["rt", "time"] }
[dev-dependencies]
assert_matches = { workspace = true }
matrix-sdk-test = { path = "../../testing/matrix-sdk-test/", version= "0.6.0"}
wasm-bindgen-test = "0.3.33"
tracing-subscriber = "0.3.15"

View File

@@ -24,9 +24,12 @@ pub mod debug;
pub mod deserialized_responses;
pub mod executor;
pub mod ring_buffer;
pub mod store_locks;
pub mod timeout;
pub mod tracing_timer;
pub use store_locks::LEASE_DURATION_MS;
/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(not(target_arch = "wasm32"))]

View File

@@ -16,13 +16,13 @@
//!
//! This is a per-process lock that may be used only for very specific use
//! cases, where multiple processes might concurrently write to the same
//! database at the same time; this would invalidate crypto store caches, so
//! database at the same time; this would invalidate store caches, so
//! that should be done mindfully. Such a lock can be acquired multiple times by
//! the same process, and it remains active as long as there's at least one user
//! in a given process.
//!
//! The lock is implemented using time-based leases to values inserted in a
//! crypto store. The store maintains the lock identifier (key), who's the
store. The store maintains the lock identifier (key), the identity of the
current holder (value), and an expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
@@ -38,16 +38,36 @@
//! automatically after the duration of the last lease, at most.
use std::{
sync::{atomic::AtomicU32, Arc},
error::Error,
sync::{
atomic::{self, AtomicU32},
Arc,
},
time::Duration,
};
use matrix_sdk_common::executor::JoinHandle;
use tokio::{sync::Mutex, time::sleep};
use tracing::instrument;
use super::DynCryptoStore;
use crate::CryptoStoreError;
use crate::{
executor::{spawn, JoinHandle},
SendOutsideWasm,
};
/// Backing store for a cross-process lock.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait BackingStore {
type Error: Error + Send + Sync;
/// Try to take a lock using the given store.
async fn try_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
}
/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
@@ -58,26 +78,28 @@ enum WaitingTime {
Stop,
}
/// A guard on the crypto store lock.
/// A guard on the store lock.
///
/// The lock will be automatically released a short period of time after all the
/// guards have dropped.
#[derive(Debug)]
pub struct CryptoStoreLockGuard {
pub struct CrossProcessStoreLockGuard {
num_holders: Arc<AtomicU32>,
}
impl Drop for CryptoStoreLockGuard {
impl Drop for CrossProcessStoreLockGuard {
fn drop(&mut self) {
self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
}
}
/// A store-based lock for the `CryptoStore`.
/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CryptoStoreLock {
pub struct CrossProcessStoreLock<S: BackingStore + Clone + SendOutsideWasm + 'static> {
/// The store we're using to lock.
store: Arc<DynCryptoStore>,
store: S,
/// Number of holders of the lock in this process.
///
@@ -105,38 +127,37 @@ pub struct CryptoStoreLock {
backoff: Arc<Mutex<WaitingTime>>,
}
impl CryptoStoreLock {
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;
/// Period of time between two attempts to extend the lease. We'll
/// re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
const EXTEND_LEASE_EVERY_MS: u64 = 50;
/// Period of time between two attempts to extend the lease. We'll
/// re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;
/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;
/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;
/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;
/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;
/// Create a new store-based lock implemented as a value in the
/// crypto-store.
impl<S: BackingStore + Clone + SendOutsideWasm + 'static> CrossProcessStoreLock<S> {
/// Create a new store-based lock implemented as a value in the store.
///
/// # Parameters
///
/// - `lock_key`: key in the key-value store to store the lock's state.
/// - `lock_holder`: identify the lock's holder with this given value.
pub fn new(store: Arc<DynCryptoStore>, lock_key: String, lock_holder: String) -> Self {
pub fn new(store: S, lock_key: String, lock_holder: String) -> Self {
Self {
store,
lock_key,
lock_holder,
backoff: Arc::new(Mutex::new(WaitingTime::Some(Self::INITIAL_BACKOFF_MS))),
backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
num_holders: Arc::new(0.into()),
locking_attempt: Arc::new(Mutex::new(())),
renew_task: Default::default(),
@@ -145,7 +166,9 @@ impl CryptoStoreLock {
/// Try to lock once, returns whether the lock was obtained or not.
#[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
pub async fn try_lock_once(&self) -> Result<Option<CryptoStoreLockGuard>, CryptoStoreError> {
pub async fn try_lock_once(
&self,
) -> Result<Option<CrossProcessStoreLockGuard>, LockStoreError> {
// Hold onto the locking attempt mutex for the entire lifetime of this
// function, to avoid multiple reentrant calls.
let mut _attempt = self.locking_attempt.lock().await;
@@ -159,14 +182,15 @@ impl CryptoStoreLock {
// taken by at least one thread.
tracing::trace!("We already had the lock, incrementing holder count");
self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() };
let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
return Ok(Some(guard));
}
let acquired = self
.store
.try_take_leased_lock(Self::LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
.await?;
.try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
.await
.map_err(|err| LockStoreError::BackingStoreError(Box::new(err)))?;
if !acquired {
tracing::trace!("Couldn't acquire the lock immediately.");
@@ -179,15 +203,14 @@ impl CryptoStoreLock {
// that will renew the lease.
// Clone data to be owned by the task.
let this = self.clone();
let this = (*self).clone();
let mut renew_task = self.renew_task.lock().await;
// Cancel the previous task, if any. That's safe to do, because:
// - either the task was done,
// - or it was still running, but taking a lock in the db has to be an atomic
// operation
// running in a transaction.
// operation running in a transaction.
if let Some(_prev) = renew_task.take() {
#[cfg(not(target_arch = "wasm32"))]
@@ -195,7 +218,7 @@ impl CryptoStoreLock {
}
// Restart a new one.
*renew_task = Some(matrix_sdk_common::executor::spawn(async move {
*renew_task = Some(spawn(async move {
loop {
{
// First, check if there are still users of this lock.
@@ -215,27 +238,18 @@ impl CryptoStoreLock {
// Cancel the lease with another 0ms lease.
// If we don't get the lock, that's (weird but) fine.
let _ = this
.store
.try_take_leased_lock(0, &this.lock_key, &this.lock_holder)
.await;
let fut = this.store.try_lock(0, &this.lock_key, &this.lock_holder);
let _ = fut.await;
// Exit the loop.
break;
}
}
sleep(Duration::from_millis(Self::EXTEND_LEASE_EVERY_MS)).await;
sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
if let Err(err) = this
.store
.try_take_leased_lock(
Self::LEASE_DURATION_MS,
&this.lock_key,
&this.lock_holder,
)
.await
{
let fut = this.store.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);
if let Err(err) = fut.await {
tracing::error!("error when extending lock lease: {err:#}");
// Exit the loop.
break;
@@ -245,7 +259,7 @@ impl CryptoStoreLock {
self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() };
let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
Ok(Some(guard))
}
@@ -256,13 +270,13 @@ impl CryptoStoreLock {
/// should be waited for, between two attempts. When that time is
/// reached a second time, the lock will stop attempting to get the lock
/// and will return a timeout error upon locking. If not provided,
/// will wait for [`Self::MAX_BACKOFF_MS`].
/// will wait for [`MAX_BACKOFF_MS`].
#[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
pub async fn spin_lock(
&self,
max_backoff: Option<u32>,
) -> Result<CryptoStoreLockGuard, CryptoStoreError> {
let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS);
) -> Result<CrossProcessStoreLockGuard, LockStoreError> {
let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);
// Note: reads/writes to the backoff are racy across threads in theory, but the
// lock in `try_lock_once` should sequentialize it all.
@@ -270,7 +284,7 @@ impl CryptoStoreLock {
loop {
if let Some(guard) = self.try_lock_once().await? {
// Reset backoff before returning, for the next attempt to lock.
*self.backoff.lock().await = WaitingTime::Some(Self::INITIAL_BACKOFF_MS);
*self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
return Ok(guard);
}
@@ -289,7 +303,7 @@ impl CryptoStoreLock {
}
WaitingTime::Stop => {
// We've reached the maximum backoff, abandon.
return Err(LockStoreError::LockTimeout.into());
return Err(LockStoreError::LockTimeout);
}
};
@@ -305,34 +319,26 @@ impl CryptoStoreLock {
}
}
/// Error related to the locking API of the crypto store.
/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum LockStoreError {
/// A lock value was to be removed, but it didn't contain the expected lock
/// value.
#[error("a lock value was to be removed, but it didn't contain the expected lock value")]
IncorrectLockValue,
/// A lock value was to be removed, but it was missing in the database.
#[error("a lock value was to be removed, but it was missing in the database")]
MissingLockValue,
/// Spent too long waiting for a database lock.
#[error("a lock timed out")]
LockTimeout,
/// The generation counter is missing, and should always be present.
#[error("missing generation counter in the store")]
MissingGeneration,
/// Unexpected format for the generation counter. Is someone tampering the
/// database?
#[error("invalid format of the generation counter")]
InvalidGenerationFormat,
#[error(transparent)]
BackingStoreError(#[from] Box<dyn Error + Send + Sync>),
}
#[cfg(test)]
#[cfg(not(target_arch = "wasm32"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
use std::{
collections::HashMap,
sync::{atomic, Arc, Mutex},
time::Instant,
};
use assert_matches::assert_matches;
use matrix_sdk_test::async_test;
use tokio::{
@@ -340,23 +346,81 @@ mod tests {
time::{sleep, Duration},
};
use super::{CryptoStoreLock, CryptoStoreLockGuard, LockStoreError};
use crate::{
store::{IntoCryptoStore as _, MemoryStore},
CryptoStoreError,
use super::{
BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError,
EXTEND_LEASE_EVERY_MS,
};
async fn release_lock(guard: Option<CryptoStoreLockGuard>) {
drop(guard);
sleep(Duration::from_millis(CryptoStoreLock::EXTEND_LEASE_EVERY_MS)).await;
#[derive(Clone, Default)]
struct TestStore {
leases: Arc<Mutex<HashMap<String, (String, Instant)>>>,
}
#[async_test]
async fn test_simple_lock_unlock() -> Result<(), CryptoStoreError> {
let mem_store = MemoryStore::new();
let dyn_store = mem_store.into_crypto_store();
impl TestStore {
fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
let now = Instant::now();
let expiration = now + Duration::from_millis(lease_duration_ms.into());
let mut leases = self.leases.lock().unwrap();
if let Some(prev) = leases.get_mut(key) {
if prev.0 == holder {
// We had the lease before, extend it.
prev.1 = expiration;
true
} else {
// We didn't have it.
if prev.1 < now {
// Steal it!
prev.0 = holder.to_owned();
prev.1 = expiration;
true
} else {
// We tried our best.
false
}
}
} else {
leases.insert(
key.to_owned(),
(
holder.to_owned(),
Instant::now() + Duration::from_millis(lease_duration_ms.into()),
),
);
true
}
}
}
let lock = CryptoStoreLock::new(dyn_store, "key".to_owned(), "first".to_owned());
#[derive(Debug, thiserror::Error)]
enum DummyError {}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BackingStore for TestStore {
type Error = DummyError;
/// Try to take a lock using the given store.
async fn try_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
}
}
async fn release_lock(guard: Option<CrossProcessStoreLockGuard>) {
drop(guard);
sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
}
type TestResult = Result<(), LockStoreError>;
#[async_test]
async fn test_simple_lock_unlock() -> TestResult {
let store = TestStore::default();
let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());
// The lock plain works when used with a single holder.
let acquired = lock.try_lock_once().await?;
@@ -378,11 +442,9 @@ mod tests {
}
#[async_test]
async fn test_self_recovery() -> Result<(), CryptoStoreError> {
let mem_store = MemoryStore::new();
let dyn_store = mem_store.into_crypto_store();
let lock = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned());
async fn test_self_recovery() -> TestResult {
let store = TestStore::default();
let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
// When a lock is acquired...
let acquired = lock.try_lock_once().await?;
@@ -393,7 +455,7 @@ mod tests {
drop(lock);
// And when rematerializing the lock with the same key/value...
let lock = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned());
let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
// We still got it.
let acquired = lock.try_lock_once().await?;
@@ -404,11 +466,9 @@ mod tests {
}
#[async_test]
async fn test_multiple_holders_same_process() -> Result<(), CryptoStoreError> {
let mem_store = MemoryStore::new();
let dyn_store = mem_store.into_crypto_store();
let lock = CryptoStoreLock::new(dyn_store, "key".to_owned(), "first".to_owned());
async fn test_multiple_holders_same_process() -> TestResult {
let store = TestStore::default();
let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());
// Taking the lock twice...
let acquired = lock.try_lock_once().await?;
@@ -430,12 +490,10 @@ mod tests {
}
#[async_test]
async fn test_multiple_processes() -> Result<(), CryptoStoreError> {
let mem_store = MemoryStore::new();
let dyn_store = mem_store.into_crypto_store();
let lock1 = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned());
let lock2 = CryptoStoreLock::new(dyn_store, "key".to_owned(), "second".to_owned());
async fn test_multiple_processes() -> TestResult {
let store = TestStore::default();
let lock1 = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
let lock2 = CrossProcessStoreLock::new(store, "key".to_owned(), "second".to_owned());
// When the first process takes the lock...
let acquired1 = lock1.try_lock_once().await?;
@@ -459,10 +517,7 @@ mod tests {
.expect("lock was obtained after spin-locking");
// Now if lock1 tries to get the lock with a small timeout, it will fail.
assert_matches!(
lock1.spin_lock(Some(200)).await,
Err(CryptoStoreError::Lock(LockStoreError::LockTimeout))
);
assert_matches!(lock1.spin_lock(Some(200)).await, Err(LockStoreError::LockTimeout));
Ok(())
}

View File

@@ -69,8 +69,8 @@ use crate::{
requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest},
session_manager::{GroupSessionManager, SessionManager},
store::{
locks::LockStoreError, Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges,
IntoCryptoStore, MemoryStore, Result as StoreResult, RoomKeyInfo, SecretImportError, Store,
Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, IntoCryptoStore, MemoryStore,
Result as StoreResult, RoomKeyInfo, SecretImportError, Store,
},
types::{
events::{
@@ -1866,9 +1866,9 @@ impl OlmMachine {
Some(val) => {
// There was a value in the store. We need to signal that we're a different
// process, so we don't just reuse the value but increment it.
u64::from_le_bytes(
val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
)
u64::from_le_bytes(val.try_into().map_err(|_| {
CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
})?)
.wrapping_add(1)
}
None => 0,
@@ -1911,11 +1911,14 @@ impl OlmMachine {
.store
.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
.await?
.ok_or(LockStoreError::MissingGeneration)?;
.ok_or_else(|| {
CryptoStoreError::InvalidLockGeneration("counter missing in store".to_owned())
})?;
let actual_gen = u64::from_le_bytes(
actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
);
let actual_gen =
u64::from_le_bytes(actual_gen.try_into().map_err(|_| {
CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
})?);
let expected_gen = match gen_guard.as_ref() {
Some(expected_gen) => {

View File

@@ -2,15 +2,16 @@ use std::{ops::Deref, sync::Arc};
use futures_core::Stream;
use futures_util::StreamExt;
use matrix_sdk_common::store_locks::CrossProcessStoreLock;
use ruma::{OwnedUserId, UserId};
use tokio::sync::broadcast;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::warn;
use super::{DeviceChanges, IdentityChanges};
use super::{DeviceChanges, IdentityChanges, LockableCryptoStore};
use crate::{
store,
store::{locks::CryptoStoreLock, Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo},
store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo},
GossippedSecret, ReadOnlyOwnUserIdentity,
};
@@ -160,10 +161,14 @@ impl CryptoStoreWrapper {
})
}
/// Creates a `CryptoStoreLock` for this store, that will contain the given
/// key and value when held.
pub fn create_store_lock(&self, lock_key: String, lock_value: String) -> CryptoStoreLock {
CryptoStoreLock::new(self.store.clone(), lock_key, lock_value)
/// Creates a `CrossProcessStoreLock` for this store, that will contain the
/// given key and value when held.
pub(crate) fn create_store_lock(
&self,
lock_key: String,
lock_value: String,
) -> CrossProcessStoreLock<LockableCryptoStore> {
CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
}
}

View File

@@ -18,7 +18,6 @@ use ruma::{IdParseError, OwnedDeviceId, OwnedUserId};
use serde_json::Error as SerdeError;
use thiserror::Error;
use super::locks::LockStoreError;
use crate::olm::SessionCreationError;
/// A `CryptoStore` specific result type.
@@ -80,9 +79,9 @@ pub enum CryptoStoreError {
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),
/// An error due to locking.
#[error(transparent)]
Lock(#[from] LockStoreError),
/// An error due to an invalid generation in a cross-process locking scheme.
#[error("invalid lock generation: {0}")]
InvalidLockGeneration(String),
}
impl CryptoStoreError {

View File

@@ -79,7 +79,6 @@ use crate::{
pub mod caches;
mod crypto_store_wrapper;
mod error;
pub mod locks;
mod memorystore;
mod traits;
@@ -91,11 +90,10 @@ pub mod integration_tests;
use caches::{SequenceNumber, UsersForKeyQuery};
pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
pub use error::{CryptoStoreError, Result};
use matrix_sdk_common::timeout::timeout;
use matrix_sdk_common::{store_locks::CrossProcessStoreLock, timeout::timeout};
pub use memorystore::MemoryStore;
pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
use self::locks::CryptoStoreLock;
pub use crate::gossiping::{GossipRequest, SecretInfo};
/// A wrapper for our CryptoStore trait object.
@@ -1167,6 +1165,16 @@ impl Store {
})
}
/// Creates a `CrossProcessStoreLock` for this store, that will contain the
/// given key and value when held.
pub fn create_store_lock(
&self,
lock_key: String,
lock_value: String,
) -> CrossProcessStoreLock<LockableCryptoStore> {
self.inner.store.create_store_lock(lock_key, lock_value)
}
/// Receive notifications of gossipped secrets being received and stored in
/// the secret inbox as a [`Stream`].
///
@@ -1209,12 +1217,6 @@ impl Store {
pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
self.inner.store.secrets_stream()
}
/// Creates a `CryptoStoreLock` for this store, that will contain the given
/// key and value when held.
pub fn create_store_lock(&self, lock_key: String, lock_value: String) -> CryptoStoreLock {
self.inner.store.create_store_lock(lock_key, lock_value)
}
}
impl Deref for Store {
@@ -1224,3 +1226,22 @@ impl Deref for Store {
self.inner.store.deref().deref()
}
}
/// A crypto store that implements primitives for cross-process locking.
#[derive(Clone, Debug)]
pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore {
type Error = CryptoStoreError;
async fn try_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}

View File

@@ -31,8 +31,7 @@ use std::time::Duration;
use async_stream::stream;
use futures_core::stream::Stream;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{Client, SlidingSync};
use matrix_sdk_crypto::store::locks::CryptoStoreLock;
use matrix_sdk::{Client, SlidingSync, LEASE_DURATION_MS};
use ruma::{api::client::sync::sync_events::v4, assign};
use tokio::sync::OwnedMutexGuard;
use tracing::{debug, trace};
@@ -168,13 +167,10 @@ impl EncryptionSyncService {
// yet. In case it's the latter, wait a bit and retry.
tracing::debug!(
"Lock was already taken, and we're not the main loop; retrying in {}ms...",
CryptoStoreLock::LEASE_DURATION_MS
LEASE_DURATION_MS
);
tokio::time::sleep(Duration::from_millis(
CryptoStoreLock::LEASE_DURATION_MS.into(),
))
.await;
tokio::time::sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
lock_guard = self
.client

View File

@@ -29,7 +29,7 @@ use dashmap::DashMap;
use eyeball::{Observable, SharedObservable, Subscriber};
use futures_core::Stream;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::store::locks::CryptoStoreLock;
use matrix_sdk_base::crypto::store::LockableCryptoStore;
use matrix_sdk_base::{
store::DynStateStore, BaseClient, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
SyncOutsideWasm,
@@ -69,8 +69,6 @@ use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
use tracing::{debug, error, info, instrument, trace, Instrument, Span};
use url::Url;
#[cfg(feature = "e2e-encryption")]
use crate::encryption::Encryption;
#[cfg(feature = "experimental-oidc")]
use crate::oidc::Oidc;
use crate::{
@@ -87,6 +85,8 @@ use crate::{
Account, AuthApi, AuthSession, Error, Media, RefreshTokenError, Result, Room,
TransmissionProgress,
};
#[cfg(feature = "e2e-encryption")]
use crate::{encryption::Encryption, store_locks::CrossProcessStoreLock};
mod builder;
mod futures;
@@ -184,7 +184,9 @@ pub(crate) struct ClientInner {
pub(crate) sync_beat: event_listener::Event,
#[cfg(feature = "e2e-encryption")]
pub(crate) cross_process_crypto_store_lock: OnceCell<CryptoStoreLock>,
pub(crate) cross_process_crypto_store_lock:
OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
/// Latest "generation" of data known by the crypto store.
///
/// This is a counter that only increments, set in the database (and can

View File

@@ -29,10 +29,7 @@ use futures_util::{
future::try_join,
stream::{self, StreamExt},
};
use matrix_sdk_base::crypto::{
store::locks::CryptoStoreLockGuard, OlmMachine, OutgoingRequest, RoomMessageRequest,
ToDeviceRequest,
};
use matrix_sdk_base::crypto::{OlmMachine, OutgoingRequest, RoomMessageRequest, ToDeviceRequest};
use ruma::{
api::client::{
backup::add_backup_keys::v3::Response as KeysBackupResponse,
@@ -65,6 +62,7 @@ use crate::{
verification::{SasVerification, Verification, VerificationRequest},
},
error::HttpResult,
store_locks::CrossProcessStoreLockGuard,
Client, Error, Result, Room, TransmissionProgress,
};
@@ -1011,7 +1009,7 @@ impl Encryption {
pub async fn spin_lock_store(
&self,
max_backoff: Option<u32>,
) -> Result<Option<CryptoStoreLockGuard>, Error> {
) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
let guard = lock.spin_lock(max_backoff).await?;
@@ -1027,7 +1025,7 @@ impl Encryption {
/// attempts to lock it once.
///
/// Returns a guard to the lock, if it was obtained.
pub async fn try_lock_store_once(&self) -> Result<Option<CryptoStoreLockGuard>, Error> {
pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessStoreLockGuard>, Error> {
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
let maybe_guard = lock.try_lock_once().await?;

View File

@@ -37,6 +37,8 @@ use serde_json::Error as JsonError;
use thiserror::Error;
use url::ParseError as UrlParseError;
use crate::store_locks::LockStoreError;
/// Result type of the matrix-sdk.
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -200,6 +202,10 @@ pub enum Error {
#[error(transparent)]
CryptoStoreError(#[from] CryptoStoreError),
/// An error occurred with a cross-process store lock.
#[error(transparent)]
CrossProcessLockError(#[from] LockStoreError),
/// An error occurred during a E2EE operation.
#[cfg(feature = "e2e-encryption")]
#[error(transparent)]

View File

@@ -45,6 +45,7 @@ pub mod notification_settings;
#[cfg(feature = "experimental-oidc")]
pub mod oidc;
pub mod room;
#[cfg(feature = "experimental-sliding-sync")]
pub mod sliding_sync;
pub mod sync;