feat(event cache): implement RoomEventCacheState::shrink_to_last_chunk

This commit is contained in:
Benjamin Bouvier
2025-02-19 15:49:51 +01:00
parent 4f47868930
commit e64cb2c4f1
3 changed files with 270 additions and 39 deletions

View File

@@ -19,7 +19,7 @@ use matrix_sdk_base::{
event_cache::store::DEFAULT_CHUNK_CAPACITY,
linked_chunk::{
lazy_loader::{self, LazyLoaderError},
ChunkContent, RawChunk,
ChunkContent, ChunkIdentifierGenerator, RawChunk,
},
};
use matrix_sdk_common::linked_chunk::{
@@ -86,6 +86,18 @@ impl RoomEvents {
self.chunks.clear();
}
/// Swap the in-memory chunks for the given last chunk and chunk
/// identifier generator.
///
/// Every chunk currently held in memory is dropped before the
/// replacement chunk (if any) is installed.
pub(super) fn replace_with(
&mut self,
last_chunk: Option<RawChunk<Event, Gap>>,
chunk_identifier_generator: ChunkIdentifierGenerator,
) -> Result<(), LazyLoaderError> {
// Delegate to the lazy-loader helper, which clears `self.chunks` and
// re-seeds it from `last_chunk`.
lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator)?;
Ok(())
}
/// If the given event is a redaction, try to retrieve the to-be-redacted
/// event in the chunk, and replace it by the redacted form.
#[instrument(skip_all)]

View File

@@ -461,6 +461,8 @@ impl RoomEventCacheInner {
})
.await?;
// If there was a previous batch token, and there's at least one non-duplicated
// We must do this *after* the above call to `.with_events_mut`, so the new
{
// Fill the AllEventsCache.
let mut all_events = self.all_events.write().await;
@@ -530,12 +532,12 @@ mod private {
use matrix_sdk_base::{
deserialized_responses::{TimelineEvent, TimelineEventKind},
event_cache::{store::EventCacheStoreLock, Event},
linked_chunk::{lazy_loader, ChunkContent, Update},
linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Update},
};
use matrix_sdk_common::executor::spawn;
use once_cell::sync::OnceCell;
use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
use tracing::{error, instrument, trace};
use tracing::{debug, error, instrument, trace};
use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
@@ -740,6 +742,60 @@ mod private {
})
}
/// If storage is enabled, unloads all in-memory chunks, then reloads
/// only the last chunk from the store.
///
/// Returns `Some` list of `VectorDiff` updates to be consumed by the
/// caller if and only if storage is enabled; otherwise this is a no-op
/// and `None` is returned.
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
pub(super) async fn shrink_to_last_chunk(
&mut self,
) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
let Some(store) = self.store.get() else {
// No need to do anything if there's no storage; we'll already reset the
// timeline after a limited response.
// TODO: that might be a way to unify our code, though?
return Ok(None);
};
// Take the cross-process store lock for the duration of the reload.
let store_lock = store.lock().await?;
// Attempt to load the last chunk from the store.
let (last_chunk, chunk_identifier_generator) = match store_lock
.load_last_chunk(&self.room)
.await
{
Ok(pair) => pair,
Err(err) => {
// If loading the last chunk failed, wipe this room's storage and fall
// back to an empty linked chunk, rather than keeping an in-memory
// state that disagrees with the store.
error!("error when reloading a linked chunk from memory: {err}");
// Clear storage for this room.
store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
// Restart with an empty linked chunk.
(None, ChunkIdentifierGenerator::new_from_scratch())
}
};
debug!("unloading the linked chunk, and resetting it to its last chunk");
// Remove all the chunks from the linked chunk, except for the last one, and
// update the chunk identifier generator.
if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
// Replacement failed: fall back to a full reset, and propagate its diffs.
error!("error when replacing the linked chunk: {err}");
return self.reset().await.map(Some);
}
// These in-memory changes must NOT be written back to the store: the store
// still holds the full linked chunk; only the in-memory view shrank. So
// drain (and drop) the pending store updates.
let _ = self.events.store_updates().take();
// However, we do want the changes as `VectorDiff`s for external listeners.
Ok(Some(self.events.updates_as_vector_diffs()))
}
/// Removes the bundled relations from an event, if they were present.
///
/// Only replaces the event if it contained bundled relations.
@@ -1767,4 +1823,122 @@ mod tests {
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
}
// Test for `shrink_to_last_chunk`: seed the store with a linked chunk made of
// two single-event chunks, lazily load only the last one, back-paginate to
// load everything, then shrink back to the last chunk; check both the emitted
// `VectorDiff`s and that further pagination still works from the store.
#[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
#[async_test]
async fn test_shrink_to_last_chunk() {
use std::ops::Not as _;
use eyeball_im::VectorDiff;
use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
let room_id = room_id!("!galette:saucisse.bzh");
let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
let f = EventFactory::new().room(room_id);
let evid1 = event_id!("$1");
let evid2 = event_id!("$2");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
// Fill the event cache store with an initial linked chunk made of two event
// chunks: chunk 0 holds `ev1`, chunk 1 (the last chunk) holds `ev2`.
{
let store = client.event_cache_store();
let store = store.lock().await.unwrap();
store
.handle_linked_chunk_updates(
room_id,
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev1],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev2],
},
],
)
.await
.unwrap();
}
// Storage must be enabled, otherwise `shrink_to_last_chunk` is a no-op.
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
event_cache.enable_storage().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// Sanity check: lazily loaded, so only includes one item at start.
let (events, mut stream) = room_event_cache.subscribe().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(evid2));
assert!(stream.is_empty());
// Force loading the full linked chunk by back-paginating.
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
assert!(outcome.reached_start.not());
// We also get an update about the loading from the store: `ev1` is inserted
// before `ev2` in the in-memory view.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
assert_eq!(value.event_id().as_deref(), Some(evid1));
});
assert!(stream.is_empty());
// Shrink the linked chunk to the last chunk.
let diffs = room_event_cache
.inner
.state
.write()
.await
.shrink_to_last_chunk()
.await
.expect("shrinking should succeed")
.expect("there must be updates");
// The shrink is reported as a clear followed by a re-append of the last
// chunk's events.
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values} => {
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref(), Some(evid2));
});
assert!(stream.is_empty());
// When reading the events, we do get only the last one.
let (events, _) = room_event_cache.subscribe().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(evid2));
// But if we back-paginate, we don't need access to network to find out about
// the previous event: it's reloaded from the store.
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
assert!(outcome.reached_start.not());
}
}

