feat(common): Add a cross-process lock generation.

This patch adds `CrossProcessLockGeneration`. A lock generation is an
integer incremented each time the lock is taken by another holder. If
the generation changes, it means the lock is _dirtied_. This _dirtying_
aspect is going to be expanded in the next patches. This patch focuses
on the introduction of this _generation_.

The `CrossProcessLock::try_lock_once` method, and
the `TryLock::try_lock` method, both returns a
`Option<CrossProcessLockGeneration>` instead of a `bool`: `true` is
replaced by `Some(_)`, `false` by `None`.
This commit is contained in:
Ivan Enderlin
2025-11-04 15:13:35 +01:00
parent 01d75e939c
commit 6c922e69d0
20 changed files with 249 additions and 118 deletions

View File

@@ -1408,57 +1408,57 @@ macro_rules! event_cache_store_integration_tests_time {
let store = get_event_cache_store().await.unwrap().into_event_cache_store();
let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);
assert_eq!(acquired0, Some(1)); // first lock generation
// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);
assert_eq!(acquired2, Some(1)); // same lock generation
// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);
assert_eq!(acquired3, Some(1)); // same lock generation
// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);
assert!(acquired4.is_none()); // not acquired
// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);
assert!(acquired5.is_none()); // not acquired
// That's a nice test we got here, go take a little nap.
sleep(Duration::from_millis(50)).await;
// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);
assert!(acquired55.is_none()); // not acquired
// Ok you can take another nap then.
sleep(Duration::from_millis(250)).await;
// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);
assert_eq!(acquired6, Some(2)); // new lock generation!
sleep(Duration::from_millis(1)).await;
// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);
assert_eq!(acquired7, Some(3)); // new lock generation!
sleep(Duration::from_millis(1)).await;
// But when we take a longer lease...
// But when we take a longer lease
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);
assert_eq!(acquired8, Some(4)); // new lock generation!
// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);
assert!(acquired9.is_none()); // not acquired
// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
assert_eq!(acquired10, Some(4)); // same lock generation
}
}
};

View File

@@ -19,13 +19,16 @@ use std::{
use async_trait::async_trait;
use matrix_sdk_common::{
cross_process_lock::memory_store_helper::try_take_leased_lock,
cross_process_lock::{
CrossProcessLockGeneration,
memory_store_helper::{Lease, try_take_leased_lock},
},
linked_chunk::{
ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
RawChunk, Update, relational::RelationalLinkedChunk,
},
};
use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
use tracing::error;
use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
@@ -41,7 +44,7 @@ pub struct MemoryStore {
#[derive(Debug)]
struct MemoryStoreInner {
leases: HashMap<String, (String, Instant)>,
leases: HashMap<String, Lease>,
events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
}
@@ -73,7 +76,7 @@ impl EventCacheStore for MemoryStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
let mut inner = self.inner.write().unwrap();
Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))

View File

@@ -28,7 +28,8 @@ mod memory_store;
mod traits;
use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
TryLock,
};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
@@ -188,7 +189,7 @@ impl TryLock for LockableEventCacheStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}

View File

@@ -17,6 +17,7 @@ use std::{fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::{
AsyncTraitDeps,
cross_process_lock::CrossProcessLockGeneration,
linked_chunk::{
ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
RawChunk, Update,
@@ -46,7 +47,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
/// An [`Update`] reflects an operation that has happened inside a linked
/// chunk. The linked chunk is used by the event cache to store the events
@@ -196,7 +197,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

View File

@@ -1337,57 +1337,57 @@ macro_rules! media_store_integration_tests_time {
let store = get_media_store().await.unwrap();
let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);
assert_eq!(acquired0, Some(1)); // first lock generation
// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);
assert_eq!(acquired2, Some(1)); // same lock generation
// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);
assert_eq!(acquired3, Some(1)); // same lock generation
// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);
assert!(acquired4.is_none()); // not acquired
// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);
assert!(acquired5.is_none()); // not acquired
// That's a nice test we got here, go take a little nap.
sleep(Duration::from_millis(50)).await;
// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);
assert!(acquired55.is_none()); // not acquired
// Ok you can take another nap then.
sleep(Duration::from_millis(250)).await;
// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);
assert_eq!(acquired6, Some(2)); // new lock generation!
sleep(Duration::from_millis(1)).await;
// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);
assert_eq!(acquired7, Some(3)); // new lock generation!
sleep(Duration::from_millis(1)).await;
// But when we take a longer lease...
// But when we take a longer lease
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);
assert_eq!(acquired8, Some(4)); // new lock generation!
// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);
assert!(acquired9.is_none()); // not acquired
// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
assert_eq!(acquired10, Some(4)); // same lock generation
}
}
};

