indexeddb: Update storage for inbound_group_sessions (#2885)

Currently, querying for inbound group sessions which need backing up is very
inefficient: we have to search through the whole list.

Here, we change the way they are stored so that we can maintain an index of the
ones that need a backup.

Fixes: https://github.com/vector-im/element-web/issues/26488
Fixes: https://github.com/matrix-org/matrix-rust-sdk/issues/2877

---

* indexeddb: Update storage for inbound_group_sessions

Currently, querying for inbound group sessions which need backing up is very
inefficient: we have to search through the whole list.

Here, we change the way they are stored so that we can maintain an index of the
ones that need a backup.

* Rename functions for clarity

* Remove spurious log line

This was a bit verbose

* Rename constants for i_g_s store names

* improve log messages

* add a warning

* Rename `InboundGroupSessionIndexedDbObject.data`

* formatting
This commit is contained in:
Richard van der Hoff
2023-11-30 11:01:20 +00:00
committed by GitHub
parent 8b04db666c
commit 1fcd5af526
4 changed files with 454 additions and 65 deletions

1
Cargo.lock generated
View File

@@ -3259,6 +3259,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
"wasm-bindgen",
"wasm-bindgen-test",

View File

@@ -49,5 +49,6 @@ matrix-sdk-base = { path = "../matrix-sdk-base", features = ["testing"] }
matrix-sdk-common = { path = "../matrix-sdk-common", features = ["js"] }
matrix-sdk-crypto = { path = "../matrix-sdk-crypto", features = ["js", "testing"] }
matrix-sdk-test = { path = "../../testing/matrix-sdk-test" }
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["registry", "tracing-log"] }
uuid = "1.3.0"
wasm-bindgen-test = "0.3.33"

View File

