feat(indexeddb): add IndexedDB-backed impl for EventCacheStore::handle_linked_chunk_updates

Signed-off-by: Michael Goldenberg <m@mgoldenberg.net>
This commit is contained in:
Michael Goldenberg
2025-07-01 19:23:14 -04:00
committed by Ivan Enderlin
parent b1ef15c346
commit 2f8f39795f
2 changed files with 152 additions and 5 deletions

View File

@@ -16,8 +16,11 @@ use matrix_sdk_base::{
event_cache::store::{EventCacheStore, EventCacheStoreError, MemoryStore},
SendOutsideWasm, SyncOutsideWasm,
};
use serde::de::Error;
use thiserror::Error;
use crate::event_cache_store::transaction::IndexeddbEventCacheStoreTransactionError;
/// A trait that combines the necessary traits needed for asynchronous runtimes,
/// but excludes them when running in a web environment - i.e., when
/// `#[cfg(target_family = "wasm")]`.
@@ -30,6 +33,8 @@ impl<T> AsyncErrorDeps for T where T: std::error::Error + SendOutsideWasm + Sync
pub enum IndexeddbEventCacheStoreError {
#[error("DomException {name} ({code}): {message}")]
DomException { name: String, message: String, code: u16 },
#[error("transaction: {0}")]
Transaction(#[from] IndexeddbEventCacheStoreTransactionError),
#[error("media store: {0}")]
MemoryStore(<MemoryStore as EventCacheStore>::Error),
}
@@ -47,7 +52,20 @@ impl From<web_sys::DomException> for IndexeddbEventCacheStoreError {
impl From<IndexeddbEventCacheStoreError> for EventCacheStoreError {
fn from(value: IndexeddbEventCacheStoreError) -> Self {
match value {
IndexeddbEventCacheStoreError::DomException { .. } => Self::Backend(Box::new(value)),
IndexeddbEventCacheStoreError::DomException { .. } => {
Self::InvalidData { details: value.to_string() }
}
IndexeddbEventCacheStoreError::Transaction(ref inner) => match inner {
IndexeddbEventCacheStoreTransactionError::DomException { .. } => {
Self::InvalidData { details: value.to_string() }
}
IndexeddbEventCacheStoreTransactionError::Serialization(e) => {
Self::Serialization(serde_json::Error::custom(e.to_string()))
}
IndexeddbEventCacheStoreTransactionError::ItemIsNotUnique => {
Self::InvalidData { details: value.to_string() }
}
},
IndexeddbEventCacheStoreError::MemoryStore(inner) => inner,
}
}

View File

@@ -30,11 +30,14 @@ use matrix_sdk_base::{
media::MediaRequestParameters,
};
use ruma::{events::relation::RelationType, EventId, MxcUri, OwnedEventId, RoomId};
use tracing::trace;
use web_sys::IdbTransactionMode;
use crate::event_cache_store::{
migrations::current::keys,
serializer::IndexeddbEventCacheStoreSerializer,
transaction::IndexeddbEventCacheStoreTransaction,
types::{ChunkType, InBandEvent},
};
mod builder;
@@ -72,6 +75,20 @@ impl IndexeddbEventCacheStore {
pub fn builder() -> IndexeddbEventCacheStoreBuilder {
IndexeddbEventCacheStoreBuilder::default()
}
/// Initializes a new transaction on the underlying IndexedDB database and
/// returns a handle which can be used to combine database operations
/// into an atomic unit.
pub fn transaction<'a>(
&'a self,
stores: &[&str],
mode: IdbTransactionMode,
) -> Result<IndexeddbEventCacheStoreTransaction<'a>, IndexeddbEventCacheStoreError> {
Ok(IndexeddbEventCacheStoreTransaction::new(
self.inner.transaction_on_multi_with_mode(stores, mode)?,
&self.serializer,
))
}
}
// Small hack to have the following macro invocation act as the appropriate
@@ -121,10 +138,122 @@ impl_event_cache_store! {
linked_chunk_id: LinkedChunkId<'_>,
updates: Vec<Update<Event, Gap>>,
) -> Result<(), IndexeddbEventCacheStoreError> {
self.memory_store
.handle_linked_chunk_updates(linked_chunk_id, updates)
.await
.map_err(IndexeddbEventCacheStoreError::MemoryStore)
let linked_chunk_id = linked_chunk_id.to_owned();
let room_id = linked_chunk_id.room_id();
let transaction = self.transaction(
&[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS],
IdbTransactionMode::Readwrite,
)?;
for update in updates {
match update {
Update::NewItemsChunk { previous, new, next } => {
trace!(%room_id, "Inserting new chunk (prev={previous:?}, new={new:?}, next={next:?})");
transaction
.add_chunk(
room_id,
&types::Chunk {
identifier: new.index(),
previous: previous.map(|i| i.index()),
next: next.map(|i| i.index()),
chunk_type: ChunkType::Event,
},
)
.await?;
}
Update::NewGapChunk { previous, new, next, gap } => {
trace!(%room_id, "Inserting new gap (prev={previous:?}, new={new:?}, next={next:?})");
transaction
.add_item(
room_id,
&types::Gap {
chunk_identifier: new.index(),
prev_token: gap.prev_token,
},
)
.await?;
transaction
.add_chunk(
room_id,
&types::Chunk {
identifier: new.index(),
previous: previous.map(|i| i.index()),
next: next.map(|i| i.index()),
chunk_type: ChunkType::Gap,
},
)
.await?;
}
Update::RemoveChunk(chunk_id) => {
trace!("Removing chunk {chunk_id:?}");
transaction.delete_chunk_by_id(room_id, &chunk_id).await?;
}
Update::PushItems { at, items } => {
let chunk_identifier = at.chunk_identifier().index();
trace!(%room_id, "pushing {} items @ {chunk_identifier}", items.len());
for (i, item) in items.into_iter().enumerate() {
transaction
.add_item(
room_id,
&types::Event::InBand(InBandEvent {
content: item,
position: types::Position {
chunk_identifier,
index: at.index() + i,
},
}),
)
.await?;
}
}
Update::ReplaceItem { at, item } => {
let chunk_id = at.chunk_identifier().index();
let index = at.index();
trace!(%room_id, "replacing item @ {chunk_id}:{index}");
transaction
.put_event(
room_id,
&types::Event::InBand(InBandEvent {
content: item,
position: at.into(),
}),
)
.await?;
}
Update::RemoveItem { at } => {
let chunk_id = at.chunk_identifier().index();
let index = at.index();
trace!(%room_id, "removing item @ {chunk_id}:{index}");
transaction.delete_event_by_position(room_id, &at.into()).await?;
}
Update::DetachLastItems { at } => {
let chunk_id = at.chunk_identifier().index();
let index = at.index();
trace!(%room_id, "detaching last items @ {chunk_id}:{index}");
transaction.delete_events_by_chunk_from_index(room_id, &at.into()).await?;
}
Update::StartReattachItems | Update::EndReattachItems => {
// Nothing? See sqlite implementation
}
Update::Clear => {
trace!(%room_id, "clearing room");
transaction.delete_chunks_in_room(room_id).await?;
transaction.delete_events_in_room(room_id).await?;
transaction.delete_gaps_in_room(room_id).await?;
}
}
}
transaction.commit().await?;
Ok(())
}
async fn load_all_chunks(