feat(indexeddb): Add Lease::generation in crypto, media, and event cache stores.

This patch adds `Lease::generation` support in the crypto, media and
event cache stores.

For the crypto store, we add the new `lease_locks` object store/table.
Previously, `Lease` was stored in `core`, but without any prefix, it's
easy to overwrite another records, it's dangerous. The sad thing is
that it's hard to delete the existing leases in `core` because the keys
aren't known. See the comment in the code explaining the tradeoff.

For media and event cache stores, the already existing `leases` object
store/table is cleared so that we can change the format of `Lease`
easily.
This commit is contained in:
Ivan Enderlin
2025-11-04 14:13:27 +01:00
parent 6c922e69d0
commit f7a767ce97
10 changed files with 278 additions and 107 deletions

View File

@@ -18,7 +18,7 @@ default = ["e2e-encryption", "state-store", "event-cache-store"]
event-cache-store = ["dep:matrix-sdk-base", "media-store"]
media-store = ["dep:matrix-sdk-base"]
state-store = ["dep:matrix-sdk-base", "growable-bloom-filter"]
e2e-encryption = ["dep:matrix-sdk-crypto"]
e2e-encryption = ["dep:matrix-sdk-base", "dep:matrix-sdk-crypto"]
testing = ["matrix-sdk-crypto?/testing"]
experimental-encrypted-state-events = [
"matrix-sdk-crypto?/experimental-encrypted-state-events"

View File

@@ -29,6 +29,7 @@ use crate::{crypto_store::Result, serializer::SafeEncodeSerializer, IndexeddbCry
mod old_keys;
mod v0_to_v5;
mod v101_to_v102;
mod v10_to_v11;
mod v11_to_v12;
mod v12_to_v13;
@@ -186,6 +187,10 @@ pub async fn open_and_upgrade_db(
v14_to_v101::schema_delete(name).await?;
}
if old_version < 102 {
v101_to_v102::schema_add(name).await?;
}
// If you add more migrations here, you'll need to update
// `tests::EXPECTED_SCHEMA_VERSION`.
@@ -290,7 +295,7 @@ mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
/// The schema version we expect after we open the store.
const EXPECTED_SCHEMA_VERSION: u32 = 101;
const EXPECTED_SCHEMA_VERSION: u32 = 102;
/// Adjust this to test do a more comprehensive perf test
const NUM_RECORDS_FOR_PERF: usize = 2_000;

View File

@@ -0,0 +1,34 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use indexed_db_futures::{error::OpenDbError, Build};
use crate::crypto_store::{keys, migrations::do_schema_upgrade, Result};
/// Perform the schema upgrade v101 to v102, add the `lease_locks` table.
///
/// Note: it's not trivial to delete old lease locks in the `keys::CORE` table,
/// because we don't know which keys were associated to them. We consider it's
/// fine because it represents a tiny amount of data (maybe 2 rows, with the
/// “`Lease` type” being quite small). To achieve so, we could read all rows in
/// `keys::CORE`, try to deserialize to the `Lease` type, and act accordingly,
/// but each store may have its own `Lease` type. Also note that this
/// `matrix-sdk-indexeddb`
pub(crate) async fn schema_add(name: &str) -> Result<(), OpenDbError> {
do_schema_upgrade(name, 102, |tx, _| {
tx.db().create_object_store(keys::LEASE_LOCKS).build()?;
Ok(())
})
.await
}

View File

@@ -30,6 +30,9 @@ use indexed_db_futures::{
KeyRange,
};
use js_sys::Array;
use matrix_sdk_base::cross_process_lock::{
CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
use matrix_sdk_crypto::{
olm::{
Curve25519PublicKey, InboundGroupSession, OlmMessageHash, OutboundGroupSession,
@@ -97,6 +100,8 @@ mod keys {
pub const RECEIVED_ROOM_KEY_BUNDLES: &str = "received_room_key_bundles";
pub const LEASE_LOCKS: &str = "lease_locks";
// keys
pub const STORE_CIPHER: &str = "store_cipher";
pub const ACCOUNT: &str = "account";
@@ -1589,53 +1594,68 @@ impl_crypto_store! {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool> {
) -> Result<Option<CrossProcessLockGeneration>> {
// As of 2023-06-23, the code below hasn't been tested yet.
let key = JsValue::from_str(key);
let txn =
self.inner.transaction(keys::CORE).with_mode(TransactionMode::Readwrite).build()?;
let object_store = txn.object_store(keys::CORE)?;
let txn = self
.inner
.transaction(keys::LEASE_LOCKS)
.with_mode(TransactionMode::Readwrite)
.build()?;
let object_store = txn.object_store(keys::LEASE_LOCKS)?;
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
struct Lease {
holder: String,
expiration_ts: u64,
expiration: u64,
generation: CrossProcessLockGeneration,
}
let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration_ts = now_ts + lease_duration_ms as u64;
let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
let expiration = now + lease_duration_ms as u64;
let prev = object_store.get(&key).await?;
match prev {
Some(prev) => {
let lease: Lease = self.serializer.deserialize_value(prev)?;
if lease.holder == holder || lease.expiration_ts < now_ts {
object_store
.put(
&self.serializer.serialize_value(&Lease {
holder: holder.to_owned(),
expiration_ts,
})?,
)
.with_key(key)
.build()?;
Ok(true)
let lease = match object_store.get(&key).await? {
Some(entry) => {
let mut lease: Lease = self.serializer.deserialize_value(entry)?;
if lease.holder == holder {
// We had the lease before, extend it.
lease.expiration = expiration;
Some(lease)
} else {
Ok(false)
// We didn't have it.
if lease.expiration < now {
// Steal it!
lease.holder = holder.to_owned();
lease.expiration = expiration;
lease.generation += 1;
Some(lease)
} else {
// We tried our best.
None
}
}
}
None => {
object_store
.put(
&self
.serializer
.serialize_value(&Lease { holder: holder.to_owned(), expiration_ts })?,
)
.with_key(key)
.build()?;
Ok(true)
let lease = Lease {
holder: holder.to_owned(),
expiration,
generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
Some(lease)
}
}
};
Ok(if let Some(lease) = lease {
object_store.put(&self.serializer.serialize_value(&lease)?).with_key(key).build()?;
Some(lease.generation)
} else {
None
})
}
}

View File

@@ -15,15 +15,16 @@
use indexed_db_futures::{
database::Database,
error::{DomException, Error, OpenDbError},
transaction::Transaction,
};
use thiserror::Error;
/// The current version and keys used in the database.
pub mod current {
use super::{v1, Version};
use super::{v2, Version};
pub const VERSION: Version = Version::V1;
pub use v1::keys;
pub const VERSION: Version = Version::V2;
pub use v2::keys;
}
/// Opens a connection to the IndexedDB database and takes care of upgrading it
@@ -35,7 +36,7 @@ pub async fn open_and_upgrade_db(name: &str) -> Result<Database, OpenDbError> {
.with_on_upgrade_needed(|event, transaction| {
let mut version = Version::try_from(event.old_version() as u32)?;
while version < current::VERSION {
version = match version.upgrade(transaction.db())? {
version = match version.upgrade(transaction)? {
Some(next) => next,
None => current::VERSION, /* No more upgrades to apply, jump forward! */
};
@@ -49,18 +50,21 @@ pub async fn open_and_upgrade_db(name: &str) -> Result<Database, OpenDbError> {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u32)]
pub enum Version {
/// Version 0 of the database, for details see [`v0`]
/// Version 0 of the database, for details see [`v0`].
V0 = 0,
/// Version 1 of the database, for details see [`v1`]
/// Version 1 of the database, for details see [`v1`].
V1 = 1,
/// Version 2 of the database, for details see [`v2`].
V2 = 2,
}
impl Version {
/// Upgrade the database to the next version, if one exists.
pub fn upgrade(self, db: &Database) -> Result<Option<Self>, Error> {
pub fn upgrade(self, transaction: &Transaction<'_>) -> Result<Option<Self>, Error> {
match self {
Self::V0 => v0::upgrade(db).map(Some),
Self::V1 => Ok(None),
Self::V0 => v0::upgrade(transaction).map(Some),
Self::V1 => v1::upgrade(transaction).map(Some),
Self::V2 => Ok(None),
}
}
}
@@ -76,6 +80,7 @@ impl TryFrom<u32> for Version {
match value {
0 => Ok(Version::V0),
1 => Ok(Version::V1),
2 => Ok(Version::V2),
v => Err(UnknownVersionError(v)),
}
}
@@ -96,8 +101,8 @@ pub mod v0 {
use super::*;
/// Upgrade database from `v0` to `v1`
pub fn upgrade(db: &Database) -> Result<Version, Error> {
v1::create_object_stores(db)?;
pub fn upgrade(transaction: &Transaction<'_>) -> Result<Version, Error> {
v1::create_object_stores(transaction.db())?;
Ok(Version::V1)
}
}
@@ -197,4 +202,27 @@ pub mod v1 {
db.create_object_store(keys::GAPS).with_key_path(keys::GAPS_KEY_PATH.into()).build()?;
Ok(())
}
/// Upgrade database from `v1` to `v2`
pub fn upgrade(transaction: &Transaction<'_>) -> Result<Version, Error> {
v2::empty_leases(transaction)?;
Ok(Version::V2)
}
}
mod v2 {
// Re-use all the same keys from `v1`.
pub use super::v1::keys;
use super::*;
/// The format of [`Lease`][super::super::types::Lease] is changing. Let's
/// erase previous values.
pub fn empty_leases(transaction: &Transaction<'_>) -> Result<(), Error> {
let object_store = transaction.object_store(keys::LEASES)?;
// Remove all previous leases.
object_store.clear()?;
Ok(())
}
}

View File

@@ -17,6 +17,10 @@
use std::{rc::Rc, time::Duration};
use indexed_db_futures::{database::Database, Build};
#[cfg(target_family = "wasm")]
use matrix_sdk_base::cross_process_lock::{
CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
use matrix_sdk_base::{
event_cache::{store::EventCacheStore, Event, Gap},
linked_chunk::{
@@ -103,29 +107,57 @@ impl EventCacheStore for IndexeddbEventCacheStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, IndexeddbEventCacheStoreError> {
) -> Result<Option<CrossProcessLockGeneration>, IndexeddbEventCacheStoreError> {
let _timer = timer!("method");
let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
let transaction =
self.transaction(&[Lease::OBJECT_STORE], IdbTransactionMode::Readwrite)?;
if let Some(lease) = transaction.get_lease_by_id(key).await? {
if lease.holder != holder && !lease.has_expired(now) {
return Ok(false);
}
}
let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
let expiration = now + Duration::from_millis(lease_duration_ms.into());
transaction
.put_lease(&Lease {
key: key.to_owned(),
holder: holder.to_owned(),
expiration: now + Duration::from_millis(lease_duration_ms.into()),
})
.await?;
transaction.commit().await?;
Ok(true)
let lease = match transaction.get_lease_by_id(key).await? {
Some(mut lease) => {
if lease.holder == holder {
// We had the lease before, extend it.
lease.expiration = expiration;
Some(lease)
} else {
// We didn't have it.
if lease.expiration < now {
// Steal it!
lease.holder = holder.to_owned();
lease.expiration = expiration;
lease.generation += 1;
Some(lease)
} else {
// We tried our best.
None
}
}
}
None => {
let lease = Lease {
key: key.to_owned(),
holder: holder.to_owned(),
expiration,
generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
Some(lease)
}
};
Ok(if let Some(lease) = lease {
transaction.put_lease(&lease).await?;
transaction.commit().await?;
Some(lease.generation)
} else {
None
})
}
#[instrument(skip(self, updates))]

View File

@@ -15,6 +15,7 @@
use std::time::Duration;
use matrix_sdk_base::{
cross_process_lock::CrossProcessLockGeneration,
deserialized_responses::TimelineEvent,
event_cache::store::extract_event_relation,
linked_chunk::{ChunkIdentifier, LinkedChunkId, OwnedLinkedChunkId},
@@ -29,13 +30,7 @@ pub struct Lease {
pub key: String,
pub holder: String,
pub expiration: Duration,
}
impl Lease {
/// Determines whether the lease is expired at a given time `t`
pub fn has_expired(&self, t: Duration) -> bool {
self.expiration < t
}
pub generation: CrossProcessLockGeneration,
}
/// Representation of a [`Chunk`](matrix_sdk_base::linked_chunk::Chunk)

View File

@@ -15,15 +15,16 @@
use indexed_db_futures::{
database::Database,
error::{DomException, Error, OpenDbError},
transaction::Transaction,
};
use thiserror::Error;
/// The current version and keys used in the database.
pub mod current {
use super::{v1, Version};
use super::{v2, Version};
pub const VERSION: Version = Version::V1;
pub use v1::keys;
pub const VERSION: Version = Version::V2;
pub use v2::keys;
}
/// Opens a connection to the IndexedDB database and takes care of upgrading it
@@ -35,7 +36,7 @@ pub async fn open_and_upgrade_db(name: &str) -> Result<Database, OpenDbError> {
.with_on_upgrade_needed(|event, transaction| {
let mut version = Version::try_from(event.old_version() as u32)?;
while version < current::VERSION {
version = match version.upgrade(transaction.db())? {
version = match version.upgrade(transaction)? {
Some(next) => next,
None => current::VERSION, /* No more upgrades to apply, jump forward! */
};
@@ -49,18 +50,21 @@ pub async fn open_and_upgrade_db(name: &str) -> Result<Database, OpenDbError> {
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u32)]
pub enum Version {
/// Version 0 of the database, for details see [`v0`]
/// Version 0 of the database, for details see [`v0`].
V0 = 0,
/// Version 1 of the database, for details see [`v1`]
/// Version 1 of the database, for details see [`v1`].
V1 = 1,
/// Version 2 of the database, for details see [`v2`].
V2 = 2,
}
impl Version {
/// Upgrade the database to the next version, if one exists.
pub fn upgrade(self, db: &Database) -> Result<Option<Self>, Error> {
pub fn upgrade(self, transaction: &Transaction<'_>) -> Result<Option<Self>, Error> {
match self {
Self::V0 => v0::upgrade(db).map(Some),
Self::V1 => Ok(None),
Self::V0 => v0::upgrade(transaction).map(Some),
Self::V1 => v1::upgrade(transaction).map(Some),
Self::V2 => Ok(None),
}
}
}
@@ -76,6 +80,7 @@ impl TryFrom<u32> for Version {
match value {
0 => Ok(Version::V0),
1 => Ok(Version::V1),
2 => Ok(Version::V2),
v => Err(UnknownVersionError(v)),
}
}
@@ -96,8 +101,8 @@ pub mod v0 {
use super::*;
/// Upgrade database from `v0` to `v1`
pub fn upgrade(db: &Database) -> Result<Version, Error> {
v1::create_object_stores(db)?;
pub fn upgrade(transaction: &Transaction<'_>) -> Result<Version, Error> {
v1::create_object_stores(transaction.db())?;
Ok(Version::V1)
}
}
@@ -208,4 +213,27 @@ pub mod v1 {
.build()?;
Ok(())
}
/// Upgrade database from `v1` to `v2`
pub fn upgrade(transaction: &Transaction<'_>) -> Result<Version, Error> {
v2::empty_leases(transaction)?;
Ok(Version::V2)
}
}
mod v2 {
// Re-use all the same keys from `v1`.
pub use super::v1::keys;
use super::*;
/// The format of [`Lease`][super::super::types::Lease] is changing. Let's
/// erase previous values.
pub fn empty_leases(transaction: &Transaction<'_>) -> Result<(), Error> {
let object_store = transaction.object_store(keys::LEASES)?;
// Remove all previous leases.
object_store.clear()?;
Ok(())
}
}

View File

@@ -31,6 +31,10 @@ pub use error::IndexeddbMediaStoreError;
use indexed_db_futures::{
cursor::CursorDirection, database::Database, transaction::TransactionMode, Build,
};
#[cfg(target_family = "wasm")]
use matrix_sdk_base::cross_process_lock::{
CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
use matrix_sdk_base::{
media::{
store::{
@@ -106,28 +110,56 @@ impl MediaStore for IndexeddbMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, IndexeddbMediaStoreError> {
) -> Result<Option<CrossProcessLockGeneration>, IndexeddbMediaStoreError> {
let _timer = timer!("method");
let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
let transaction = self.transaction(&[Lease::OBJECT_STORE], TransactionMode::Readwrite)?;
if let Some(lease) = transaction.get_lease_by_id(key).await? {
if lease.holder != holder && !lease.has_expired(now) {
return Ok(false);
}
}
let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
let expiration = now + Duration::from_millis(lease_duration_ms.into());
transaction
.put_lease(&Lease {
key: key.to_owned(),
holder: holder.to_owned(),
expiration: now + Duration::from_millis(lease_duration_ms.into()),
})
.await?;
transaction.commit().await?;
Ok(true)
let lease = match transaction.get_lease_by_id(key).await? {
Some(mut lease) => {
if lease.holder == holder {
// We had the lease before, extend it.
lease.expiration = expiration;
Some(lease)
} else {
// We didn't have it.
if lease.expiration < now {
// Steal it!
lease.holder = holder.to_owned();
lease.expiration = expiration;
lease.generation += 1;
Some(lease)
} else {
// We tried our best.
None
}
}
}
None => {
let lease = Lease {
key: key.to_owned(),
holder: holder.to_owned(),
expiration,
generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
Some(lease)
}
};
Ok(if let Some(lease) = lease {
transaction.put_lease(&lease).await?;
transaction.commit().await?;
Some(lease.generation)
} else {
None
})
}
#[instrument(skip_all)]

View File

@@ -17,7 +17,10 @@ use std::{
time::Duration,
};
use matrix_sdk_base::media::{store::IgnoreMediaRetentionPolicy, MediaRequestParameters};
use matrix_sdk_base::{
cross_process_lock::CrossProcessLockGeneration,
media::{store::IgnoreMediaRetentionPolicy, MediaRequestParameters},
};
use ruma::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -29,13 +32,7 @@ pub struct Lease {
pub key: String,
pub holder: String,
pub expiration: Duration,
}
impl Lease {
/// Determines whether the lease is expired at a given time `t`
pub fn has_expired(&self, t: Duration) -> bool {
self.expiration < t
}
pub generation: CrossProcessLockGeneration,
}
/// A representation of media which ignores storage schemas. This is type is not