task(sqlite): Implement load_last_chunk and last_previous_chunk.

This patch replaces `todo!()` by real implementations for the
`load_last_chunk` and `last_previous_chunk` methods.
This commit is contained in:
Ivan Enderlin
2025-02-11 00:21:08 +01:00
parent 334c66b0a0
commit e2a2f32e82

View File

@@ -210,8 +210,7 @@ impl TransactionExtForLinkedChunks for Transaction<'_> {
match chunk_type {
CHUNK_TYPE_GAP_TYPE_STRING => {
// It's a gap! There's at most one row for it in the database, so a
// call to `query_row` is sufficient.
// It's a gap!
let gap = self.load_gap_content(store, room_id, id)?;
Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
}
@@ -242,6 +241,8 @@ impl TransactionExtForLinkedChunks for Transaction<'_> {
room_id: &Key,
chunk_id: ChunkIdentifier,
) -> Result<Gap> {
// There's at most one row for it in the database, so a call to `query_row` is
// sufficient.
let encoded_prev_token: Vec<u8> = self.query_row(
"SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
(chunk_id.index(), &room_id),
@@ -273,9 +274,9 @@ impl TransactionExtForLinkedChunks for Transaction<'_> {
{
let encoded_content = event_data?;
let serialized_content = store.decode_value(&encoded_content)?;
let sync_timeline_event = serde_json::from_slice(&serialized_content)?;
let event = serde_json::from_slice(&serialized_content)?;
events.push(sync_timeline_event);
events.push(event);
}
Ok(events)
@@ -613,17 +614,128 @@ impl EventCacheStore for SqliteEventCacheStore {
async fn load_last_chunk(
&self,
_room_id: &RoomId,
room_id: &RoomId,
) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
todo!()
let room_id = room_id.to_owned();
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
let this = self.clone();
self
.acquire()
.await?
.with_transaction(move |txn| -> Result<_> {
// Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`.
let chunk_identifier_generator = match txn
.prepare(
"SELECT MAX(id) FROM linked_chunks WHERE room_id = ?"
)?
.query_row(
(&hashed_room_id,),
|row| {
// Read the `MAX(id)` as an `Option<u64>` instead
// of `u64` in case the `SELECT` returns nothing.
// Indeed, if it returns no line, the `MAX(id)` is
// set to `Null`.
row.get::<_, Option<u64>>(0)
}
)
.optional()?
.flatten()
{
Some(last_chunk_identifier) => {
ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
ChunkIdentifier::new(last_chunk_identifier)
)
},
None => ChunkIdentifierGenerator::new_from_scratch(),
};
// Find the last chunk.
let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
.prepare(
"SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
)?
.query_row(
(&hashed_room_id,),
|row| {
Ok((
row.get::<_, u64>(0)?,
row.get::<_, Option<u64>>(1)?,
row.get::<_, String>(2)?,
))
}
)
.optional()?
else {
// Chunk is not found.
return Ok((None, chunk_identifier_generator));
};
// Build the chunk.
let last_chunk = txn.rebuild_chunk(
&this,
&hashed_room_id,
previous_chunk,
chunk_identifier,
None,
&chunk_type
)?;
Ok((Some(last_chunk), chunk_identifier_generator))
})
.await
}
async fn load_previous_chunk(
&self,
_room_id: &RoomId,
_before_chunk_identifier: ChunkIdentifier,
room_id: &RoomId,
before_chunk_identifier: ChunkIdentifier,
) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
todo!()
let room_id = room_id.to_owned();
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
let this = self.clone();
self
.acquire()
.await?
.with_transaction(move |txn| -> Result<_> {
// Find the chunk before the chunk identified by `before_chunk_identifier`.
let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
.prepare(
"SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
)?
.query_row(
(&hashed_room_id, before_chunk_identifier.index()),
|row| {
Ok((
row.get::<_, u64>(0)?,
row.get::<_, Option<u64>>(1)?,
row.get::<_, Option<u64>>(2)?,
row.get::<_, String>(3)?,
))
}
)
.optional()?
else {
// Chunk is not found.
return Ok(None);
};
// Build the chunk.
let last_chunk = txn.rebuild_chunk(
&this,
&hashed_room_id,
previous_chunk,
chunk_identifier,
next_chunk,
&chunk_type
)?;
Ok(Some(last_chunk))
})
.await
}
async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {