From 975b08c019bf2ef14baf591db70affd06a770cf5 Mon Sep 17 00:00:00 2001 From: Michael Goldenberg Date: Mon, 15 Sep 2025 21:00:43 -0400 Subject: [PATCH] refactor(indexeddb): add generalized transaction type and error Signed-off-by: Michael Goldenberg --- crates/matrix-sdk-indexeddb/src/lib.rs | 2 + .../src/transaction/mod.rs | 324 ++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 crates/matrix-sdk-indexeddb/src/transaction/mod.rs diff --git a/crates/matrix-sdk-indexeddb/src/lib.rs b/crates/matrix-sdk-indexeddb/src/lib.rs index be94947de..932c013d4 100644 --- a/crates/matrix-sdk-indexeddb/src/lib.rs +++ b/crates/matrix-sdk-indexeddb/src/lib.rs @@ -18,6 +18,8 @@ mod serialize_bool_for_indexeddb; mod serializer; #[cfg(feature = "state-store")] mod state_store; +#[cfg(any(feature = "event-cache-store", feature = "media-store"))] +mod transaction; #[cfg(feature = "e2e-encryption")] pub use crypto_store::{IndexeddbCryptoStore, IndexeddbCryptoStoreError}; diff --git a/crates/matrix-sdk-indexeddb/src/transaction/mod.rs b/crates/matrix-sdk-indexeddb/src/transaction/mod.rs new file mode 100644 index 000000000..21a9b1da8 --- /dev/null +++ b/crates/matrix-sdk-indexeddb/src/transaction/mod.rs @@ -0,0 +1,324 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +use indexed_db_futures::{prelude::IdbTransaction, IdbQuerySource}; +use matrix_sdk_base::media::store::{MediaRetentionPolicy, MediaStoreError}; +use serde::{ + de::{DeserializeOwned, Error}, + Serialize, +}; +use thiserror::Error; +use web_sys::IdbCursorDirection; + +use crate::{ + error::AsyncErrorDeps, + serializer::{Indexed, IndexedKey, IndexedKeyRange, IndexedTypeSerializer}, +}; + +#[derive(Debug, Error)] +pub enum TransactionError { + #[error("DomException {name} ({code}): {message}")] + DomException { name: String, message: String, code: u16 }, + #[error("serialization: {0}")] + Serialization(Box), + #[error("item is not unique")] + ItemIsNotUnique, + #[error("item not found")] + ItemNotFound, +} + +impl From for TransactionError { + fn from(value: web_sys::DomException) -> Self { + Self::DomException { name: value.name(), message: value.message(), code: value.code() } + } +} + +impl From for TransactionError { + fn from(e: serde_wasm_bindgen::Error) -> Self { + Self::Serialization(Box::new(serde_json::Error::custom(e.to_string()))) + } +} + +/// Represents an IndexedDB transaction, but provides a convenient interface for +/// performing operations on types that implement [`Indexed`] and related +/// traits. +pub struct Transaction<'a> { + transaction: IdbTransaction<'a>, + serializer: &'a IndexedTypeSerializer, +} + +impl<'a> Transaction<'a> { + pub fn new(transaction: IdbTransaction<'a>, serializer: &'a IndexedTypeSerializer) -> Self { + Self { transaction, serializer } + } + + /// Returns the underlying IndexedDB transaction. + pub fn into_inner(self) -> IdbTransaction<'a> { + self.transaction + } + + /// Commit all operations tracked in this transaction to IndexedDB. + pub async fn commit(self) -> Result<(), TransactionError> { + self.transaction.await.into_result().map_err(Into::into) + } + + /// Query IndexedDB for items that match the given key range + pub async fn get_items_by_key( + &self, + range: impl Into>, + ) -> Result, TransactionError> + where + T: Indexed, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize, + { + let range = self.serializer.encode_key_range::(range)?; + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + let array = if let Some(index) = K::INDEX { + object_store.index(index)?.get_all_with_key(&range)?.await? + } else { + object_store.get_all_with_key(&range)?.await? + }; + let mut items = Vec::with_capacity(array.length() as usize); + for value in array { + let item = self + .serializer + .deserialize(value) + .map_err(|e| TransactionError::Serialization(Box::new(e)))?; + items.push(item); + } + Ok(items) + } + + /// Query IndexedDB for items that match the given key component range + pub async fn get_items_by_key_components<'b, T, K>( + &self, + range: impl Into>>, + ) -> Result, TransactionError> + where + T: Indexed + 'b, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize + 'b, + { + let range: IndexedKeyRange = range.into().encoded(self.serializer.inner()); + self.get_items_by_key::(range).await + } + + /// Query IndexedDB for items that match the given key. If + /// more than one item is found, an error is returned. + pub async fn get_item_by_key(&self, key: K) -> Result, TransactionError> + where + T: Indexed, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize, + { + let mut items = self.get_items_by_key::(key).await?; + if items.len() > 1 { + return Err(TransactionError::ItemIsNotUnique); + } + Ok(items.pop()) + } + + /// Query IndexedDB for items that match the given key components. If more + /// than one item is found, an error is returned. + pub async fn get_item_by_key_components<'b, T, K>( + &self, + components: K::KeyComponents<'b>, + ) -> Result, TransactionError> + where + T: Indexed + 'b, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize + 'b, + { + let mut items = self.get_items_by_key_components::(components).await?; + if items.len() > 1 { + return Err(TransactionError::ItemIsNotUnique); + } + Ok(items.pop()) + } + + /// Query IndexedDB for the number of items that match the given key range. + pub async fn get_items_count_by_key( + &self, + range: impl Into>, + ) -> Result + where + T: Indexed, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize, + { + let range = self.serializer.encode_key_range::(range)?; + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + let count = if let Some(index) = K::INDEX { + object_store.index(index)?.count_with_key(&range)?.await? + } else { + object_store.count_with_key(&range)?.await? + }; + Ok(count as usize) + } + + /// Query IndexedDB for the number of items that match the given key + /// components range. + pub async fn get_items_count_by_key_components<'b, T, K>( + &self, + range: impl Into>>, + ) -> Result + where + T: Indexed + 'b, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize + 'b, + { + let range: IndexedKeyRange = range.into().encoded(self.serializer.inner()); + self.get_items_count_by_key::(range).await + } + + /// Query IndexedDB for the item with the maximum key in the given range. + pub async fn get_max_item_by_key( + &self, + range: impl Into>, + ) -> Result, TransactionError> + where + T: Indexed, + T::IndexedType: DeserializeOwned, + T::Error: AsyncErrorDeps, + K: IndexedKey + Serialize, + { + let range = self.serializer.encode_key_range::(range)?; + let direction = IdbCursorDirection::Prev; + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + if let Some(index) = K::INDEX { + object_store + .index(index)? + .open_cursor_with_range_and_direction(&range, direction)? + .await? + .map(|cursor| self.serializer.deserialize(cursor.value())) + .transpose() + .map_err(|e| TransactionError::Serialization(Box::new(e))) + } else { + object_store + .open_cursor_with_range_and_direction(&range, direction)? + .await? + .map(|cursor| self.serializer.deserialize(cursor.value())) + .transpose() + .map_err(|e| TransactionError::Serialization(Box::new(e))) + } + } + + /// Adds an item to the corresponding IndexedDB object + /// store, i.e., `T::OBJECT_STORE`. If an item with the same key already + /// exists, it will be rejected. + pub async fn add_item(&self, item: &T) -> Result<(), TransactionError> + where + T: Indexed + Serialize, + T::IndexedType: Serialize, + T::Error: AsyncErrorDeps, + { + self.transaction + .object_store(T::OBJECT_STORE)? + .add_val_owned( + self.serializer + .serialize(item) + .map_err(|e| TransactionError::Serialization(Box::new(e)))?, + )? + .await + .map_err(Into::into) + } + + /// Puts an item in the corresponding IndexedDB object + /// store, i.e., `T::OBJECT_STORE`. If an item with the same key already + /// exists, it will be overwritten. + pub async fn put_item(&self, item: &T) -> Result<(), TransactionError> + where + T: Indexed + Serialize, + T::IndexedType: Serialize, + T::Error: AsyncErrorDeps, + { + self.transaction + .object_store(T::OBJECT_STORE)? + .put_val_owned( + self.serializer + .serialize(item) + .map_err(|e| TransactionError::Serialization(Box::new(e)))?, + )? + .await + .map_err(Into::into) + } + + /// Delete items in given key range from IndexedDB + pub async fn delete_items_by_key( + &self, + range: impl Into>, + ) -> Result<(), TransactionError> + where + T: Indexed, + K: IndexedKey + Serialize, + { + let range = self.serializer.encode_key_range::(range)?; + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + if let Some(index) = K::INDEX { + let index = object_store.index(index)?; + if let Some(cursor) = index.open_cursor_with_range(&range)?.await? { + while cursor.key().is_some() { + cursor.delete()?.await?; + cursor.continue_cursor()?.await?; + } + } + } else { + object_store.delete_owned(&range)?.await?; + } + Ok(()) + } + + /// Delete items in the given key component range from + /// IndexedDB + pub async fn delete_items_by_key_components<'b, T, K>( + &self, + range: impl Into>>, + ) -> Result<(), TransactionError> + where + T: Indexed + 'b, + K: IndexedKey + Serialize + 'b, + { + let range: IndexedKeyRange = range.into().encoded(self.serializer.inner()); + self.delete_items_by_key::(range).await + } + + /// Delete item that matches the given key components from + /// IndexedDB + pub async fn delete_item_by_key<'b, T, K>( + &self, + key: K::KeyComponents<'b>, + ) -> Result<(), TransactionError> + where + T: Indexed + 'b, + K: IndexedKey + Serialize + 'b, + { + self.delete_items_by_key_components::(key).await + } + + /// Clear all items of type `T` from the associated object store + /// `T::OBJECT_STORE` from IndexedDB + pub async fn clear(&self) -> Result<(), TransactionError> + where + T: Indexed, + { + self.transaction.object_store(T::OBJECT_STORE)?.clear()?.await.map_err(Into::into) + } +}