refactor(event cache store): use a single transaction to handle all linked chunk updates at once

Instead of one transaction per update. This ensures that if a single
update fails, then none is taken into account.
This commit is contained in:
Benjamin Bouvier
2024-12-09 15:45:09 +01:00
parent 72fcc50f80
commit 68cb85a2b2

View File

@@ -416,25 +416,27 @@ impl EventCacheStore for SqliteEventCacheStore {
room_id: &RoomId,
updates: Vec<Update<Event, Gap>>,
) -> Result<(), Self::Error> {
// Use a single transaction throughout this function, so that either all updates
// work, or none is taken into account.
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
let room_id = room_id.to_owned();
let this = self.clone();
for up in updates {
match up {
Update::NewItemsChunk { previous, new, next } => {
let hashed_room_id = hashed_room_id.clone();
self.acquire()
.await?
.with_transaction(move |txn| -> Result<_, Self::Error> {
for up in updates {
match up {
Update::NewItemsChunk { previous, new, next } => {
let previous = previous.as_ref().map(ChunkIdentifier::index);
let new = new.index();
let next = next.as_ref().map(ChunkIdentifier::index);
let previous = previous.as_ref().map(ChunkIdentifier::index);
let new = new.index();
let next = next.as_ref().map(ChunkIdentifier::index);
trace!(
%room_id,
"new events chunk (prev={previous:?}, i={new}, next={next:?})",
);
trace!(
%room_id,
"new events chunk (prev={previous:?}, i={new}, next={next:?})",
);
self.acquire()
.await?
.with_transaction(move |txn| {
insert_chunk(
txn,
&hashed_room_id,
@@ -442,29 +444,22 @@ impl EventCacheStore for SqliteEventCacheStore {
new,
next,
CHUNK_TYPE_EVENT_TYPE_STRING,
)
})
.await?;
}
)?;
}
Update::NewGapChunk { previous, new, next, gap } => {
let hashed_room_id = hashed_room_id.clone();
Update::NewGapChunk { previous, new, next, gap } => {
let serialized = serde_json::to_vec(&gap.prev_token)?;
let prev_token = this.encode_value(serialized)?;
let serialized = serde_json::to_vec(&gap.prev_token)?;
let prev_token = self.encode_value(serialized)?;
let previous = previous.as_ref().map(ChunkIdentifier::index);
let new = new.index();
let next = next.as_ref().map(ChunkIdentifier::index);
let previous = previous.as_ref().map(ChunkIdentifier::index);
let new = new.index();
let next = next.as_ref().map(ChunkIdentifier::index);
trace!(
%room_id,
"new gap chunk (prev={previous:?}, i={new}, next={next:?})",
);
trace!(
%room_id,
"new gap chunk (prev={previous:?}, i={new}, next={next:?})",
);
self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Insert the chunk as a gap.
insert_chunk(
txn,
@@ -481,23 +476,15 @@ impl EventCacheStore for SqliteEventCacheStore {
INSERT INTO gaps(chunk_id, room_id, prev_token)
VALUES (?, ?, ?)
"#,
(new, hashed_room_id, prev_token),
(new, &hashed_room_id, prev_token),
)?;
}
Ok(())
})
.await?;
}
Update::RemoveChunk(chunk_identifier) => {
let chunk_id = chunk_identifier.index();
Update::RemoveChunk(chunk_identifier) => {
let hashed_room_id = hashed_room_id.clone();
let chunk_id = chunk_identifier.index();
trace!(%room_id, "removing chunk @ {chunk_id}");
trace!(%room_id, "removing chunk @ {chunk_id}");
self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Find chunk to delete.
let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
"SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
@@ -517,30 +504,19 @@ impl EventCacheStore for SqliteEventCacheStore {
// Now delete it, and let cascading delete corresponding entries in the
// other data tables.
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, hashed_room_id))?;
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
}
Ok(())
})
.await?;
}
Update::PushItems { at, items } => {
let chunk_id = at.chunk_identifier().index();
Update::PushItems { at, items } => {
let chunk_id = at.chunk_identifier().index();
let hashed_room_id = hashed_room_id.clone();
trace!(%room_id, "pushing items @ {chunk_id}");
trace!(%room_id, "pushing items @ {chunk_id}");
let this = self.clone();
self.acquire()
.await?
.with_transaction(move |txn| -> Result<(), Self::Error> {
for (i, event) in items.into_iter().enumerate() {
let serialized = serde_json::to_vec(&event)?;
let content = this.encode_value(serialized)?;
let event_id =
event.event_id().map(|event_id| event_id.to_string());
let event_id = event.event_id().map(|event_id| event_id.to_string());
let index = at.index() + i;
txn.execute(
@@ -551,27 +527,18 @@ impl EventCacheStore for SqliteEventCacheStore {
(chunk_id, &hashed_room_id, event_id, content, index),
)?;
}
}
Ok(())
})
.await?;
}
Update::RemoveItem { at } => {
let chunk_id = at.chunk_identifier().index();
let index = at.index();
Update::RemoveItem { at } => {
let hashed_room_id = hashed_room_id.clone();
let chunk_id = at.chunk_identifier().index();
let index = at.index();
trace!(%room_id, "removing item @ {chunk_id}:{index}");
trace!(%room_id, "removing item @ {chunk_id}:{index}");
self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Remove the entry.
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;
// Decrement the index of each item after the one we're going to
// remove.
// Decrement the index of each item after the one we're going to remove.
txn.execute(
r#"
UPDATE events
@@ -581,50 +548,37 @@ impl EventCacheStore for SqliteEventCacheStore {
(&hashed_room_id, chunk_id, index)
)?;
Ok(())
})
.await?;
}
}
Update::DetachLastItems { at } => {
let hashed_room_id = hashed_room_id.clone();
let chunk_id = at.chunk_identifier().index();
let index = at.index();
Update::DetachLastItems { at } => {
let chunk_id = at.chunk_identifier().index();
let index = at.index();
trace!(%room_id, "truncating items >= {chunk_id}:{index}");
trace!(%room_id, "truncating items >= {chunk_id}:{index}");
self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Remove these entries.
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
Ok(())
})
.await?;
}
}
Update::Clear => {
let hashed_room_id = hashed_room_id.clone();
Update::Clear => {
trace!(%room_id, "clearing items");
trace!(%room_id, "clearing items");
self.acquire()
.await?
.with_transaction(move |txn| {
// Remove chunks, and let cascading do its job.
txn.execute(
"DELETE FROM linked_chunks WHERE room_id = ?",
(&hashed_room_id,),
)
})
.await?;
)?;
}
Update::StartReattachItems | Update::EndReattachItems => {
// Nothing.
}
}
}
Update::StartReattachItems | Update::EndReattachItems => {
// Nothing.
}
}
}
Ok(())
})
.await?;
Ok(())
}
@@ -1414,6 +1368,45 @@ mod tests {
check_test_event(&events[0], "beaufort is the best");
});
}
#[async_test]
async fn test_linked_chunk_update_is_a_transaction() {
let store = get_event_cache_store().await.expect("creating cache store failed");
let room_id = *DEFAULT_TEST_ROOM_ID;
// Trigger a violation of the unique constraint on the (room id, chunk id)
// couple.
let err = store
.handle_linked_chunk_updates(
room_id,
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(42),
next: None,
},
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(42),
next: None,
},
],
)
.await
.unwrap_err();
// The operation fails with a constraint violation error.
assert_matches!(err, crate::error::Error::Sqlite(err) => {
assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
});
// If the updates have been handled transactionally, then no new chunks should
// have been added; failure of the second update leads to the first one being
// rolled back.
let chunks = store.load_chunks(room_id).await.unwrap();
assert!(chunks.is_empty());
}
}
#[cfg(test)]