refactor(indexeddb): add transaction fns for getting and operating on keys

Signed-off-by: Michael Goldenberg <m@mgoldenberg.net>
This commit is contained in:
Michael Goldenberg
2025-10-01 21:32:32 -04:00
committed by Benjamin Bouvier
parent 3b7dbf5c04
commit db91bb35ee
3 changed files with 82 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -3482,6 +3482,7 @@ dependencies = [
"assert_matches2",
"async-trait",
"base64",
"futures-util",
"getrandom 0.2.15",
"gloo-timers",
"gloo-utils",

View File

@@ -27,6 +27,7 @@ experimental-encrypted-state-events = [
[dependencies]
async-trait.workspace = true
base64.workspace = true
futures-util.workspace = true
gloo-utils = { version = "0.2.0", features = ["serde"] }
growable-bloom-filter = { workspace = true, optional = true }
hkdf.workspace = true

View File

@@ -18,6 +18,7 @@
// clean up any dead code.
#![allow(dead_code)]
use futures_util::TryStreamExt;
use indexed_db_futures::{
internals::SystemRepr, query_source::QuerySource, transaction as inner, BuildSerde,
};
@@ -264,6 +265,85 @@ impl<'a> Transaction<'a> {
Ok(None)
}
/// Query IndexedDB for keys that match the given key range.
pub async fn get_keys<T, K>(
&self,
range: impl Into<IndexedKeyRange<K>>,
) -> Result<Vec<K>, TransactionError>
where
T: Indexed,
K: IndexedKey<T> + Serialize + DeserializeOwned,
{
let range = self.serializer.encode_key_range::<T, K>(range);
let object_store = self.transaction.object_store(T::OBJECT_STORE)?;
if let Some(index) = K::INDEX {
let index = object_store.index(index)?;
if let Some(cursor) = index.open_key_cursor().with_query(range).serde()?.await? {
return cursor.key_stream_ser().try_collect().await.map_err(Into::into);
}
} else if let Some(cursor) =
object_store.open_key_cursor().with_query(range).serde()?.await?
{
return cursor.key_stream_ser().try_collect().await.map_err(Into::into);
}
Ok(Vec::new())
}
/// Query IndexedDB for keys that match the given key range. Iterate over
/// the keys in the given [`direction`](CursorDirection) using a cursor and
/// fold them into an accumulator while the given function `f` returns
/// [`Some`].
///
/// This function returns the final value of the accumulator and the key, if
/// any, which caused the fold to short circuit.
///
/// Note that the use of cursor means that keys are read lazily from
/// IndexedDB.
pub async fn fold_keys_while<T, K, Acc, F>(
&self,
direction: IdbCursorDirection,
range: impl Into<IndexedKeyRange<K>>,
init: Acc,
mut f: F,
) -> Result<(Acc, Option<K>), TransactionError>
where
T: Indexed,
K: IndexedKey<T> + Serialize + DeserializeOwned,
F: FnMut(&Acc, &K) -> Option<Acc>,
{
let range = self.serializer.encode_key_range::<T, K>(range);
let object_store = self.transaction.object_store(T::OBJECT_STORE)?;
let mut state = init;
if let Some(index) = K::INDEX {
let index = object_store.index(index)?;
if let Some(mut cursor) =
index.open_key_cursor().with_query(range).with_direction(direction).serde()?.await?
{
while let Some(key) = cursor.next_key_ser().await? {
match f(&state, &key) {
Some(s) => state = s,
None => return Ok((state, Some(key))),
}
}
}
} else if let Some(mut cursor) = object_store
.open_key_cursor()
.with_query(range)
.with_direction(direction)
.serde()?
.await?
{
while let Some(key) = cursor.next_key_ser().await? {
match f(&state, &key) {
Some(s) => state = s,
None => return Ok((state, Some(key))),
}
}
}
Ok((state, None))
}
/// Adds an item to the corresponding IndexedDB object
/// store, i.e., `T::OBJECT_STORE`. If an item with the same key already
/// exists, it will be rejected.