From db91bb35eef8421566e3aae79df032f0284d9340 Mon Sep 17 00:00:00 2001 From: Michael Goldenberg Date: Wed, 1 Oct 2025 21:32:32 -0400 Subject: [PATCH] refactor(indexeddb): add transaction fns for getting and operating on keys Signed-off-by: Michael Goldenberg --- Cargo.lock | 1 + crates/matrix-sdk-indexeddb/Cargo.toml | 1 + .../src/transaction/mod.rs | 80 +++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3ff4cd0d5..ca900f45f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3482,6 +3482,7 @@ dependencies = [ "assert_matches2", "async-trait", "base64", + "futures-util", "getrandom 0.2.15", "gloo-timers", "gloo-utils", diff --git a/crates/matrix-sdk-indexeddb/Cargo.toml b/crates/matrix-sdk-indexeddb/Cargo.toml index 8c6d39689..b7ceed1f7 100644 --- a/crates/matrix-sdk-indexeddb/Cargo.toml +++ b/crates/matrix-sdk-indexeddb/Cargo.toml @@ -27,6 +27,7 @@ experimental-encrypted-state-events = [ [dependencies] async-trait.workspace = true base64.workspace = true +futures-util.workspace = true gloo-utils = { version = "0.2.0", features = ["serde"] } growable-bloom-filter = { workspace = true, optional = true } hkdf.workspace = true diff --git a/crates/matrix-sdk-indexeddb/src/transaction/mod.rs b/crates/matrix-sdk-indexeddb/src/transaction/mod.rs index 4a2638547..c3775f326 100644 --- a/crates/matrix-sdk-indexeddb/src/transaction/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/transaction/mod.rs @@ -18,6 +18,7 @@ // clean up any dead code. #![allow(dead_code)] +use futures_util::TryStreamExt; use indexed_db_futures::{ internals::SystemRepr, query_source::QuerySource, transaction as inner, BuildSerde, }; @@ -264,6 +265,85 @@ impl<'a> Transaction<'a> { Ok(None) } + /// Query IndexedDB for keys that match the given key range. + pub async fn get_keys( + &self, + range: impl Into>, + ) -> Result, TransactionError> + where + T: Indexed, + K: IndexedKey + Serialize + DeserializeOwned, + { + let range = self.serializer.encode_key_range::(range); + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + if let Some(index) = K::INDEX { + let index = object_store.index(index)?; + if let Some(cursor) = index.open_key_cursor().with_query(range).serde()?.await? { + return cursor.key_stream_ser().try_collect().await.map_err(Into::into); + } + } else if let Some(cursor) = + object_store.open_key_cursor().with_query(range).serde()?.await? + { + return cursor.key_stream_ser().try_collect().await.map_err(Into::into); + } + Ok(Vec::new()) + } + + /// Query IndexedDB for keys that match the given key range. Iterate over + /// the keys in the given [`direction`](CursorDirection) using a cursor and + /// fold them into an accumulator while the given function `f` returns + /// [`Some`]. + /// + /// This function returns the final value of the accumulator and the key, if + /// any, which caused the fold to short circuit. + /// + /// Note that the use of cursor means that keys are read lazily from + /// IndexedDB. + pub async fn fold_keys_while( + &self, + direction: IdbCursorDirection, + range: impl Into>, + init: Acc, + mut f: F, + ) -> Result<(Acc, Option), TransactionError> + where + T: Indexed, + K: IndexedKey + Serialize + DeserializeOwned, + F: FnMut(&Acc, &K) -> Option, + { + let range = self.serializer.encode_key_range::(range); + let object_store = self.transaction.object_store(T::OBJECT_STORE)?; + + let mut state = init; + if let Some(index) = K::INDEX { + let index = object_store.index(index)?; + if let Some(mut cursor) = + index.open_key_cursor().with_query(range).with_direction(direction).serde()?.await? + { + while let Some(key) = cursor.next_key_ser().await? { + match f(&state, &key) { + Some(s) => state = s, + None => return Ok((state, Some(key))), + } + } + } + } else if let Some(mut cursor) = object_store + .open_key_cursor() + .with_query(range) + .with_direction(direction) + .serde()? + .await? + { + while let Some(key) = cursor.next_key_ser().await? { + match f(&state, &key) { + Some(s) => state = s, + None => return Ok((state, Some(key))), + } + } + } + Ok((state, None)) + } + /// Adds an item to the corresponding IndexedDB object /// store, i.e., `T::OBJECT_STORE`. If an item with the same key already /// exists, it will be rejected.