View File

@@ -20,12 +20,13 @@ use std::{
use async_trait::async_trait;
use matrix_sdk_common::{
cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer,
};
use ruma::{
MxcUri, OwnedMxcUri,
time::{Instant, SystemTime},
cross_process_lock::{
CrossProcessLockGeneration,
memory_store_helper::{Lease, try_take_leased_lock},
},
ring_buffer::RingBuffer,
};
use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
use super::Result;
use crate::media::{
@@ -48,7 +49,7 @@ pub struct MemoryMediaStore {
#[derive(Debug)]
struct MemoryMediaStoreInner {
media: RingBuffer<MediaContent>,
leases: HashMap<String, (String, Instant)>,
leases: HashMap<String, Lease>,
media_retention_policy: Option<MediaRetentionPolicy>,
last_media_cleanup_time: SystemTime,
}
@@ -110,7 +111,7 @@ impl MediaStore for MemoryMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
let mut inner = self.inner.write().unwrap();
Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))

View File

@@ -32,7 +32,8 @@ use std::fmt;
use std::{ops::Deref, sync::Arc};
use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
TryLock,
};
use matrix_sdk_store_encryption::Error as StoreEncryptionError;
pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner};
@@ -172,7 +173,7 @@ impl TryLock for LockableMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}

View File

