diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index e0aeefa86..c02ee8a94 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -390,181 +390,181 @@ impl EventCacheStore for SqliteEventCacheStore { let this = self.clone(); with_immediate_transaction(self.acquire().await?, move |txn| { - for up in updates { - match up { - Update::NewItemsChunk { previous, new, next } => { - let previous = previous.as_ref().map(ChunkIdentifier::index); - let new = new.index(); - let next = next.as_ref().map(ChunkIdentifier::index); + for up in updates { + match up { + Update::NewItemsChunk { previous, new, next } => { + let previous = previous.as_ref().map(ChunkIdentifier::index); + let new = new.index(); + let next = next.as_ref().map(ChunkIdentifier::index); - trace!( - %room_id, - "new events chunk (prev={previous:?}, i={new}, next={next:?})", - ); + trace!( + %room_id, + "new events chunk (prev={previous:?}, i={new}, next={next:?})", + ); - insert_chunk( - txn, - &hashed_room_id, - previous, - new, - next, - CHUNK_TYPE_EVENT_TYPE_STRING, - )?; + insert_chunk( + txn, + &hashed_room_id, + previous, + new, + next, + CHUNK_TYPE_EVENT_TYPE_STRING, + )?; + } + + Update::NewGapChunk { previous, new, next, gap } => { + let serialized = serde_json::to_vec(&gap.prev_token)?; + let prev_token = this.encode_value(serialized)?; + + let previous = previous.as_ref().map(ChunkIdentifier::index); + let new = new.index(); + let next = next.as_ref().map(ChunkIdentifier::index); + + trace!( + %room_id, + "new gap chunk (prev={previous:?}, i={new}, next={next:?})", + ); + + // Insert the chunk as a gap. + insert_chunk( + txn, + &hashed_room_id, + previous, + new, + next, + CHUNK_TYPE_GAP_TYPE_STRING, + )?; + + // Insert the gap's value. + txn.execute( + r#" + INSERT INTO gaps(chunk_id, room_id, prev_token) + VALUES (?, ?, ?) + "#, + (new, &hashed_room_id, prev_token), + )?; + } + + Update::RemoveChunk(chunk_identifier) => { + let chunk_id = chunk_identifier.index(); + + trace!(%room_id, "removing chunk @ {chunk_id}"); + + // Find chunk to delete. + let (previous, next): (Option, Option) = txn.query_row( + "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?", + (chunk_id, &hashed_room_id), + |row| Ok((row.get(0)?, row.get(1)?)) + )?; + + // Replace its previous' next to its own next. + if let Some(previous) = previous { + txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?; } - Update::NewGapChunk { previous, new, next, gap } => { - let serialized = serde_json::to_vec(&gap.prev_token)?; - let prev_token = this.encode_value(serialized)?; - - let previous = previous.as_ref().map(ChunkIdentifier::index); - let new = new.index(); - let next = next.as_ref().map(ChunkIdentifier::index); - - trace!( - %room_id, - "new gap chunk (prev={previous:?}, i={new}, next={next:?})", - ); - - // Insert the chunk as a gap. - insert_chunk( - txn, - &hashed_room_id, - previous, - new, - next, - CHUNK_TYPE_GAP_TYPE_STRING, - )?; - - // Insert the gap's value. - txn.execute( - r#" - INSERT INTO gaps(chunk_id, room_id, prev_token) - VALUES (?, ?, ?) - "#, - (new, &hashed_room_id, prev_token), - )?; + // Replace its next' previous to its own previous. + if let Some(next) = next { + txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?; } - Update::RemoveChunk(chunk_identifier) => { - let chunk_id = chunk_identifier.index(); + // Now delete it, and let cascading delete corresponding entries in the + // other data tables. + txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?; + } - trace!(%room_id, "removing chunk @ {chunk_id}"); + Update::PushItems { at, items } => { + let chunk_id = at.chunk_identifier().index(); - // Find chunk to delete. - let (previous, next): (Option, Option) = txn.query_row( - "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?", - (chunk_id, &hashed_room_id), - |row| Ok((row.get(0)?, row.get(1)?)) - )?; - - // Replace its previous' next to its own next. - if let Some(previous) = previous { - txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?; - } - - // Replace its next' previous to its own previous. - if let Some(next) = next { - txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?; - } - - // Now delete it, and let cascading delete corresponding entries in the - // other data tables. - txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?; - } - - Update::PushItems { at, items } => { - let chunk_id = at.chunk_identifier().index(); - - trace!(%room_id, "pushing {} items @ {chunk_id}", items.len()); - - for (i, event) in items.into_iter().enumerate() { - let serialized = serde_json::to_vec(&event)?; - let content = this.encode_value(serialized)?; - - let event_id = event.event_id().map(|event_id| event_id.to_string()); - let index = at.index() + i; - - txn.execute( - r#" - INSERT INTO events(chunk_id, room_id, event_id, content, position) - VALUES (?, ?, ?, ?, ?) - "#, - (chunk_id, &hashed_room_id, event_id, content, index), - )?; - } - } - - Update::ReplaceItem { at, item: event } => { - let chunk_id = at.chunk_identifier().index(); - let index = at.index(); - - trace!(%room_id, "replacing item @ {chunk_id}:{index}"); + trace!(%room_id, "pushing {} items @ {chunk_id}", items.len()); + for (i, event) in items.into_iter().enumerate() { let serialized = serde_json::to_vec(&event)?; let content = this.encode_value(serialized)?; - // The event id should be the same, but just in case it changed… let event_id = event.event_id().map(|event_id| event_id.to_string()); + let index = at.index() + i; txn.execute( r#" - UPDATE events - SET content = ?, event_id = ? - WHERE room_id = ? AND chunk_id = ? AND position = ? + INSERT INTO events(chunk_id, room_id, event_id, content, position) + VALUES (?, ?, ?, ?, ?) "#, - (content, event_id, &hashed_room_id, chunk_id, index,) + (chunk_id, &hashed_room_id, event_id, content, index), )?; } - - Update::RemoveItem { at } => { - let chunk_id = at.chunk_identifier().index(); - let index = at.index(); - - trace!(%room_id, "removing item @ {chunk_id}:{index}"); - - // Remove the entry. - txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?; - - // Decrement the index of each item after the one we're going to remove. - txn.execute( - r#" - UPDATE events - SET position = position - 1 - WHERE room_id = ? AND chunk_id = ? AND position > ? - "#, - (&hashed_room_id, chunk_id, index) - )?; - - } - - Update::DetachLastItems { at } => { - let chunk_id = at.chunk_identifier().index(); - let index = at.index(); - - trace!(%room_id, "truncating items >= {chunk_id}:{index}"); - - // Remove these entries. - txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?; - } - - Update::Clear => { - trace!(%room_id, "clearing items"); - - // Remove chunks, and let cascading do its job. - txn.execute( - "DELETE FROM linked_chunks WHERE room_id = ?", - (&hashed_room_id,), - )?; - } - - Update::StartReattachItems | Update::EndReattachItems => { - // Nothing. - } } - } - Ok(()) - }) + Update::ReplaceItem { at, item: event } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "replacing item @ {chunk_id}:{index}"); + + let serialized = serde_json::to_vec(&event)?; + let content = this.encode_value(serialized)?; + + // The event id should be the same, but just in case it changed… + let event_id = event.event_id().map(|event_id| event_id.to_string()); + + txn.execute( + r#" + UPDATE events + SET content = ?, event_id = ? + WHERE room_id = ? AND chunk_id = ? AND position = ? + "#, + (content, event_id, &hashed_room_id, chunk_id, index,) + )?; + } + + Update::RemoveItem { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "removing item @ {chunk_id}:{index}"); + + // Remove the entry. + txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?; + + // Decrement the index of each item after the one we're going to remove. + txn.execute( + r#" + UPDATE events + SET position = position - 1 + WHERE room_id = ? AND chunk_id = ? AND position > ? + "#, + (&hashed_room_id, chunk_id, index) + )?; + + } + + Update::DetachLastItems { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); + + trace!(%room_id, "truncating items >= {chunk_id}:{index}"); + + // Remove these entries. + txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?; + } + + Update::Clear => { + trace!(%room_id, "clearing items"); + + // Remove chunks, and let cascading do its job. + txn.execute( + "DELETE FROM linked_chunks WHERE room_id = ?", + (&hashed_room_id,), + )?; + } + + Update::StartReattachItems | Update::EndReattachItems => { + // Nothing. + } + } + } + + Ok(()) + }) .await?; Ok(())