@@ -13,15 +13,67 @@
// limitations under the License.
use indexed_db_futures::{prelude::*, web_sys::DomException};
use tracing::info;
use matrix_sdk_crypto::olm::InboundGroupSession;
use tracing::{debug, info};
use wasm_bindgen::JsValue;
use crate::crypto_store::{keys, Result};
use crate::{
crypto_store::{
indexeddb_serializer::IndexeddbSerializer, keys, InboundGroupSessionIndexedDbObject, Result,
},
IndexeddbCryptoStoreError,
};
mod old_keys {
/// Old format of the inbound_group_sessions store which lacked indexes or a
/// sensible structure
pub const INBOUND_GROUP_SESSIONS_V1: &str = "inbound_group_sessions";
}
/// Open the indexeddb with the given name, upgrading it to the latest version
/// of the schema if necessary.
pub async fn open_and_upgrade_db(name: &str) -> Result<IdbDatabase, DomException> {
let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 5)?;
/// Open the indexeddb database `name`, upgrading it to the latest schema
/// version (currently 7) if necessary.
///
/// `serializer` is used to re-encode any `inbound_group_sessions` data that
/// must be migrated from the v1 format to the v2 format.
pub async fn open_and_upgrade_db(
name: &str,
serializer: &IndexeddbSerializer,
) -> Result<IdbDatabase, IndexeddbCryptoStoreError> {
// This is all a bit of a hack. Some of the version migrations require a data
// migration, which has to be done via async APIs; however, the
// JS `upgrade_needed` mechanism does not allow for async calls.
//
// Start by finding out what the existing version is, if any.
let db = IdbDatabase::open(name)?.await?;
let old_version = db.version() as u32;
db.close();
// If we have yet to complete the migration to V7, migrate the schema to V6
// (if necessary), and then migrate any remaining data.
if old_version <= 6 {
let db = migrate_schema_up_to_v6(name).await?;
migrate_data_for_v6(serializer, &db).await?;
db.close();
}
// Now we can safely complete the migration to V7 which will drop the old store.
let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 7)?;
db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> {
let old_version = evt.old_version() as u32;
let new_version = evt.new_version() as u32;
info!(old_version, new_version, "Continuing IndexeddbCryptoStore upgrade");
if old_version < 7 {
migrate_stores_to_v7(evt.db())?;
}
info!(old_version, new_version, "IndexeddbCryptoStore upgrade complete");
Ok(())
}));
Ok(db_req.await?)
}
async fn migrate_schema_up_to_v6(name: &str) -> Result<IdbDatabase, DomException> {
let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 6)?;
db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> {
// Even if the web-sys bindings expose the version as a f64, the IndexedDB API
@@ -30,40 +82,56 @@ pub async fn open_and_upgrade_db(name: &str) -> Result<IdbDatabase, DomException
let old_version = evt.old_version() as u32;
let new_version = evt.new_version() as u32;
info!(old_version, new_version, "Upgrading IndexeddbCryptoStore");
info!(old_version, new_version, "Upgrading IndexeddbCryptoStore, phase 1");
if old_version < 1 {
create_stores_for_v1(evt.db())?;
// An old_version of 1 could either mean actually the first version of the
// schema, or a completely empty schema that has been created with a
// call to `IdbDatabase::open` with no explicit "version". So, to determine
// if we need to create the V1 stores, we actually check if the schema is empty.
if evt.db().object_store_names().next().is_none() {
migrate_stores_to_v1(evt.db())?;
}
if old_version < 2 {
create_stores_for_v2(evt.db())?;
migrate_stores_to_v2(evt.db())?;
}
if old_version < 3 {
create_stores_for_v3(evt.db())?;
migrate_stores_to_v3(evt.db())?;
}
if old_version < 4 {
create_stores_for_v4(evt.db())?;
migrate_stores_to_v4(evt.db())?;
}
if old_version < 5 {
create_stores_for_v5(evt.db())?;
migrate_stores_to_v5(evt.db())?;
}
info!(old_version, new_version, "IndexeddbCryptoStore upgrade complete");
if old_version < 6 {
migrate_stores_to_v6(evt.db())?;
}
// NOTE! Further migrations must NOT be added here.
//
// At this point we need to start an asynchronous operation to migrate
// inbound_group_sessions to a new format. We then resume schema migrations
// afterwards.
//
// Further migrations can be added in `open_and_upgrade_db`.
info!(old_version, new_version, "IndexeddbCryptoStore upgrade phase 1 complete");
Ok(())
}));
db_req.await
}
fn create_stores_for_v1(db: &IdbDatabase) -> Result<(), DomException> {
fn migrate_stores_to_v1(db: &IdbDatabase) -> Result<(), DomException> {
db.create_object_store(keys::CORE)?;
db.create_object_store(keys::SESSION)?;
db.create_object_store(keys::INBOUND_GROUP_SESSIONS)?;
db.create_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?;
db.create_object_store(keys::OUTBOUND_GROUP_SESSIONS)?;
db.create_object_store(keys::TRACKED_USERS)?;
db.create_object_store(keys::OLM_HASHES)?;
@@ -75,21 +143,21 @@ fn create_stores_for_v1(db: &IdbDatabase) -> Result<(), DomException> {
Ok(())
}
fn create_stores_for_v2(db: &IdbDatabase) -> Result<(), DomException> {
fn migrate_stores_to_v2(db: &IdbDatabase) -> Result<(), DomException> {
// We changed how we store inbound group sessions, the key used to
// be a tuple of `(room_id, sender_key, session_id)` now it's a
// tuple of `(room_id, session_id)`
//
// Let's just drop the whole object store.
db.delete_object_store(keys::INBOUND_GROUP_SESSIONS)?;
db.create_object_store(keys::INBOUND_GROUP_SESSIONS)?;
db.delete_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?;
db.create_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?;
db.create_object_store(keys::ROOM_SETTINGS)?;
Ok(())
}
fn create_stores_for_v3(db: &IdbDatabase) -> Result<(), DomException> {
fn migrate_stores_to_v3(db: &IdbDatabase) -> Result<(), DomException> {
// We changed the way we store outbound session.
// ShareInfo changed from a struct to an enum with struct variant.
// Let's just discard the existing outbounds
@@ -102,12 +170,12 @@ fn create_stores_for_v3(db: &IdbDatabase) -> Result<(), DomException> {
Ok(())
}
fn create_stores_for_v4(db: &IdbDatabase) -> Result<(), DomException> {
fn migrate_stores_to_v4(db: &IdbDatabase) -> Result<(), DomException> {
db.create_object_store(keys::SECRETS_INBOX)?;
Ok(())
}
fn create_stores_for_v5(db: &IdbDatabase) -> Result<(), DomException> {
fn migrate_stores_to_v5(db: &IdbDatabase) -> Result<(), DomException> {
// Create a new store for outgoing secret requests
let object_store = db.create_object_store(keys::GOSSIP_REQUESTS)?;
@@ -136,3 +204,225 @@ fn create_stores_for_v5(db: &IdbDatabase) -> Result<(), DomException> {
Ok(())
}
/// Schema part of the v6 migration: create the new-format
/// `inbound_group_sessions2` store, with its `needs_backup` index.
fn migrate_stores_to_v6(db: &IdbDatabase) -> Result<(), DomException> {
// We want to change the shape of the inbound group sessions store. To do so, we
// first need to build a new store, then copy all the data over.
//
// But copying the data needs to happen outside the database upgrade process
// (because it needs async calls). So, here we create a new store for
// inbound group sessions. We don't populate it yet; that happens once we
// have done the upgrade to v6, in `migrate_data_for_v6`. Finally we drop the
// old store in `migrate_stores_to_v7`.
let object_store = db.create_object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;
// Non-unique index over the `needs_backup` field. Sessions which do not need
// backup omit the field entirely when serialized (see
// `InboundGroupSessionIndexedDbObject::needs_backup`), so they never appear
// in this index, which keeps "find sessions needing backup" cheap.
let mut params = IdbIndexParameters::new();
params.unique(false);
object_store.create_index_with_params(
keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX,
&IdbKeyPath::str("needs_backup"),
&params,
)?;
Ok(())
}
/// Data part of the v6 migration: copy every entry from the old
/// `inbound_group_sessions` store into the new `inbound_group_sessions2`
/// store, converting each pickled session to an
/// [`InboundGroupSessionIndexedDbObject`] as it goes, and deleting the old
/// entries.
///
/// This must run outside the indexeddb `upgrade_needed` callback because it
/// needs async calls; see `open_and_upgrade_db`.
async fn migrate_data_for_v6(serializer: &IndexeddbSerializer, db: &IdbDatabase) -> Result<()> {
// The new store has been made for inbound group sessions; time to populate it.
let txn = db.transaction_on_multi_with_mode(
&[old_keys::INBOUND_GROUP_SESSIONS_V1, keys::INBOUND_GROUP_SESSIONS_V2],
IdbTransactionMode::Readwrite,
)?;
let old_store = txn.object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?;
let new_store = txn.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;
// Row count is only used for progress logging below.
let row_count = old_store.count()?.await?;
info!(row_count, "Migrating inbound group session data from v1 to v2");
if let Some(cursor) = old_store.open_cursor()?.await? {
let mut idx = 0;
loop {
idx += 1;
// Keys are preserved unchanged between the old and new stores.
let key = cursor.key().ok_or(matrix_sdk_crypto::CryptoStoreError::Backend(
"inbound_group_sessions v1 cursor has no key".into(),
))?;
let value = cursor.value();
if idx % 100 == 0 {
debug!("Migrating session {idx} of {row_count}");
}
// Deserialize the old-format pickle so we can read its `backed_up` flag.
let igs = InboundGroupSession::from_pickle(serializer.deserialize_value(value)?)
.map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?;
// This is much the same as `IndexeddbCryptoStore::serialize_inbound_group_session`.
let new_data = serde_wasm_bindgen::to_value(&InboundGroupSessionIndexedDbObject {
pickled_session: serializer.serialize_value_as_bytes(&igs.pickle().await)?,
needs_backup: !igs.backed_up(),
})?;
new_store.add_key_val(&key, &new_data)?;
// we are done with the original data, so delete it now.
cursor.delete()?;
if !cursor.continue_cursor()?.await? {
break;
}
}
}
// Wait for the transaction to commit and surface any error it raised.
Ok(txn.await.into_result()?)
}
/// v7 migration: drop the old-format `inbound_group_sessions` store, whose
/// contents were copied into `inbound_group_sessions2` by
/// `migrate_data_for_v6`.
fn migrate_stores_to_v7(db: &IdbDatabase) -> Result<(), DomException> {
db.delete_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
use std::sync::Arc;
use indexed_db_futures::prelude::*;
use matrix_sdk_common::js_tracing::make_tracing_subscriber;
use matrix_sdk_crypto::{
olm::SessionKey,
store::CryptoStore,
types::EventEncryptionAlgorithm,
vodozemac::{Curve25519PublicKey, Curve25519SecretKey, Ed25519SecretKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use matrix_sdk_test::async_test;
use ruma::room_id;
use tracing_subscriber::util::SubscriberInitExt;
use crate::{crypto_store::migrations::*, IndexeddbCryptoStore};
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
/// Test migrating `inbound_group_session` data from store v5 to store v7,
/// on a store with encryption disabled.
#[async_test]
async fn test_v7_migration_unencrypted() {
test_v7_migration_with_cipher("test_v7_migration_unencrypted", None).await
}
/// Test migrating `inbound_group_session` data from store v5 to store v7,
/// on a store with encryption enabled.
#[async_test]
async fn test_v7_migration_encrypted() {
let cipher = StoreCipher::new().unwrap();
test_v7_migration_with_cipher("test_v7_migration_encrypted", Some(Arc::new(cipher))).await;
}
/// Helper function for `test_v7_migration_{un,}encrypted`: test migrating
/// `inbound_group_session` data from store v5 to store v7.
async fn test_v7_migration_with_cipher(
db_prefix: &str,
store_cipher: Option<Arc<StoreCipher>>,
) {
let _ = make_tracing_subscriber(None).try_init();
let db_name = format!("{db_prefix:0}::matrix-sdk-crypto");
// delete the db in case it was used in a previous run
let _ = IdbDatabase::delete_by_name(&db_name);
// Schema V7 migrated the inbound group sessions to a new format.
// To test, first create a database and populate it with the *old* style of
// entry.
let db = create_v5_db(&db_name).await.unwrap();
let room_id = room_id!("!test:localhost");
let curve_key = Curve25519PublicKey::from(&Curve25519SecretKey::new());
let ed_key = Ed25519SecretKey::new().public_key();
// a backed-up session
let session1 = InboundGroupSession::new(
curve_key,
ed_key,
room_id,
&SessionKey::from_base64(
"AgAAAABTyn3CR8mzAxhsHH88td5DrRqfipJCnNbZeMrfzhON6O1Cyr9ewx/sDFLO6\
+NvyW92yGvMub7nuAEQb+SgnZLm7nwvuVvJgSZKpoJMVliwg8iY9TXKFT286oBtT2\
/8idy6TcpKax4foSHdMYlZXu5zOsGDdd9eYnYHpUEyDT0utuiaakZM3XBMNLEVDj9\
Ps929j1FGgne1bDeFVoty2UAOQK8s/0JJigbKSu6wQ/SzaCYpE/LD4Egk2Nxs1JE2\
33ii9J8RGPYOp7QWl0kTEc8mAlqZL7mKppo9AwgtmYweAg",
)
.unwrap(),
EventEncryptionAlgorithm::MegolmV1AesSha2,
None,
)
.unwrap();
session1.mark_as_backed_up();
// an un-backed-up session
let session2 = InboundGroupSession::new(
curve_key,
ed_key,
room_id,
&SessionKey::from_base64(
"AgAAAACO1PjBdqucFUcNFU6JgXYAi7KMeeUqUibaLm6CkHJcMiDTFWq/K5SFAukJc\
WjeyOpnZr4vpezRlbvNaQpNPMub2Cs2u14fHj9OpKFD7c4hFS4j94q4pTLZly3qEV\
BIjWdOpcIVfN7QVGVIxYiI6KHEddCHrNCo9fc8GUdfzrMnmUooQr/m4ZAkRdErzUH\
uUAlUBwOKcPi7Cs/KrMw/sHCRDkTntHZ3BOrzJsAVbHUgq+8/Sqy3YE+CX6uEnig+\
1NWjZD9f1vvXnSKKDdHj1927WFMFZ/yYc24607zEVUaODQ",
)
.unwrap(),
EventEncryptionAlgorithm::MegolmV1AesSha2,
None,
)
.unwrap();
let serializer = IndexeddbSerializer::new(store_cipher.clone());
let txn = db
.transaction_on_one_with_mode(
old_keys::INBOUND_GROUP_SESSIONS_V1,
IdbTransactionMode::Readwrite,
)
.unwrap();
let sessions = txn.object_store(old_keys::INBOUND_GROUP_SESSIONS_V1).unwrap();
for session in vec![&session1, &session2] {
let room_id = session.room_id();
let session_id = session.session_id();
let key = serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id));
let pickle = session.pickle().await;
sessions.put_key_val(&key, &serializer.serialize_value(&pickle).unwrap()).unwrap();
}
txn.await.into_result().unwrap();
// now close our DB, reopen it properly, and check that we can still read our
// data.
db.close();
let store =
IndexeddbCryptoStore::open_with_store_cipher(&db_prefix, store_cipher).await.unwrap();
let s =
store.get_inbound_group_session(room_id, session1.session_id()).await.unwrap().unwrap();
assert_eq!(s.session_id(), session1.session_id());
assert_eq!(s.backed_up(), true);
let s =
store.get_inbound_group_session(room_id, session2.session_id()).await.unwrap().unwrap();
assert_eq!(s.session_id(), session2.session_id());
assert_eq!(s.backed_up(), false);
}
/// Test helper: create a database at schema version 5 (i.e. with the
/// old-format `inbound_group_sessions` store), by replaying the v1-v5
/// migrations, so that the v6/v7 migration path can be exercised.
async fn create_v5_db(name: &str) -> std::result::Result<IdbDatabase, DomException> {
let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 5)?;
db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> {
let db = evt.db();
migrate_stores_to_v1(db)?;
migrate_stores_to_v2(db)?;
migrate_stores_to_v3(db)?;
migrate_stores_to_v4(db)?;
migrate_stores_to_v5(db)?;
Ok(())
}));
db_req.await
}
}

View File

@@ -53,7 +53,9 @@ mod keys {
pub const CORE: &str = "core";
pub const SESSION: &str = "session";
pub const INBOUND_GROUP_SESSIONS: &str = "inbound_group_sessions";
pub const INBOUND_GROUP_SESSIONS_V2: &str = "inbound_group_sessions2";
pub const INBOUND_GROUP_SESSIONS_BACKUP_INDEX: &str = "backup";
pub const OUTBOUND_GROUP_SESSIONS: &str = "outbound_group_sessions";
@@ -164,7 +166,7 @@ impl IndexeddbCryptoStore {
let name = format!("{prefix:0}::matrix-sdk-crypto");
let serializer = IndexeddbSerializer::new(store_cipher);
let db = open_and_upgrade_db(&name).await?;
let db = open_and_upgrade_db(&name, &serializer).await?;
let session_cache = SessionStore::new();
Ok(Self {
@@ -251,6 +253,44 @@ impl IndexeddbCryptoStore {
self.static_account.read().unwrap().clone()
}
/// Transform an [`InboundGroupSession`] into a `JsValue` holding a
/// [`InboundGroupSessionIndexedDbObject`], ready for storing.
///
/// The pickled session is (possibly) encrypted via the store's serializer;
/// the `needs_backup` flag is stored in the clear so that it can feed the
/// `needs_backup` indexeddb index.
async fn serialize_inbound_group_session(
&self,
session: &InboundGroupSession,
) -> Result<JsValue> {
let obj = InboundGroupSessionIndexedDbObject {
pickled_session: self.serializer.serialize_value_as_bytes(&session.pickle().await)?,
needs_backup: !session.backed_up(),
};
Ok(serde_wasm_bindgen::to_value(&obj)?)
}
/// Transform a JsValue holding a [`InboundGroupSessionIndexedDbObject`]
/// back into a [`InboundGroupSession`].
///
/// The cleartext `needs_backup` flag on the stored object is authoritative:
/// it is copied onto the resulting session, overriding whatever "backed up"
/// state was captured inside the pickle.
fn deserialize_inbound_group_session(
&self,
stored_value: JsValue,
) -> Result<InboundGroupSession> {
let idb_object: InboundGroupSessionIndexedDbObject =
serde_wasm_bindgen::from_value(stored_value)?;
let pickled_session =
self.serializer.deserialize_value_from_bytes(&idb_object.pickled_session)?;
let session = InboundGroupSession::from_pickle(pickled_session)
.map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?;
// Although a "backed up" flag is stored inside `idb_object.pickled_session`, it
// is not maintained when backups are reset. Overwrite the flag with the
// needs_backup value from the IDB object.
if idb_object.needs_backup {
session.reset_backup_state();
} else {
session.mark_as_backed_up();
}
Ok(session)
}
/// Transform a [`GossipRequest`] into a `JsValue` holding a
/// [`GossipRequestIndexedDbObject`], ready for storing.
fn serialize_gossip_request(&self, gossip_request: &GossipRequest) -> Result<JsValue> {
@@ -369,7 +409,7 @@ impl_crypto_store! {
keys::IDENTITIES,
),
(!changes.inbound_group_sessions.is_empty(), keys::INBOUND_GROUP_SESSIONS),
(!changes.inbound_group_sessions.is_empty(), keys::INBOUND_GROUP_SESSIONS_V2),
(!changes.outbound_group_sessions.is_empty(), keys::OUTBOUND_GROUP_SESSIONS),
(!changes.message_hashes.is_empty(), keys::OLM_HASHES),
(!changes.withheld_session_info.is_empty(), keys::DIRECT_WITHHELD_INFO),
@@ -439,15 +479,14 @@ impl_crypto_store! {
}
if !changes.inbound_group_sessions.is_empty() {
let sessions = tx.object_store(keys::INBOUND_GROUP_SESSIONS)?;
let sessions = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;
for session in changes.inbound_group_sessions {
let room_id = session.room_id();
let session_id = session.session_id();
let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS, (room_id, session_id));
let pickle = session.pickle().await;
sessions.put_key_val(&key, &self.serializer.serialize_value(&pickle)?)?;
let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id));
let value = self.serialize_inbound_group_session(&session).await?;
sessions.put_key_val(&key, &value)?;
}
}
@@ -717,19 +756,18 @@ impl_crypto_store! {
room_id: &RoomId,
session_id: &str,
) -> Result<Option<InboundGroupSession>> {
let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS, (room_id, session_id));
if let Some(pickle) = self
let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id));
if let Some(value) = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS,
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?
.object_store(keys::INBOUND_GROUP_SESSIONS)?
.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?
.get(&key)?
.await?
{
let pickle = self.serializer.deserialize_value(pickle)?;
Ok(Some(InboundGroupSession::from_pickle(pickle).map_err(CryptoStoreError::from)?))
Ok(Some(self.deserialize_inbound_group_session(value)?))
} else {
Ok(None)
}
@@ -739,57 +777,92 @@ impl_crypto_store! {
Ok(self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS,
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?
.object_store(keys::INBOUND_GROUP_SESSIONS)?
.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?
.get_all()?
.await?
.iter()
.filter_map(|i| self.serializer.deserialize_value(i).ok())
.filter_map(|p| InboundGroupSession::from_pickle(p).ok())
.filter_map(|v| self.deserialize_inbound_group_session(v).ok())
.collect())
}
async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
let all = self.get_inbound_group_sessions().await?;
let backed_up = all.iter().filter(|s| s.backed_up()).count();
Ok(RoomKeyCounts { total: all.len(), backed_up })
let tx = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?;
let store = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;
let all = store.count()?.await? as usize;
let not_backed_up = store.index(keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX)?.count()?.await? as usize;
tx.await.into_result()?;
Ok(RoomKeyCounts { total: all, backed_up: all - not_backed_up })
}
async fn inbound_group_sessions_for_backup(
&self,
limit: usize,
) -> Result<Vec<InboundGroupSession>> {
Ok(self
.get_inbound_group_sessions()
.await?
.into_iter()
.filter(|s| !s.backed_up())
.take(limit)
.collect())
let tx = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?;
let store = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;
let idx = store.index(keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX)?;
// XXX ideally we would use `get_all_with_key_and_limit`, but that doesn't appear to be
// exposed (https://github.com/Alorel/rust-indexed-db/issues/31). Instead we replicate
// the behaviour with a cursor.
let Some(cursor) = idx.open_cursor()?.await? else {
return Ok(vec![]);
};
let mut result = Vec::new();
for _ in 0..limit {
result.push(self.deserialize_inbound_group_session(cursor.value())?);
if !cursor.continue_cursor()?.await? {
break;
}
}
tx.await.into_result()?;
Ok(result)
}
async fn reset_backup_state(&self) -> Result<()> {
let inbound_group_sessions = self
.get_inbound_group_sessions()
.await?
.into_iter()
.filter(|s| {
if s.backed_up() {
s.reset_backup_state();
true
} else {
false
let tx = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readwrite,
)?;
if let Some(cursor) = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?.open_cursor()?.await? {
loop {
let mut idb_object: InboundGroupSessionIndexedDbObject = serde_wasm_bindgen::from_value(cursor.value())?;
if !idb_object.needs_backup {
idb_object.needs_backup = true;
// We don't bother to update the encrypted `InboundGroupSession` object stored
// inside `idb_object.pickled_session`, since that would require decryption and encryption.
// Instead, it will be patched up by `deserialize_inbound_group_session`.
let idb_object = serde_wasm_bindgen::to_value(&idb_object)?;
cursor.update(&idb_object)?.await?;
}
})
.collect::<Vec<_>>();
if !inbound_group_sessions.is_empty() {
let changes = Changes { inbound_group_sessions, ..Default::default() };
self.save_changes(changes).await?;
if !cursor.continue_cursor()?.await? {
break;
}
}
}
Ok(())
Ok(tx.await.into_result()?)
}
async fn save_tracked_users(&self, users: &[(&UserId, bool)]) -> Result<()> {
@@ -1110,6 +1183,30 @@ struct GossipRequestIndexedDbObject {
unsent: bool,
}
/// The objects we store in the inbound_group_sessions2 indexeddb object store
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct InboundGroupSessionIndexedDbObject {
/// (Possibly encrypted) serialisation of a
/// [`matrix_sdk_crypto::olm::group_sessions::PickledInboundGroupSession`]
/// structure.
pickled_session: Vec<u8>,
/// Whether the session data has yet to be backed up.
///
/// This field backs the `needs_backup` indexeddb index
/// (`INBOUND_GROUP_SESSIONS_BACKUP_INDEX`).
///
/// Since we only need to be able to find entries where this is `true`, we
/// skip serialization in cases where it is `false`. That has the effect
/// of omitting it from the indexeddb index.
///
/// We also use a custom serializer because bools can't be used as keys in
/// indexeddb.
#[serde(
default,
skip_serializing_if = "std::ops::Not::not",
with = "crate::serialize_bool_for_indexeddb"
)]
needs_backup: bool,
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
use matrix_sdk_crypto::cryptostore_integration_tests;