@@ -17,7 +17,7 @@
use std::{fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::AsyncTraitDeps;
use matrix_sdk_common::{AsyncTraitDeps, cross_process_lock::CrossProcessLockGeneration};
use ruma::{MxcUri, time::SystemTime};
#[cfg(doc)]
@@ -41,7 +41,7 @@ pub trait MediaStore: AsyncTraitDeps {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
/// Add a media file's content in the media store.
///
@@ -313,7 +313,7 @@ impl<T: MediaStore> MediaStore for EraseMediaStoreError<T> {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

View File

@@ -56,6 +56,16 @@ use crate::{
sleep::sleep,
};
/// A lock generation is an integer incremented each time the lock is taken by
/// a different holder.
///
/// This is used to know if a lock has been dirtied.
pub type CrossProcessLockGeneration = u64;
/// Describe the first lock generation value (see
/// [`CrossProcessLockGeneration`]).
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
pub trait TryLock {
#[cfg(not(target_family = "wasm"))]
@@ -80,7 +90,8 @@ pub trait TryLock {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> impl Future<Output = Result<bool, Self::LockError>> + SendOutsideWasm;
) -> impl Future<Output = Result<Option<CrossProcessLockGeneration>, Self::LockError>>
+ SendOutsideWasm;
}
/// Small state machine to handle wait times.
@@ -214,7 +225,8 @@ where
.locker
.try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
.await
.map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?;
.map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?
.is_some();
if !acquired {
trace!("Couldn't acquire the lock immediately.");
@@ -370,7 +382,6 @@ mod tests {
use std::{
collections::HashMap,
sync::{Arc, RwLock, atomic},
time::Instant,
};
use assert_matches::assert_matches;
@@ -381,17 +392,23 @@ mod tests {
};
use super::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, EXTEND_LEASE_EVERY_MS,
TryLock, memory_store_helper::try_take_leased_lock,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
EXTEND_LEASE_EVERY_MS, TryLock,
memory_store_helper::{Lease, try_take_leased_lock},
};
#[derive(Clone, Default)]
struct TestStore {
leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
leases: Arc<RwLock<HashMap<String, Lease>>>,
}
impl TestStore {
fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
fn try_take_leased_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Option<CrossProcessLockGeneration> {
try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
}
}
@@ -408,7 +425,7 @@ mod tests {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::LockError> {
) -> Result<Option<CrossProcessLockGeneration>, Self::LockError> {
Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
}
}
@@ -533,48 +550,63 @@ pub mod memory_store_helper {
use ruma::time::{Duration, Instant};
use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION};
#[derive(Debug)]
pub struct Lease {
holder: String,
expiration: Instant,
generation: CrossProcessLockGeneration,
}
pub fn try_take_leased_lock(
leases: &mut HashMap<String, (String, Instant)>,
leases: &mut HashMap<String, Lease>,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> bool {
) -> Option<CrossProcessLockGeneration> {
let now = Instant::now();
let expiration = now + Duration::from_millis(lease_duration_ms.into());
match leases.entry(key.to_owned()) {
// There is an existing holder.
Entry::Occupied(mut entry) => {
let (current_holder, current_expiration) = entry.get_mut();
let Lease {
holder: current_holder,
expiration: current_expiration,
generation: current_generation,
} = entry.get_mut();
if current_holder == holder {
// We had the lease before, extend it.
*current_expiration = expiration;
true
Some(*current_generation)
} else {
// We didn't have it.
if *current_expiration < now {
// Steal it!
*current_holder = holder.to_owned();
*current_expiration = expiration;
*current_generation += 1;
true
Some(*current_generation)
} else {
// We tried our best.
false
None
}
}
}
// There is no holder, easy.
Entry::Vacant(entry) => {
entry.insert((
holder.to_owned(),
Instant::now() + Duration::from_millis(lease_duration_ms.into()),
));
entry.insert(Lease {
holder: holder.to_owned(),
expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()),
generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
});
true
Some(FIRST_CROSS_PROCESS_LOCK_GENERATION)
}
}
}

View File

@@ -1457,57 +1457,57 @@ macro_rules! cryptostore_integration_tests_time {
let (_account, store) = get_loaded_store("lease_locks").await;
let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);
assert_eq!(acquired0, Some(1)); // first generation
// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);
assert_eq!(acquired2, Some(1)); // same lock generation
// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);
assert_eq!(acquired3, Some(1)); // same lock generation
// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);
assert!(acquired4.is_none()); // not acquired
// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);
assert!(acquired5.is_none());
// That's a nice test we got here, go take a little nap.
tokio::time::sleep(Duration::from_millis(50)).await;
// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);
assert!(acquired55.is_none()); // not acquired
// Ok you can take another nap then.
tokio::time::sleep(Duration::from_millis(250)).await;
// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);
assert_eq!(acquired6, Some(2)); // new lock generation!
tokio::time::sleep(Duration::from_millis(1)).await;
// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);
assert_eq!(acquired7, Some(3)); // new lock generation!
tokio::time::sleep(Duration::from_millis(1)).await;
// But when we take a longer lease...
// But when we take a longer lease
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);
assert_eq!(acquired8, Some(4)); // new lock generation!
// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);
assert!(acquired9.is_none()); // not acquired
// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
assert_eq!(acquired10, Some(4)); // same lock generation
}
}
};

View File