View File

@@ -1175,6 +1175,8 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
drop(events);
let pagination = room_event_cache.pagination();
// Backpagination will return nothing.
server
.mock_room_messages()
@@ -1183,13 +1185,17 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
.mount()
.await;
let pagination = room_event_cache.pagination();
// Run pagination once: it will consume the unique gap we had.
pagination.run_backwards_once(20).await.unwrap();
// The first sync was limited, so we have unloaded the full linked chunk, and it
// only contains the events returned by the sync.
//
// The first back-pagination will hit the network, and let us know we've reached
// the end of the room.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());
// Now simulate that the sync returns the same events (which can happen with
// simplified sliding sync).
// simplified sliding sync), also as a limited sync.
server
.sync_room(
&client,
@@ -1202,18 +1208,43 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
assert!(stream.is_empty());
// If this back-pagination fails, that's because we've stored a gap that's
// useless. It should be short-circuited because there's no previous gap.
{
let (events, _) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
}
// If any of the following back-paginations fail with a network error, that's
// because we've stored a gap that's useless. All back-paginations must be
// loading from the store.
//
// The sync was limited, which unloaded the linked chunk, and reloaded only the
// final events chunk.
//
// There's an empty events chunk at the start of *every* linked chunk, so the
// next pagination will return it, and since the chunk is lazily loaded, the
// pagination doesn't know *yet* it's reached the start of the linked chunk.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.events.is_empty());
assert!(!outcome.reached_start);
{
let (events, _) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
}
// Now, lazy-loading notices we've reached the start of the chunk, and reports
// it as such.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.events.is_empty());
assert!(outcome.reached_start);
let (events, stream) = room_event_cache.subscribe().await;
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
assert!(stream.is_empty());
}
#[async_test]
@@ -1273,10 +1304,11 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
);
assert_eq!(diffs.len(), 2);
// `$ev3` is duplicated, the older `$ev3` event is removed
assert_matches!(&diffs[0], VectorDiff::Remove { index } => {
assert_eq!(*index, 0);
});
// The linked chunk is unloaded, because of the limited sync with a gap:
// It's first cleared…
assert_matches!(&diffs[0], VectorDiff::Clear);
// Then the latest event chunk is reloaded.
// `$ev1`, `$ev2` and `$ev3` are added.
assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 3);
@@ -1296,6 +1328,24 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
.mount()
.await;
// Run pagination once: it will consume prev-batch2 first, which is the most
// recent token, which returns an empty batch, thus indicating the start of the
// room; but we still have a chunk in storage, so it appears like it's not the
// start *yet*.
let pagination = room_event_cache.pagination();
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
// Next, we lazy-load a next chunk from the store, and get the initial, empty
// default events chunk.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start.not());
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
// For prev-batch, the back-pagination returns two events we already know, and a
// previous batch token.
server
@@ -1310,30 +1360,25 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
.mount()
.await;
let pagination = room_event_cache.pagination();
// Run pagination once: it will consume prev-batch2 first, which is the most
// recent token.
let outcome = pagination.run_backwards_once(20).await.unwrap();
// The pagination is empty: no new event.
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
// Run pagination a second time: it will consume prev-batch, which is the least
// recent token.
let outcome = pagination.run_backwards_once(20).await.unwrap();
// The pagination contains deduplicated events; they are all deduplicated; the
// gap is replaced by zero event: nothing happens.
assert!(outcome.reached_start.not());
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
// If this back-pagination fails, that's because we've stored a gap that's
// useless. It should be short-circuited because storing the previous gap was
// useless.
// If this back-pagination fails, that's because it's trying to hit network. In
// that case, it means we stored the gap with the prev-batch3 token, while
// we shouldn't have to, since it is useless; all events were deduplicated
// from the previous pagination.
// Instead, we're lazy-loading an empty events chunks.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start.not());
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
// And finally hit the start of the timeline.
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());