From 68cb85a2b2403d14ad7e3b1f3fb08de7ebcb85cb Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 9 Dec 2024 15:45:09 +0100 Subject: [PATCH] refactor(event cache store): use a single transaction to handle all linked chunk updates at once Instead of one transaction per update. This ensures that if a single update fails, then none is taken into account. --- .../src/event_cache_store.rs | 213 +++++++++--------- 1 file changed, 103 insertions(+), 110 deletions(-) diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 0a0cbe306..2e9514123 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -416,25 +416,27 @@ impl EventCacheStore for SqliteEventCacheStore { room_id: &RoomId, updates: Vec>, ) -> Result<(), Self::Error> { + // Use a single transaction throughout this function, so that either all updates + // work, or none is taken into account. let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); + let room_id = room_id.to_owned(); + let this = self.clone(); - for up in updates { - match up { - Update::NewItemsChunk { previous, new, next } => { - let hashed_room_id = hashed_room_id.clone(); + self.acquire() + .await? + .with_transaction(move |txn| -> Result<_, Self::Error> { + for up in updates { + match up { + Update::NewItemsChunk { previous, new, next } => { + let previous = previous.as_ref().map(ChunkIdentifier::index); + let new = new.index(); + let next = next.as_ref().map(ChunkIdentifier::index); - let previous = previous.as_ref().map(ChunkIdentifier::index); - let new = new.index(); - let next = next.as_ref().map(ChunkIdentifier::index); + trace!( + %room_id, + "new events chunk (prev={previous:?}, i={new}, next={next:?})", + ); - trace!( - %room_id, - "new events chunk (prev={previous:?}, i={new}, next={next:?})", - ); - - self.acquire() - .await? - .with_transaction(move |txn| { insert_chunk( txn, &hashed_room_id, @@ -442,29 +444,22 @@ impl EventCacheStore for SqliteEventCacheStore { new, next, CHUNK_TYPE_EVENT_TYPE_STRING, - ) - }) - .await?; - } + )?; + } - Update::NewGapChunk { previous, new, next, gap } => { - let hashed_room_id = hashed_room_id.clone(); + Update::NewGapChunk { previous, new, next, gap } => { + let serialized = serde_json::to_vec(&gap.prev_token)?; + let prev_token = this.encode_value(serialized)?; - let serialized = serde_json::to_vec(&gap.prev_token)?; - let prev_token = self.encode_value(serialized)?; + let previous = previous.as_ref().map(ChunkIdentifier::index); + let new = new.index(); + let next = next.as_ref().map(ChunkIdentifier::index); - let previous = previous.as_ref().map(ChunkIdentifier::index); - let new = new.index(); - let next = next.as_ref().map(ChunkIdentifier::index); + trace!( + %room_id, + "new gap chunk (prev={previous:?}, i={new}, next={next:?})", + ); - trace!( - %room_id, - "new gap chunk (prev={previous:?}, i={new}, next={next:?})", - ); - - self.acquire() - .await? - .with_transaction(move |txn| -> rusqlite::Result<()> { // Insert the chunk as a gap. insert_chunk( txn, @@ -481,23 +476,15 @@ impl EventCacheStore for SqliteEventCacheStore { INSERT INTO gaps(chunk_id, room_id, prev_token) VALUES (?, ?, ?) "#, - (new, hashed_room_id, prev_token), + (new, &hashed_room_id, prev_token), )?; + } - Ok(()) - }) - .await?; - } + Update::RemoveChunk(chunk_identifier) => { + let chunk_id = chunk_identifier.index(); - Update::RemoveChunk(chunk_identifier) => { - let hashed_room_id = hashed_room_id.clone(); - let chunk_id = chunk_identifier.index(); + trace!(%room_id, "removing chunk @ {chunk_id}"); - trace!(%room_id, "removing chunk @ {chunk_id}"); - - self.acquire() - .await? - .with_transaction(move |txn| -> rusqlite::Result<()> { // Find chunk to delete. let (previous, next): (Option, Option) = txn.query_row( "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?", @@ -517,30 +504,19 @@ impl EventCacheStore for SqliteEventCacheStore { // Now delete it, and let cascading delete corresponding entries in the // other data tables. - txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, hashed_room_id))?; + txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?; + } - Ok(()) - }) - .await?; - } + Update::PushItems { at, items } => { + let chunk_id = at.chunk_identifier().index(); - Update::PushItems { at, items } => { - let chunk_id = at.chunk_identifier().index(); - let hashed_room_id = hashed_room_id.clone(); + trace!(%room_id, "pushing items @ {chunk_id}"); - trace!(%room_id, "pushing items @ {chunk_id}"); - - let this = self.clone(); - - self.acquire() - .await? - .with_transaction(move |txn| -> Result<(), Self::Error> { for (i, event) in items.into_iter().enumerate() { let serialized = serde_json::to_vec(&event)?; let content = this.encode_value(serialized)?; - let event_id = - event.event_id().map(|event_id| event_id.to_string()); + let event_id = event.event_id().map(|event_id| event_id.to_string()); let index = at.index() + i; txn.execute( @@ -551,27 +527,18 @@ impl EventCacheStore for SqliteEventCacheStore { (chunk_id, &hashed_room_id, event_id, content, index), )?; } + } - Ok(()) - }) - .await?; - } + Update::RemoveItem { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); - Update::RemoveItem { at } => { - let hashed_room_id = hashed_room_id.clone(); - let chunk_id = at.chunk_identifier().index(); - let index = at.index(); + trace!(%room_id, "removing item @ {chunk_id}:{index}"); - trace!(%room_id, "removing item @ {chunk_id}:{index}"); - - self.acquire() - .await? - .with_transaction(move |txn| -> rusqlite::Result<()> { // Remove the entry. txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?; - // Decrement the index of each item after the one we're going to - // remove. + // Decrement the index of each item after the one we're going to remove. txn.execute( r#" UPDATE events @@ -581,50 +548,37 @@ impl EventCacheStore for SqliteEventCacheStore { (&hashed_room_id, chunk_id, index) )?; - Ok(()) - }) - .await?; - } + } - Update::DetachLastItems { at } => { - let hashed_room_id = hashed_room_id.clone(); - let chunk_id = at.chunk_identifier().index(); - let index = at.index(); + Update::DetachLastItems { at } => { + let chunk_id = at.chunk_identifier().index(); + let index = at.index(); - trace!(%room_id, "truncating items >= {chunk_id}:{index}"); + trace!(%room_id, "truncating items >= {chunk_id}:{index}"); - self.acquire() - .await? - .with_transaction(move |txn| -> rusqlite::Result<()> { // Remove these entries. txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?; - Ok(()) - }) - .await?; - } + } - Update::Clear => { - let hashed_room_id = hashed_room_id.clone(); + Update::Clear => { + trace!(%room_id, "clearing items"); - trace!(%room_id, "clearing items"); - - self.acquire() - .await? - .with_transaction(move |txn| { // Remove chunks, and let cascading do its job. txn.execute( "DELETE FROM linked_chunks WHERE room_id = ?", (&hashed_room_id,), - ) - }) - .await?; + )?; + } + + Update::StartReattachItems | Update::EndReattachItems => { + // Nothing. + } + } } - Update::StartReattachItems | Update::EndReattachItems => { - // Nothing. - } - } - } + Ok(()) + }) + .await?; Ok(()) } @@ -1414,6 +1368,45 @@ mod tests { check_test_event(&events[0], "beaufort is the best"); }); } + + #[async_test] + async fn test_linked_chunk_update_is_a_transaction() { + let store = get_event_cache_store().await.expect("creating cache store failed"); + + let room_id = *DEFAULT_TEST_ROOM_ID; + + // Trigger a violation of the unique constraint on the (room id, chunk id) + // couple. + let err = store + .handle_linked_chunk_updates( + room_id, + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(42), + next: None, + }, + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(42), + next: None, + }, + ], + ) + .await + .unwrap_err(); + + // The operation fails with a constraint violation error. + assert_matches!(err, crate::error::Error::Sqlite(err) => { + assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation)); + }); + + // If the updates have been handled transactionally, then no new chunks should + // have been added; failure of the second update leads to the first one being + // rolled back. + let chunks = store.load_chunks(room_id).await.unwrap(); + assert!(chunks.is_empty()); + } } #[cfg(test)]