@@ -20,11 +20,15 @@ use std::{
use async_trait::async_trait;
use matrix_sdk_common::{
cross_process_lock::memory_store_helper::try_take_leased_lock, locks::RwLock as StdRwLock,
cross_process_lock::{
memory_store_helper::{try_take_leased_lock, Lease},
CrossProcessLockGeneration,
},
locks::RwLock as StdRwLock,
};
use ruma::{
events::secret::request::SecretName, time::Instant, DeviceId, OwnedDeviceId, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, TransactionId, UserId,
};
use tokio::sync::{Mutex, RwLock};
use tracing::warn;
@@ -99,7 +103,7 @@ pub struct MemoryStore {
key_requests_by_info: StdRwLock<HashMap<String, OwnedTransactionId>>,
direct_withheld_info: StdRwLock<HashMap<OwnedRoomId, HashMap<String, RoomKeyWithheldEntry>>>,
custom_values: StdRwLock<HashMap<String, Vec<u8>>>,
leases: StdRwLock<HashMap<String, (String, Instant)>>,
leases: StdRwLock<HashMap<String, Lease>>,
secret_inbox: StdRwLock<HashMap<String, Vec<GossippedSecret>>>,
backup_keys: RwLock<BackupKeys>,
dehydrated_device_pickle_key: RwLock<Option<DehydratedDeviceKey>>,
@@ -768,7 +772,7 @@ impl CryptoStore for MemoryStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool> {
) -> Result<Option<CrossProcessLockGeneration>> {
Ok(try_take_leased_lock(&mut self.leases.write(), lease_duration_ms, key, holder))
}
}
@@ -1269,6 +1273,7 @@ mod integration_tests {
};
use async_trait::async_trait;
use matrix_sdk_common::cross_process_lock::CrossProcessLockGeneration;
use ruma::{
events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId,
};
@@ -1586,7 +1591,7 @@ mod integration_tests {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}

View File

@@ -100,7 +100,9 @@ pub mod integration_tests;
pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
pub use error::{CryptoStoreError, Result};
use matrix_sdk_common::{
cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout,
cross_process_lock::{CrossProcessLock, CrossProcessLockGeneration},
deserialized_responses::WithheldCode,
timeout::timeout,
};
pub use memorystore::MemoryStore;
pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
@@ -1790,7 +1792,7 @@ impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}

View File

