From 2f8f39795fb9e2c7d9b92c80262e4b7c71d6eebd Mon Sep 17 00:00:00 2001 From: Michael Goldenberg Date: Tue, 1 Jul 2025 19:23:14 -0400 Subject: [PATCH] feat(indexeddb): add IndexedDB-backed impl for EventCacheStore::handle_linked_chunk_updates Signed-off-by: Michael Goldenberg --- .../src/event_cache_store/error.rs | 20 ++- .../src/event_cache_store/mod.rs | 137 +++++++++++++++++- 2 files changed, 152 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs b/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs index 8e22c2dbd..e144b8d2a 100644 --- a/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs +++ b/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs @@ -16,8 +16,11 @@ use matrix_sdk_base::{ event_cache::store::{EventCacheStore, EventCacheStoreError, MemoryStore}, SendOutsideWasm, SyncOutsideWasm, }; +use serde::de::Error; use thiserror::Error; +use crate::event_cache_store::transaction::IndexeddbEventCacheStoreTransactionError; + /// A trait that combines the necessary traits needed for asynchronous runtimes, /// but excludes them when running in a web environment - i.e., when /// `#[cfg(target_family = "wasm")]`. @@ -30,6 +33,8 @@ impl AsyncErrorDeps for T where T: std::error::Error + SendOutsideWasm + Sync pub enum IndexeddbEventCacheStoreError { #[error("DomException {name} ({code}): {message}")] DomException { name: String, message: String, code: u16 }, + #[error("transaction: {0}")] + Transaction(#[from] IndexeddbEventCacheStoreTransactionError), #[error("media store: {0}")] MemoryStore(::Error), } @@ -47,7 +52,20 @@ impl From for IndexeddbEventCacheStoreError { impl From for EventCacheStoreError { fn from(value: IndexeddbEventCacheStoreError) -> Self { match value { - IndexeddbEventCacheStoreError::DomException { .. } => Self::Backend(Box::new(value)), + IndexeddbEventCacheStoreError::DomException { .. } => { + Self::InvalidData { details: value.to_string() } + } + IndexeddbEventCacheStoreError::Transaction(ref inner) => match inner { + IndexeddbEventCacheStoreTransactionError::DomException { .. } => { + Self::InvalidData { details: value.to_string() } + } + IndexeddbEventCacheStoreTransactionError::Serialization(e) => { + Self::Serialization(serde_json::Error::custom(e.to_string())) + } + IndexeddbEventCacheStoreTransactionError::ItemIsNotUnique => { + Self::InvalidData { details: value.to_string() } + } + }, IndexeddbEventCacheStoreError::MemoryStore(inner) => inner, } } diff --git a/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs b/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs index aa1b2cbb7..eb7e2f128 100644 --- a/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs @@ -30,11 +30,14 @@ use matrix_sdk_base::{ media::MediaRequestParameters, }; use ruma::{events::relation::RelationType, EventId, MxcUri, OwnedEventId, RoomId}; +use tracing::trace; use web_sys::IdbTransactionMode; use crate::event_cache_store::{ + migrations::current::keys, serializer::IndexeddbEventCacheStoreSerializer, transaction::IndexeddbEventCacheStoreTransaction, + types::{ChunkType, InBandEvent}, }; mod builder; @@ -72,6 +75,20 @@ impl IndexeddbEventCacheStore { pub fn builder() -> IndexeddbEventCacheStoreBuilder { IndexeddbEventCacheStoreBuilder::default() } + + /// Initializes a new transaction on the underlying IndexedDB database and + /// returns a handle which can be used to combine database operations + /// into an atomic unit. + pub fn transaction<'a>( + &'a self, + stores: &[&str], + mode: IdbTransactionMode, + ) -> Result, IndexeddbEventCacheStoreError> { + Ok(IndexeddbEventCacheStoreTransaction::new( + self.inner.transaction_on_multi_with_mode(stores, mode)?, + &self.serializer, + )) + } } // Small hack to have the following macro invocation act as the appropriate @@ -121,10 +138,122 @@ impl_event_cache_store! { linked_chunk_id: LinkedChunkId<'_>, updates: Vec>, ) -> Result<(), IndexeddbEventCacheStoreError> { - self.memory_store - .handle_linked_chunk_updates(linked_chunk_id, updates) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) + let linked_chunk_id = linked_chunk_id.to_owned(); + let room_id = linked_chunk_id.room_id(); + + let transaction = self.transaction( + &[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS], + IdbTransactionMode::Readwrite, + )?; + + for update in updates { + match update { + Update::NewItemsChunk { previous, new, next } => { + trace!(%room_id, "Inserting new chunk (prev={previous:?}, new={new:?}, next={next:?})"); + transaction + .add_chunk( + room_id, + &types::Chunk { + identifier: new.index(), + previous: previous.map(|i| i.index()), + next: next.map(|i| i.index()), + chunk_type: ChunkType::Event, + }, + ) + .await?; + } + Update::NewGapChunk { previous, new, next, gap } => { + trace!(%room_id, "Inserting new gap (prev={previous:?}, new={new:?}, next={next:?})"); + transaction + .add_item( + room_id, + &types::Gap { + chunk_identifier: new.index(), + prev_token: gap.prev_token, + }, + ) + .await?; + transaction + .add_chunk( + room_id, + &types::Chunk { + identifier: new.index(), + previous: previous.map(|i| i.index()), + next: next.map(|i| i.index()), + chunk_type: ChunkType::Gap, + }, + ) + .await?; + } + Update::RemoveChunk(chunk_id) => { + trace!("Removing chunk {chunk_id:?}"); + transaction.delete_chunk_by_id(room_id, &chunk_id).await?; + } + Update::PushItems { at, items } => { + let chunk_identifier = at.chunk_identifier().index(); + + trace!(%room_id, "pushing {} items @ {chunk_identifier}", items.len()); + + for (i, item) in items.into_iter().enumerate() { + transaction + .add_item( + room_id, + &types::Event::InBand(InBandEvent { + content: item, + position: types::Position { + chunk_identifier, + index: at.index() + i, + }, + }), + ) + .await?; + } + } + Update::ReplaceItem { at, item } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "replacing item @ {chunk_id}:{index}"); + + transaction + .put_event( + room_id, + &types::Event::InBand(InBandEvent { + content: item, + position: at.into(), + }), + ) + .await?; + } + Update::RemoveItem { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "removing item @ {chunk_id}:{index}"); + + transaction.delete_event_by_position(room_id, &at.into()).await?; + } + Update::DetachLastItems { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "detaching last items @ {chunk_id}:{index}"); + + transaction.delete_events_by_chunk_from_index(room_id, &at.into()).await?; + } + Update::StartReattachItems | Update::EndReattachItems => { + // Nothing? See sqlite implementation + } + Update::Clear => { + trace!(%room_id, "clearing room"); + transaction.delete_chunks_in_room(room_id).await?; + transaction.delete_events_in_room(room_id).await?; + transaction.delete_gaps_in_room(room_id).await?; + } + } + } + transaction.commit().await?; + Ok(()) } async fn load_all_chunks(