diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 1adcd318c..403295220 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -1408,57 +1408,57 @@ macro_rules! event_cache_store_integration_tests_time { let store = get_event_cache_store().await.unwrap().into_event_cache_store(); let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first lock generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // not acquired // That's a nice test we got here, go take a little nap. sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index 1ca2446c3..73418ca7b 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -19,13 +19,16 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, + cross_process_lock::{ + CrossProcessLockGeneration, + memory_store_helper::{Lease, try_take_leased_lock}, + }, linked_chunk::{ ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, relational::RelationalLinkedChunk, }, }; -use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant}; +use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType}; use tracing::error; use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation}; @@ -41,7 +44,7 @@ pub struct MemoryStore { #[derive(Debug)] struct MemoryStoreInner { - leases: HashMap, + leases: HashMap, events: RelationalLinkedChunk, } @@ -73,7 +76,7 @@ impl EventCacheStore for MemoryStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { let mut inner = self.inner.write().unwrap(); Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder)) diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 02ad91c04..7735b644f 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -28,7 +28,8 @@ mod memory_store; mod traits; use matrix_sdk_common::cross_process_lock::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, + TryLock, }; pub use matrix_sdk_store_encryption::Error as StoreEncryptionError; use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw}; @@ -188,7 +189,7 @@ impl TryLock for LockableEventCacheStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index aca4a8e6d..2086aefd0 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -17,6 +17,7 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; use matrix_sdk_common::{ AsyncTraitDeps, + cross_process_lock::CrossProcessLockGeneration, linked_chunk::{ ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, @@ -46,7 +47,7 @@ pub trait EventCacheStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// An [`Update`] reflects an operation that has happened inside a linked /// chunk. The linked chunk is used by the event cache to store the events @@ -196,7 +197,7 @@ impl EventCacheStore for EraseEventCacheStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-base/src/media/store/integration_tests.rs b/crates/matrix-sdk-base/src/media/store/integration_tests.rs index f7bbe59ef..5dda5f127 100644 --- a/crates/matrix-sdk-base/src/media/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/media/store/integration_tests.rs @@ -1337,57 +1337,57 @@ macro_rules! media_store_integration_tests_time { let store = get_media_store().await.unwrap(); let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first lock generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // not acquired // That's a nice test we got here, go take a little nap. sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-base/src/media/store/memory_store.rs b/crates/matrix-sdk-base/src/media/store/memory_store.rs index 8b0fab3e0..b97c343ad 100644 --- a/crates/matrix-sdk-base/src/media/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/media/store/memory_store.rs @@ -20,12 +20,13 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer, -}; -use ruma::{ - MxcUri, OwnedMxcUri, - time::{Instant, SystemTime}, + cross_process_lock::{ + CrossProcessLockGeneration, + memory_store_helper::{Lease, try_take_leased_lock}, + }, + ring_buffer::RingBuffer, }; +use ruma::{MxcUri, OwnedMxcUri, time::SystemTime}; use super::Result; use crate::media::{ @@ -48,7 +49,7 @@ pub struct MemoryMediaStore { #[derive(Debug)] struct MemoryMediaStoreInner { media: RingBuffer, - leases: HashMap, + leases: HashMap, media_retention_policy: Option, last_media_cleanup_time: SystemTime, } @@ -110,7 +111,7 @@ impl MediaStore for MemoryMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { let mut inner = self.inner.write().unwrap(); Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder)) diff --git a/crates/matrix-sdk-base/src/media/store/mod.rs b/crates/matrix-sdk-base/src/media/store/mod.rs index ce64a9726..5269c550c 100644 --- a/crates/matrix-sdk-base/src/media/store/mod.rs +++ b/crates/matrix-sdk-base/src/media/store/mod.rs @@ -32,7 +32,8 @@ use std::fmt; use std::{ops::Deref, sync::Arc}; use matrix_sdk_common::cross_process_lock::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, + TryLock, }; use matrix_sdk_store_encryption::Error as StoreEncryptionError; pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner}; @@ -172,7 +173,7 @@ impl TryLock for LockableMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-base/src/media/store/traits.rs b/crates/matrix-sdk-base/src/media/store/traits.rs index 553ffa3d0..73b85e391 100644 --- a/crates/matrix-sdk-base/src/media/store/traits.rs +++ b/crates/matrix-sdk-base/src/media/store/traits.rs @@ -17,7 +17,7 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; -use matrix_sdk_common::AsyncTraitDeps; +use matrix_sdk_common::{AsyncTraitDeps, cross_process_lock::CrossProcessLockGeneration}; use ruma::{MxcUri, time::SystemTime}; #[cfg(doc)] @@ -41,7 +41,7 @@ pub trait MediaStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// Add a media file's content in the media store. /// @@ -313,7 +313,7 @@ impl MediaStore for EraseMediaStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-common/src/cross_process_lock.rs b/crates/matrix-sdk-common/src/cross_process_lock.rs index 977ba62f8..bd45e9b89 100644 --- a/crates/matrix-sdk-common/src/cross_process_lock.rs +++ b/crates/matrix-sdk-common/src/cross_process_lock.rs @@ -56,6 +56,16 @@ use crate::{ sleep::sleep, }; +/// A lock generation is an integer incremented each time the lock is taken by +/// a different holder. +/// +/// This is used to know if a lock has been dirtied. +pub type CrossProcessLockGeneration = u64; + +/// Describe the first lock generation value (see +/// [`CrossProcessLockGeneration`]). +pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1; + /// Trait used to try to take a lock. Foundation of [`CrossProcessLock`]. pub trait TryLock { #[cfg(not(target_family = "wasm"))] @@ -80,7 +90,8 @@ pub trait TryLock { lease_duration_ms: u32, key: &str, holder: &str, - ) -> impl Future> + SendOutsideWasm; + ) -> impl Future, Self::LockError>> + + SendOutsideWasm; } /// Small state machine to handle wait times. @@ -214,7 +225,8 @@ where .locker .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder) .await - .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?; + .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))? + .is_some(); if !acquired { trace!("Couldn't acquire the lock immediately."); @@ -370,7 +382,6 @@ mod tests { use std::{ collections::HashMap, sync::{Arc, RwLock, atomic}, - time::Instant, }; use assert_matches::assert_matches; @@ -381,17 +392,23 @@ mod tests { }; use super::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, EXTEND_LEASE_EVERY_MS, - TryLock, memory_store_helper::try_take_leased_lock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, + EXTEND_LEASE_EVERY_MS, TryLock, + memory_store_helper::{Lease, try_take_leased_lock}, }; #[derive(Clone, Default)] struct TestStore { - leases: Arc>>, + leases: Arc>>, } impl TestStore { - fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool { + fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Option { try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder) } } @@ -408,7 +425,7 @@ mod tests { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::LockError> { Ok(self.try_take_leased_lock(lease_duration_ms, key, holder)) } } @@ -533,48 +550,63 @@ pub mod memory_store_helper { use ruma::time::{Duration, Instant}; + use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION}; + + #[derive(Debug)] + pub struct Lease { + holder: String, + expiration: Instant, + generation: CrossProcessLockGeneration, + } + pub fn try_take_leased_lock( - leases: &mut HashMap, + leases: &mut HashMap, lease_duration_ms: u32, key: &str, holder: &str, - ) -> bool { + ) -> Option { let now = Instant::now(); let expiration = now + Duration::from_millis(lease_duration_ms.into()); match leases.entry(key.to_owned()) { // There is an existing holder. Entry::Occupied(mut entry) => { - let (current_holder, current_expiration) = entry.get_mut(); + let Lease { + holder: current_holder, + expiration: current_expiration, + generation: current_generation, + } = entry.get_mut(); if current_holder == holder { // We had the lease before, extend it. *current_expiration = expiration; - true + Some(*current_generation) } else { // We didn't have it. if *current_expiration < now { // Steal it! *current_holder = holder.to_owned(); *current_expiration = expiration; + *current_generation += 1; - true + Some(*current_generation) } else { // We tried our best. - false + None } } } // There is no holder, easy. Entry::Vacant(entry) => { - entry.insert(( - holder.to_owned(), - Instant::now() + Duration::from_millis(lease_duration_ms.into()), - )); + entry.insert(Lease { + holder: holder.to_owned(), + expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()), + generation: FIRST_CROSS_PROCESS_LOCK_GENERATION, + }); - true + Some(FIRST_CROSS_PROCESS_LOCK_GENERATION) } } } diff --git a/crates/matrix-sdk-crypto/src/store/integration_tests.rs b/crates/matrix-sdk-crypto/src/store/integration_tests.rs index 39314c61a..7e818f81e 100644 --- a/crates/matrix-sdk-crypto/src/store/integration_tests.rs +++ b/crates/matrix-sdk-crypto/src/store/integration_tests.rs @@ -1457,57 +1457,57 @@ macro_rules! cryptostore_integration_tests_time { let (_account, store) = get_loaded_store("lease_locks").await; let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // That's a nice test we got here, go take a little nap. tokio::time::sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. tokio::time::sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! tokio::time::sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! tokio::time::sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-crypto/src/store/memorystore.rs b/crates/matrix-sdk-crypto/src/store/memorystore.rs index c1f12bc2c..9b659801d 100644 --- a/crates/matrix-sdk-crypto/src/store/memorystore.rs +++ b/crates/matrix-sdk-crypto/src/store/memorystore.rs @@ -20,11 +20,15 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, locks::RwLock as StdRwLock, + cross_process_lock::{ + memory_store_helper::{try_take_leased_lock, Lease}, + CrossProcessLockGeneration, + }, + locks::RwLock as StdRwLock, }; use ruma::{ - events::secret::request::SecretName, time::Instant, DeviceId, OwnedDeviceId, OwnedRoomId, - OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, + events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId, + OwnedUserId, RoomId, TransactionId, UserId, }; use tokio::sync::{Mutex, RwLock}; use tracing::warn; @@ -99,7 +103,7 @@ pub struct MemoryStore { key_requests_by_info: StdRwLock>, direct_withheld_info: StdRwLock>>, custom_values: StdRwLock>>, - leases: StdRwLock>, + leases: StdRwLock>, secret_inbox: StdRwLock>>, backup_keys: RwLock, dehydrated_device_pickle_key: RwLock>, @@ -768,7 +772,7 @@ impl CryptoStore for MemoryStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { Ok(try_take_leased_lock(&mut self.leases.write(), lease_duration_ms, key, holder)) } } @@ -1269,6 +1273,7 @@ mod integration_tests { }; use async_trait::async_trait; + use matrix_sdk_common::cross_process_lock::CrossProcessLockGeneration; use ruma::{ events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId, }; @@ -1586,7 +1591,7 @@ mod integration_tests { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index f4004e08f..68b0cbd4d 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -100,7 +100,9 @@ pub mod integration_tests; pub(crate) use crypto_store_wrapper::CryptoStoreWrapper; pub use error::{CryptoStoreError, Result}; use matrix_sdk_common::{ - cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout, + cross_process_lock::{CrossProcessLock, CrossProcessLockGeneration}, + deserialized_responses::WithheldCode, + timeout::timeout, }; pub use memorystore::MemoryStore; pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore}; @@ -1790,7 +1792,7 @@ impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-crypto/src/store/traits.rs b/crates/matrix-sdk-crypto/src/store/traits.rs index 9643ab4c0..eaf2fb9be 100644 --- a/crates/matrix-sdk-crypto/src/store/traits.rs +++ b/crates/matrix-sdk-crypto/src/store/traits.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, fmt, sync::Arc}; use async_trait::async_trait; -use matrix_sdk_common::AsyncTraitDeps; +use matrix_sdk_common::{cross_process_lock::CrossProcessLockGeneration, AsyncTraitDeps}; use ruma::{ events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId, }; @@ -395,7 +395,7 @@ pub trait CryptoStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// Load the next-batch token for a to-device query, if any. async fn next_batch_token(&self) -> Result, Self::Error>; @@ -641,7 +641,7 @@ impl CryptoStore for EraseCryptoStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-sqlite/Cargo.toml b/crates/matrix-sdk-sqlite/Cargo.toml index 83b7e945e..78c8119b4 100644 --- a/crates/matrix-sdk-sqlite/Cargo.toml +++ b/crates/matrix-sdk-sqlite/Cargo.toml @@ -15,7 +15,7 @@ default = ["state-store", "event-cache"] testing = ["matrix-sdk-crypto?/testing"] bundled = ["rusqlite/bundled"] -crypto-store = ["dep:matrix-sdk-crypto"] +crypto-store = ["dep:matrix-sdk-base", "dep:matrix-sdk-crypto"] event-cache = ["dep:matrix-sdk-base"] state-store = ["dep:matrix-sdk-base"] diff --git a/crates/matrix-sdk-sqlite/migrations/crypto_store/013_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/crypto_store/013_lease_locks_with_generation.sql new file mode 100644 index 000000000..5c67fcb8b --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/crypto_store/013_lease_locks_with_generation.sql @@ -0,0 +1,7 @@ +-- Rename the `expiration_ts` column to `expiration` to be consistent with other +-- `lease_locks` tables in other stores. +ALTER TABLE "lease_locks" RENAME COLUMN "expiration_ts" TO "expiration"; + +-- Add the `generation` column to handle _dirtiness. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/013_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/013_lease_locks_with_generation.sql new file mode 100644 index 000000000..70496cebc --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/013_lease_locks_with_generation.sql @@ -0,0 +1,3 @@ +-- Add the `generation` column to handle _dirtiness. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql new file mode 100644 index 000000000..e65cbbb5f --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql @@ -0,0 +1,3 @@ +-- Add the `generation` column to handle _dirtiness_. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/src/crypto_store.rs b/crates/matrix-sdk-sqlite/src/crypto_store.rs index 03ca2d22d..558b30e11 100644 --- a/crates/matrix-sdk-sqlite/src/crypto_store.rs +++ b/crates/matrix-sdk-sqlite/src/crypto_store.rs @@ -21,6 +21,7 @@ use std::{ use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; +use matrix_sdk_base::{cross_process_lock::CrossProcessLockGeneration, timer}; use matrix_sdk_crypto::{ olm::{ InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession, @@ -175,7 +176,7 @@ impl SqliteCryptoStore { } } -const DATABASE_VERSION: u8 = 12; +const DATABASE_VERSION: u8 = 13; /// key for the dehydrated device pickle key in the key/value table. const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key"; @@ -301,6 +302,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 13 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/crypto_store/013_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(13) + }) + .await?; + } + Ok(()) } @@ -1488,37 +1499,52 @@ impl CryptoStore for SqliteCryptoStore { Ok(()) } + #[instrument(skip(self))] async fn try_take_leased_lock( &self, lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { + let _timer = timer!("method"); + let key = key.to_owned(); let holder = holder.to_owned(); - let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); - let expiration_ts = now_ts + lease_duration_ms as u64; + let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .acquire() .await? .with_transaction(move |txn| { - txn.execute( - "INSERT INTO lease_locks (key, holder, expiration_ts) + txn.query_row( + "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration_ts = ?3 - WHERE holder = ?2 - OR expiration_ts < ?4 - ", - (key, holder, expiration_ts, now_ts), + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", + (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } async fn next_batch_token(&self) -> Result, Self::Error> { diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index d029c40fc..4fb20222c 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc}; use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ + cross_process_lock::CrossProcessLockGeneration, deserialized_responses::TimelineEvent, event_cache::{ store::{extract_event_relation, EventCacheStore}, @@ -66,7 +67,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 12; +const DATABASE_VERSION: u8 = 13; /// The string used to identify a chunk of type events, in the `type` field in /// the database. @@ -485,6 +486,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 13 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/event_cache_store/013_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(13) + }) + .await?; + } + Ok(()) } @@ -498,7 +509,7 @@ impl EventCacheStore for SqliteEventCacheStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { let _timer = timer!("method"); let key = key.to_owned(); @@ -507,25 +518,37 @@ impl EventCacheStore for SqliteEventCacheStore { let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .write() .await? .with_transaction(move |txn| { - txn.execute( + txn.query_row( "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration = ?3 - WHERE holder = ?2 - OR expiration < ?4 - ", + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } #[instrument(skip(self, updates))] diff --git a/crates/matrix-sdk-sqlite/src/media_store.rs b/crates/matrix-sdk-sqlite/src/media_store.rs index 1bcc4d860..a7275f550 100644 --- a/crates/matrix-sdk-sqlite/src/media_store.rs +++ b/crates/matrix-sdk-sqlite/src/media_store.rs @@ -19,6 +19,7 @@ use std::{fmt, path::Path, sync::Arc}; use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ + cross_process_lock::CrossProcessLockGeneration, media::{ store::{ IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore, @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 1; +const DATABASE_VERSION: u8 = 2; /// An SQLite-based media store. #[derive(Clone)] @@ -225,6 +226,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 2 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/media_store/002_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(2) + }) + .await?; + } + Ok(()) } @@ -238,7 +249,7 @@ impl MediaStore for SqliteMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { let _timer = timer!("method"); let key = key.to_owned(); @@ -247,25 +258,37 @@ impl MediaStore for SqliteMediaStore { let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .write() .await? .with_transaction(move |txn| { - txn.execute( + txn.query_row( "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration = ?3 - WHERE holder = ?2 - OR expiration < ?4 - ", + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } async fn add_media_content(