@@ -15,7 +15,7 @@
use std::{collections::HashMap, fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::AsyncTraitDeps;
use matrix_sdk_common::{cross_process_lock::CrossProcessLockGeneration, AsyncTraitDeps};
use ruma::{
events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId,
};
@@ -395,7 +395,7 @@ pub trait CryptoStore: AsyncTraitDeps {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
/// Load the next-batch token for a to-device query, if any.
async fn next_batch_token(&self) -> Result<Option<String>, Self::Error>;
@@ -641,7 +641,7 @@ impl<T: CryptoStore> CryptoStore for EraseCryptoStoreError<T> {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

View File

@@ -15,7 +15,7 @@ default = ["state-store", "event-cache"]
testing = ["matrix-sdk-crypto?/testing"]
bundled = ["rusqlite/bundled"]
crypto-store = ["dep:matrix-sdk-crypto"]
crypto-store = ["dep:matrix-sdk-base", "dep:matrix-sdk-crypto"]
event-cache = ["dep:matrix-sdk-base"]
state-store = ["dep:matrix-sdk-base"]

View File

@@ -0,0 +1,7 @@
-- Rename the `expiration_ts` column to `expiration` to be consistent with other
-- `lease_locks` tables in other stores.
ALTER TABLE "lease_locks" RENAME COLUMN "expiration_ts" TO "expiration";
-- Add the `generation` column to handle _dirtiness.
-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`.
ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1;

View File

@@ -0,0 +1,3 @@
-- Add the `generation` column to handle _dirtiness.
-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`.
ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1;

View File

@@ -0,0 +1,3 @@
-- Add the `generation` column to handle _dirtiness_.
-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`.
ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1;

View File

@@ -21,6 +21,7 @@ use std::{
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{cross_process_lock::CrossProcessLockGeneration, timer};
use matrix_sdk_crypto::{
olm::{
InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession,
@@ -175,7 +176,7 @@ impl SqliteCryptoStore {
}
}
const DATABASE_VERSION: u8 = 12;
const DATABASE_VERSION: u8 = 13;
/// key for the dehydrated device pickle key in the key/value table.
const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
@@ -301,6 +302,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 13 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/crypto_store/013_lease_locks_with_generation.sql"
))?;
txn.set_db_version(13)
})
.await?;
}
Ok(())
}
@@ -1488,37 +1499,52 @@ impl CryptoStore for SqliteCryptoStore {
Ok(())
}
#[instrument(skip(self))]
async fn try_take_leased_lock(
&self,
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool> {
) -> Result<Option<CrossProcessLockGeneration>> {
let _timer = timer!("method");
let key = key.to_owned();
let holder = holder.to_owned();
let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration_ts = now_ts + lease_duration_ms as u64;
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration = now + lease_duration_ms as u64;
let num_touched = self
// Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
let generation = self
.acquire()
.await?
.with_transaction(move |txn| {
txn.execute(
"INSERT INTO lease_locks (key, holder, expiration_ts)
txn.query_row(
"INSERT INTO lease_locks (key, holder, expiration)
VALUES (?1, ?2, ?3)
ON CONFLICT (key)
DO
UPDATE SET holder = ?2, expiration_ts = ?3
WHERE holder = ?2
OR expiration_ts < ?4
",
(key, holder, expiration_ts, now_ts),
UPDATE SET
holder = excluded.holder,
expiration = excluded.expiration,
generation =
CASE holder
WHEN excluded.holder THEN generation
ELSE generation + 1
END
WHERE
holder = excluded.holder
OR expiration < ?4
RETURNING generation
",
(key, holder, expiration, now),
|row| row.get(0),
)
.optional()
})
.await?;
Ok(num_touched == 1)
Ok(generation)
}
async fn next_batch_token(&self) -> Result<Option<String>, Self::Error> {

View File

@@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
cross_process_lock::CrossProcessLockGeneration,
deserialized_responses::TimelineEvent,
event_cache::{
store::{extract_event_relation, EventCacheStore},
@@ -66,7 +67,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 12;
const DATABASE_VERSION: u8 = 13;
/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
@@ -485,6 +486,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 13 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/event_cache_store/013_lease_locks_with_generation.sql"
))?;
txn.set_db_version(13)
})
.await?;
}
Ok(())
}
@@ -498,7 +509,7 @@ impl EventCacheStore for SqliteEventCacheStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool> {
) -> Result<Option<CrossProcessLockGeneration>> {
let _timer = timer!("method");
let key = key.to_owned();
@@ -507,25 +518,37 @@ impl EventCacheStore for SqliteEventCacheStore {
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration = now + lease_duration_ms as u64;
let num_touched = self
// Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
let generation = self
.write()
.await?
.with_transaction(move |txn| {
txn.execute(
txn.query_row(
"INSERT INTO lease_locks (key, holder, expiration)
VALUES (?1, ?2, ?3)
ON CONFLICT (key)
DO
UPDATE SET holder = ?2, expiration = ?3
WHERE holder = ?2
OR expiration < ?4
",
UPDATE SET
holder = excluded.holder,
expiration = excluded.expiration,
generation =
CASE holder
WHEN excluded.holder THEN generation
ELSE generation + 1
END
WHERE
holder = excluded.holder
OR expiration < ?4
RETURNING generation
",
(key, holder, expiration, now),
|row| row.get(0),
)
.optional()
})
.await?;
Ok(num_touched == 1)
Ok(generation)
}
#[instrument(skip(self, updates))]

View File

@@ -19,6 +19,7 @@ use std::{fmt, path::Path, sync::Arc};
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
cross_process_lock::CrossProcessLockGeneration,
media::{
store::{
IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
@@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3";
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 1;
const DATABASE_VERSION: u8 = 2;
/// An SQLite-based media store.
#[derive(Clone)]
@@ -225,6 +226,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 2 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/media_store/002_lease_locks_with_generation.sql"
))?;
txn.set_db_version(2)
})
.await?;
}
Ok(())
}
@@ -238,7 +249,7 @@ impl MediaStore for SqliteMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool> {
) -> Result<Option<CrossProcessLockGeneration>> {
let _timer = timer!("method");
let key = key.to_owned();
@@ -247,25 +258,37 @@ impl MediaStore for SqliteMediaStore {
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration = now + lease_duration_ms as u64;
let num_touched = self
// Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
let generation = self
.write()
.await?
.with_transaction(move |txn| {
txn.execute(
txn.query_row(
"INSERT INTO lease_locks (key, holder, expiration)
VALUES (?1, ?2, ?3)
ON CONFLICT (key)
DO
UPDATE SET holder = ?2, expiration = ?3
WHERE holder = ?2
OR expiration < ?4
",
UPDATE SET
holder = excluded.holder,
expiration = excluded.expiration,
generation =
CASE holder
WHEN excluded.holder THEN generation
ELSE generation + 1
END
WHERE
holder = excluded.holder
OR expiration < ?4
RETURNING generation
",
(key, holder, expiration, now),
|row| row.get(0),
)
.optional()
})
.await?;
Ok(num_touched == 1)
Ok(generation)
}
async fn add_media_content(