diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs
index a650ae9fb..a09f6d699 100644
--- a/crates/matrix-sdk/src/event_cache/room/events.rs
+++ b/crates/matrix-sdk/src/event_cache/room/events.rs
@@ -19,7 +19,7 @@ use matrix_sdk_base::{
     event_cache::store::DEFAULT_CHUNK_CAPACITY,
     linked_chunk::{
         lazy_loader::{self, LazyLoaderError},
-        ChunkContent, RawChunk,
+        ChunkContent, ChunkIdentifierGenerator, RawChunk,
    },
 };
 use matrix_sdk_common::linked_chunk::{
@@ -86,6 +86,18 @@ impl RoomEvents {
         self.chunks.clear();
     }
 
+    /// Replace the events with the given last chunk of events and generator.
+    ///
+    /// This clears all the chunks in memory before resetting to the new chunk,
+    /// if provided.
+    pub(super) fn replace_with(
+        &mut self,
+        last_chunk: Option<RawChunk<Event, Gap>>,
+        chunk_identifier_generator: ChunkIdentifierGenerator,
+    ) -> Result<(), LazyLoaderError> {
+        lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator)
+    }
+
     /// If the given event is a redaction, try to retrieve the to-be-redacted
     /// event in the chunk, and replace it by the redacted form.
     #[instrument(skip_all)]
diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs
index cde3008f5..7047f7d5e 100644
--- a/crates/matrix-sdk/src/event_cache/room/mod.rs
+++ b/crates/matrix-sdk/src/event_cache/room/mod.rs
@@ -461,6 +461,8 @@ impl RoomEventCacheInner {
             })
             .await?;
 
+        // We must do this *after* the above call to `.with_events_mut`, so the new
+        // events have been properly stored before we fill the `AllEventsCache`.
        {
            // Fill the AllEventsCache.
            let mut all_events = self.all_events.write().await;
@@ -530,12 +532,12 @@ mod private {
     use matrix_sdk_base::{
         deserialized_responses::{TimelineEvent, TimelineEventKind},
         event_cache::{store::EventCacheStoreLock, Event},
-        linked_chunk::{lazy_loader, ChunkContent, Update},
+        linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Update},
     };
     use matrix_sdk_common::executor::spawn;
     use once_cell::sync::OnceCell;
     use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
-    use tracing::{error, instrument, trace};
+    use tracing::{debug, error, instrument, trace};
 
     use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
     use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
@@ -740,6 +742,60 @@ mod private {
             })
         }
 
+        /// If storage is enabled, unloads all the chunks, then reloads only
+        /// the last one.
+        ///
+        /// Will return `Some` updates to be consumed by the caller, if and
+        /// only if storage is enabled. Otherwise, this is a no-op.
+        #[must_use = "Updates as `VectorDiff` should probably be propagated via `RoomEventCacheUpdate`"]
+        pub(super) async fn shrink_to_last_chunk(
+            &mut self,
+        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
+            let Some(store) = self.store.get() else {
+                // No need to do anything if there's no storage; we'll already reset the
+                // timeline after a limited response.
+                // TODO: that might be a way to unify our code, though?
+                return Ok(None);
+            };
+
+            let store_lock = store.lock().await?;
+
+            // Attempt to load the last chunk.
+            let (last_chunk, chunk_identifier_generator) = match store_lock
+                .load_last_chunk(&self.room)
+                .await
+            {
+                Ok(pair) => pair,
+
+                Err(err) => {
+                    // If loading the last chunk failed, clear the entire linked chunk.
+                    error!("error when reloading a linked chunk from storage: {err}");
+
+                    // Clear storage for this room.
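+                    // Since the last chunk couldn't be loaded, wipe every persisted
+                    // chunk for this room, so that storage and memory agree again.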
+                    store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
+
+                    // Restart with an empty linked chunk.
+                    (None, ChunkIdentifierGenerator::new_from_scratch())
+                }
+            };
+
+            debug!("unloading the linked chunk, and resetting it to its last chunk");
+
+            // Remove all the chunks from the linked chunk, except for the last one,
+            // and update the chunk identifier generator.
+            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
+                error!("error when replacing the linked chunk: {err}");
+                return self.reset().await.map(Some);
+            }
+
+            // Don't propagate those updates to the store; we're only modifying the
+            // in-memory representation here. Let's drain those store updates.
+            let _ = self.events.store_updates().take();
+
+            // However, we want to get updates as `VectorDiff`s for the external
+            // listeners.
+            Ok(Some(self.events.updates_as_vector_diffs()))
+        }
+
         /// Removes the bundled relations from an event, if they were present.
         ///
         /// Only replaces the present if it contained bundled relations.
@@ -1767,4 +1823,122 @@ mod tests {
         let related_event_id = related_events[0].event_id().unwrap();
         assert_eq!(related_event_id, related_id);
     }
+
+    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
+    #[async_test]
+    async fn test_shrink_to_last_chunk() {
+        use std::ops::Not as _;
+
+        use eyeball_im::VectorDiff;
+
+        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
+
+        let room_id = room_id!("!galette:saucisse.bzh");
+
+        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
+
+        let f = EventFactory::new().room(room_id);
+
+        let evid1 = event_id!("$1");
+        let evid2 = event_id!("$2");
+
+        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
+        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
+
+        // Fill the event cache store with an initial linked chunk made of two events chunks.
+        {
+            let store = client.event_cache_store();
+            let store = store.lock().await.unwrap();
+            store
+                .handle_linked_chunk_updates(
+                    room_id,
+                    vec![
+                        Update::NewItemsChunk {
+                            previous: None,
+                            new: ChunkIdentifier::new(0),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(0), 0),
+                            items: vec![ev1],
+                        },
+                        Update::NewItemsChunk {
+                            previous: Some(ChunkIdentifier::new(0)),
+                            new: ChunkIdentifier::new(1),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(1), 0),
+                            items: vec![ev2],
+                        },
+                    ],
+                )
+                .await
+                .unwrap();
+        }
+
+        let event_cache = client.event_cache();
+        event_cache.subscribe().unwrap();
+        event_cache.enable_storage().unwrap();
+
+        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
+        let room = client.get_room(room_id).unwrap();
+        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
+
+        // Sanity check: lazily loaded, so only includes one item at start.
+        let (events, mut stream) = room_event_cache.subscribe().await;
+        assert_eq!(events.len(), 1);
+        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
+        assert!(stream.is_empty());
+
+        // Force loading the full linked chunk by back-paginating.
+        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
+        assert_eq!(outcome.events.len(), 1);
+        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
+        assert!(outcome.reached_start.not());
+
+        // We also get an update about the loading from the store.
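+        // (Back-pagination loaded the previous chunk from the store and prepended
+        // its event, hence the `Insert` at index 0 below.)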
+        assert_let_timeout!(
+            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
+        );
+        assert_eq!(diffs.len(), 1);
+        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
+            assert_eq!(value.event_id().as_deref(), Some(evid1));
+        });
+
+        assert!(stream.is_empty());
+
+        // Shrink the linked chunk to the last chunk.
+        let diffs = room_event_cache
+            .inner
+            .state
+            .write()
+            .await
+            .shrink_to_last_chunk()
+            .await
+            .expect("shrinking should succeed")
+            .expect("there must be updates");
+
+        // We receive updates about the changes to the linked chunk.
+        assert_eq!(diffs.len(), 2);
+        assert_matches!(&diffs[0], VectorDiff::Clear);
+        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
+            assert_eq!(values.len(), 1);
+            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
+        });
+
+        assert!(stream.is_empty());
+
+        // When reading the events, we only get the last one.
+        let (events, _) = room_event_cache.subscribe().await;
+        assert_eq!(events.len(), 1);
+        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
+
+        // But if we back-paginate, we don't need network access to find out about
+        // the previous event.
+        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
+        assert_eq!(outcome.events.len(), 1);
+        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
+        assert!(outcome.reached_start.not());
+    }
 }
diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs
index 757326b9c..e0e72ebd5 100644
--- a/crates/matrix-sdk/tests/integration/event_cache.rs
+++ b/crates/matrix-sdk/tests/integration/event_cache.rs
@@ -1175,6 +1175,8 @@ async fn test_no_gap_stored_after_deduplicated_sync() {
 
     drop(events);
 
+    let pagination = room_event_cache.pagination();
+
     // Backpagination will return nothing.
     server
         .mock_room_messages()
         .ok(RoomMessagesResponseTemplate::default())
         .mock_once()
         .mount()
         .await;
 
-    let pagination = room_event_cache.pagination();
-
-    // Run pagination once: it will consume the unique gap we had.
-    pagination.run_backwards_once(20).await.unwrap();
+    // The first sync was limited, so we have unloaded the full linked chunk, and it
+    // only contains the events returned by the sync.
+    //
+    // The first back-pagination will hit the network, and let us know we've reached
+    // the start of the room.
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.reached_start);
+    assert!(outcome.events.is_empty());
 
     // Now simulate that the sync returns the same events (which can happen with
-    // simplified sliding sync).
+    // simplified sliding sync), also as a limited sync.
     server
         .sync_room(
             &client,
@@ -1202,18 +1208,43 @@
     assert!(stream.is_empty());
 
-    // If this back-pagination fails, that's because we've stored a gap that's
-    // useless. It should be short-circuited because there's no previous gap.
+    {
+        let (events, _) = room_event_cache.subscribe().await;
+        assert_event_matches_msg(&events[0], "hello");
+        assert_event_matches_msg(&events[1], "world");
+        assert_event_matches_msg(&events[2], "sup");
+        assert_eq!(events.len(), 3);
+    }
+
+    // If any of the following back-paginations fail with a network error, that's
+    // because we've stored a gap that's useless. All back-paginations must load
+    // from the store.
+    //
+    // The sync was limited, which unloaded the linked chunk, and reloaded only the
+    // final events chunk.
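+    // (In other words, the store still knows about the older chunks, but the
+    // in-memory linked chunk now starts out with only that final chunk.)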
+    //
+    // There's an empty events chunk at the start of *every* linked chunk, so the
+    // next pagination will return it, and since the chunk is lazily loaded, the
+    // pagination doesn't know *yet* that it's reached the start of the linked
+    // chunk.
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.events.is_empty());
+    assert!(!outcome.reached_start);
+
+    {
+        let (events, _) = room_event_cache.subscribe().await;
+        assert_event_matches_msg(&events[0], "hello");
+        assert_event_matches_msg(&events[1], "world");
+        assert_event_matches_msg(&events[2], "sup");
+        assert_eq!(events.len(), 3);
+    }
+
+    // Now, lazy-loading notices we've reached the start of the linked chunk, and
+    // reports it as such.
+
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.events.is_empty());
     assert!(outcome.reached_start);
-
-    let (events, stream) = room_event_cache.subscribe().await;
-    assert_event_matches_msg(&events[0], "hello");
-    assert_event_matches_msg(&events[1], "world");
-    assert_event_matches_msg(&events[2], "sup");
-    assert_eq!(events.len(), 3);
-
-    assert!(stream.is_empty());
 }
 
 #[async_test]
@@ -1273,10 +1304,11 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
     );
 
     assert_eq!(diffs.len(), 2);
-    // `$ev3` is duplicated, the older `$ev3` event is removed
-    assert_matches!(&diffs[0], VectorDiff::Remove { index } => {
-        assert_eq!(*index, 0);
-    });
+    // The linked chunk is unloaded because of the limited sync with a gap: it's
+    // first cleared…
+    assert_matches!(&diffs[0], VectorDiff::Clear);
+
+    // …then the latest events chunk is reloaded.
     // `$ev1`, `$ev2` and `$ev3` are added.
     assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
         assert_eq!(events.len(), 3);
@@ -1296,6 +1328,24 @@
         .mount()
         .await;
 
+    // Run pagination once: it will consume prev-batch2 first, which is the most
+    // recent token, and which returns an empty batch, thus indicating the start of
+    // the room; but we still have a chunk in storage, so it appears like it's not
+    // the start *yet*.
+    let pagination = room_event_cache.pagination();
+
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.reached_start.not());
+    assert!(outcome.events.is_empty());
+    assert!(stream.is_empty());
+
+    // Next, we lazy-load the next chunk from the store, and get the initial, empty
+    // default events chunk.
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.reached_start.not());
+    assert!(outcome.events.is_empty());
+    assert!(stream.is_empty());
+
     // For prev-batch, the back-pagination returns two events we already know, and a
     // previous batch token.
     server
@@ -1310,30 +1360,25 @@
         .mount()
         .await;
 
-    let pagination = room_event_cache.pagination();
-
-    // Run pagination once: it will consume prev-batch2 first, which is the most
-    // recent token.
-    let outcome = pagination.run_backwards_once(20).await.unwrap();
-
-    // The pagination is empty: no new event.
-    assert!(outcome.reached_start);
-    assert!(outcome.events.is_empty());
-    assert!(stream.is_empty());
-
     // Run pagination a second time: it will consume prev-batch, which is the least
     // recent token.
     let outcome = pagination.run_backwards_once(20).await.unwrap();
-
-    // The pagination contains deduplicated events; they are all deduplicated; the
-    // gap is replaced by zero event: nothing happens.
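+    // All the events in this batch were already known: they are deduplicated away,
+    // so the pagination yields no new event, and we haven't reached the start yet.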
     assert!(outcome.reached_start.not());
     assert!(outcome.events.is_empty());
     assert!(stream.is_empty());
 
-    // If this back-pagination fails, that's because we've stored a gap that's
-    // useless. It should be short-circuited because storing the previous gap was
-    // useless.
+    // If this back-pagination fails, that's because it's trying to hit the
+    // network. In that case, it means we stored the gap with the prev-batch3
+    // token, while we shouldn't have, since it is useless: all the events it
+    // returned were deduplicated by the previous pagination.
+
+    // Instead, we're lazy-loading an empty events chunk.
+    let outcome = pagination.run_backwards_once(20).await.unwrap();
+    assert!(outcome.reached_start.not());
+    assert!(outcome.events.is_empty());
+    assert!(stream.is_empty());
+
+    // And finally hit the start of the timeline.
     let outcome = pagination.run_backwards_once(20).await.unwrap();
     assert!(outcome.reached_start);
     assert!(outcome.events.is_empty());