refactor: Implement try_take_leased_lock on SqliteEventCacheStore

This commit is contained in:
Ivan Enderlin
2024-10-30 16:04:51 +01:00
parent 16a86587ea
commit 37304c8cdc
2 changed files with 42 additions and 4 deletions

View File

@@ -0,0 +1,5 @@
CREATE TABLE "lease_locks" (
"key" TEXT PRIMARY KEY NOT NULL,
"holder" TEXT NOT NULL,
"expiration" REAL NOT NULL
);

View File

@@ -7,6 +7,7 @@ use matrix_sdk_base::{
media::{MediaRequest, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::MilliSecondsSinceUnixEpoch;
use rusqlite::OptionalExtension;
use tokio::fs;
use tracing::debug;
@@ -26,8 +27,8 @@ mod keys {
///
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteEventCacheStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 1;
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 2;
/// A SQLite-based event cache store.
#[derive(Clone)]
@@ -133,6 +134,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}
if version < 2 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
txn.set_db_version(2)
})
.await?;
}
Ok(())
}
@@ -145,8 +154,32 @@ impl EventCacheStore for SqliteEventCacheStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
todo!()
) -> Result<bool> {
let key = key.to_owned();
let holder = holder.to_owned();
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration = now + lease_duration_ms as u64;
let num_touched = self
.acquire()
.await?
.with_transaction(move |txn| {
txn.execute(
"INSERT INTO lease_locks (key, holder, expiration)
VALUES (?1, ?2, ?3)
ON CONFLICT (key)
DO
UPDATE SET holder = ?2, expiration = ?3
WHERE holder = ?2
OR expiration < ?4
",
(key, holder, expiration, now),
)
})
.await?;
Ok(num_touched == 1)
}
async fn add_media_content(&self, request: &MediaRequest, content: Vec<u8>) -> Result<()> {