mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-12 01:45:29 -04:00
feat(sdk): The EventCache loads only the last chunk when initialised.
This patch updates `RoomEventCacheState::new` to load a single chunks of events instead of all events. It solves bugs where all events were loaded, while removing the gaps in between, thus the `Timeline` wasn't able to load the missing events to fill the gaps.
This commit is contained in:
@@ -38,6 +38,7 @@ use eyeball_im::VectorDiff;
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::{AmbiguityChange, TimelineEvent},
|
||||
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
|
||||
linked_chunk::LinkedChunkBuilderError,
|
||||
store_locks::LockStoreError,
|
||||
sync::RoomUpdates,
|
||||
};
|
||||
@@ -109,6 +110,12 @@ pub enum EventCacheError {
|
||||
/// times where we try to use the client.
|
||||
#[error("The owning client of the event cache has been dropped.")]
|
||||
ClientDropped,
|
||||
|
||||
/// An error happening when interacting with the [`LinkedChunkBuilder`].
|
||||
///
|
||||
/// [`LinkedChunkBuilder`]: matrix_sdk_common::linked_chunk::LinkedChunkBuilder
|
||||
#[error(transparent)]
|
||||
LinkedChunkBuilder(#[from] LinkedChunkBuilderError),
|
||||
}
|
||||
|
||||
/// A result using the [`EventCacheError`].
|
||||
|
||||
@@ -526,21 +526,15 @@ mod private {
|
||||
use eyeball_im::VectorDiff;
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::{TimelineEvent, TimelineEventKind},
|
||||
event_cache::{
|
||||
store::{
|
||||
EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockGuard,
|
||||
DEFAULT_CHUNK_CAPACITY,
|
||||
},
|
||||
Event, Gap,
|
||||
},
|
||||
linked_chunk::{LinkedChunk, LinkedChunkBuilder, RawChunk, Update},
|
||||
event_cache::{store::EventCacheStoreLock, Event},
|
||||
linked_chunk::{LinkedChunkBuilder, Update},
|
||||
};
|
||||
use matrix_sdk_common::executor::spawn;
|
||||
use once_cell::sync::OnceCell;
|
||||
use ruma::{serde::Raw, OwnedEventId, OwnedRoomId, RoomId};
|
||||
use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
|
||||
use tracing::{error, instrument, trace};
|
||||
|
||||
use super::{chunk_debug_string, events::RoomEvents};
|
||||
use super::events::RoomEvents;
|
||||
use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
|
||||
|
||||
/// State for a single room's event cache.
|
||||
@@ -571,47 +565,39 @@ mod private {
|
||||
}
|
||||
|
||||
impl RoomEventCacheState {
|
||||
async fn try_reload_linked_chunk(
|
||||
room: &RoomId,
|
||||
locked: &EventCacheStoreLockGuard<'_>,
|
||||
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, EventCacheError>
|
||||
{
|
||||
let raw_chunks = locked.load_all_chunks(room).await?;
|
||||
|
||||
let mut builder = LinkedChunkBuilder::from_raw_parts(raw_chunks.clone());
|
||||
|
||||
builder.with_update_history();
|
||||
|
||||
Ok(builder.build().map_err(|err| {
|
||||
// Show a debug string representing the known chunks.
|
||||
if tracing::enabled!(tracing::Level::TRACE) {
|
||||
trace!("couldn't build a linked chunk from the raw parts. Raw chunks:");
|
||||
for line in raw_chunks_debug_string(raw_chunks) {
|
||||
trace!("{line}");
|
||||
}
|
||||
}
|
||||
|
||||
EventCacheStoreError::InvalidData { details: err.to_string() }
|
||||
})?)
|
||||
}
|
||||
|
||||
/// Create a new state, or reload it from storage if it's been enabled.
|
||||
///
|
||||
/// Not all events are going to be loaded. Only a portion of them. The
|
||||
/// [`RoomEvents`] relies on a [`LinkedChunk`] to store all events. Only
|
||||
/// the last chunk will be loaded. It means the events are loaded from
|
||||
/// the most recent to the oldest. To load more events, see
|
||||
/// [`Self::load_more_events_backwards`].
|
||||
///
|
||||
/// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
|
||||
pub async fn new(
|
||||
room: OwnedRoomId,
|
||||
room_id: OwnedRoomId,
|
||||
store: Arc<OnceCell<EventCacheStoreLock>>,
|
||||
) -> Result<Self, EventCacheError> {
|
||||
let (events, deduplicator) = if let Some(store) = store.get() {
|
||||
let locked = store.lock().await?;
|
||||
let store_lock = store.lock().await?;
|
||||
|
||||
// Try to reload a linked chunk from storage. If it fails, log the error and
|
||||
// restart with a fresh, empty linked chunk.
|
||||
let linked_chunk = match Self::try_reload_linked_chunk(&room, &locked).await {
|
||||
let linked_chunk = match store_lock
|
||||
.load_last_chunk(&room_id)
|
||||
.await
|
||||
.map_err(EventCacheError::from)
|
||||
.and_then(|(last_chunk, chunk_identifier_generator)| {
|
||||
LinkedChunkBuilder::from_last_chunk(last_chunk, chunk_identifier_generator)
|
||||
.map_err(EventCacheError::from)
|
||||
}) {
|
||||
Ok(linked_chunk) => linked_chunk,
|
||||
|
||||
Err(err) => {
|
||||
error!("error when reloading a linked chunk from memory: {err}");
|
||||
|
||||
// Clear storage for this room.
|
||||
locked.handle_linked_chunk_updates(&room, vec![Update::Clear]).await?;
|
||||
store_lock
|
||||
.handle_linked_chunk_updates(&room_id, vec![Update::Clear])
|
||||
.await?;
|
||||
|
||||
// Restart with an empty linked chunk.
|
||||
None
|
||||
@@ -619,17 +605,23 @@ mod private {
|
||||
};
|
||||
|
||||
(
|
||||
RoomEvents::with_initial_chunks(linked_chunk),
|
||||
Deduplicator::new_store_based(room.clone(), store.clone()),
|
||||
RoomEvents::with_initial_linked_chunk(linked_chunk),
|
||||
Deduplicator::new_store_based(room_id.clone(), store.clone()),
|
||||
)
|
||||
} else {
|
||||
(RoomEvents::default(), Deduplicator::new_memory_based())
|
||||
};
|
||||
|
||||
Ok(Self { room, store, events, deduplicator, waited_for_initial_prev_token: false })
|
||||
Ok(Self {
|
||||
room: room_id,
|
||||
store,
|
||||
events,
|
||||
deduplicator,
|
||||
waited_for_initial_prev_token: false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Deduplicate `events` considering all events in `Self::chunks`.
|
||||
/// Deduplicate `events` considering all events in `Self::events`.
|
||||
///
|
||||
/// The returned tuple contains:
|
||||
/// - all events (duplicated or not) with an ID
|
||||
@@ -755,7 +747,7 @@ mod private {
|
||||
spawn(async move {
|
||||
let store = store.lock().await?;
|
||||
|
||||
if let Err(err) = locked.handle_linked_chunk_updates(&room_id, updates).await {
|
||||
if let Err(err) = store.handle_linked_chunk_updates(&room_id, updates).await {
|
||||
error!("unable to handle linked chunk updates: {err}");
|
||||
}
|
||||
|
||||
@@ -801,72 +793,6 @@ mod private {
|
||||
Ok((output, updates_as_vector_diffs))
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a debug string over multiple lines (one String per line),
|
||||
/// offering a debug representation of a [`RawChunk`] loaded from disk.
|
||||
fn raw_chunks_debug_string(mut raw_chunks: Vec<RawChunk<Event, Gap>>) -> Vec<String> {
|
||||
let mut result = Vec::new();
|
||||
|
||||
// Sort the chunks by id, for the output to be deterministic.
|
||||
raw_chunks.sort_by_key(|c| c.identifier.index());
|
||||
|
||||
for c in raw_chunks {
|
||||
let content = chunk_debug_string(&c.content);
|
||||
|
||||
let prev =
|
||||
c.previous.map_or_else(|| "<none>".to_owned(), |prev| prev.index().to_string());
|
||||
let next = c.next.map_or_else(|| "<none>".to_owned(), |prev| prev.index().to_string());
|
||||
|
||||
let line =
|
||||
format!("chunk #{} (prev={prev}, next={next}): {content}", c.identifier.index());
|
||||
|
||||
result.push(line);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use matrix_sdk_base::{
|
||||
event_cache::Gap,
|
||||
linked_chunk::{ChunkContent, ChunkIdentifier as CId, RawChunk},
|
||||
};
|
||||
use matrix_sdk_test::{event_factory::EventFactory, ALICE, DEFAULT_TEST_ROOM_ID};
|
||||
use ruma::event_id;
|
||||
|
||||
use super::raw_chunks_debug_string;
|
||||
|
||||
#[test]
|
||||
fn test_raw_chunks_debug_string() {
|
||||
let mut raws = Vec::new();
|
||||
let f = EventFactory::new().room(&DEFAULT_TEST_ROOM_ID).sender(*ALICE);
|
||||
|
||||
raws.push(RawChunk {
|
||||
content: ChunkContent::Items(vec![
|
||||
f.text_msg("hey")
|
||||
.event_id(event_id!("$123456789101112131415617181920"))
|
||||
.into_event(),
|
||||
f.text_msg("you").event_id(event_id!("$2")).into_event(),
|
||||
]),
|
||||
identifier: CId::new(1),
|
||||
previous: Some(CId::new(0)),
|
||||
next: None,
|
||||
});
|
||||
|
||||
raws.push(RawChunk {
|
||||
content: ChunkContent::Gap(Gap { prev_token: "prev-token".to_owned() }),
|
||||
identifier: CId::new(0),
|
||||
previous: None,
|
||||
next: Some(CId::new(1)),
|
||||
});
|
||||
|
||||
let output = raw_chunks_debug_string(raws);
|
||||
assert_eq!(output.len(), 2);
|
||||
assert_eq!(&output[0], "chunk #0 (prev=<none>, next=1): gap['prev-token']");
|
||||
assert_eq!(&output[1], "chunk #1 (prev=0, next=<none>): events[$12345678, $2]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) use private::RoomEventCacheState;
|
||||
|
||||
Reference in New Issue
Block a user