From 87a6037924074fda6568b2b8a4a3e57aeda26f6d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 27 Feb 2025 15:17:14 +0100 Subject: [PATCH] refactor(event cache): consolidate logic around returning the previous gap token --- .../matrix-sdk/src/event_cache/pagination.rs | 108 +++++++++++------- .../matrix-sdk/src/event_cache/room/events.rs | 10 ++ crates/matrix-sdk/src/event_cache/room/mod.rs | 97 ++++++++++------ .../tests/integration/event_cache.rs | 2 +- 4 files changed, 138 insertions(+), 79 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 1978d7f4c..c0c16da07 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -178,38 +178,70 @@ impl RoomPagination { // to load from storage first, then from network if storage indicated // there's no previous events chunk to load. - match self.inner.state.write().await.load_more_events_backwards().await? { - LoadMoreEventsBackwardsOutcome::Gap => { - // We have a gap, so resolve it with a network back-pagination. - } + loop { + let mut state_guard = self.inner.state.write().await; - LoadMoreEventsBackwardsOutcome::StartOfTimeline => { - return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] })) - } + match state_guard.load_more_events_backwards().await? { + LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => { + const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3); - LoadMoreEventsBackwardsOutcome::Events { - events, - timeline_event_diffs, - reached_start, - } => { - if !timeline_event_diffs.is_empty() { - let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { - diffs: timeline_event_diffs, - origin: EventsOrigin::Pagination, - }); + // Release the state guard while waiting, to not deadlock the sync task. + drop(state_guard); + + // Otherwise, wait for a notification that we received a previous-batch token. 
+ trace!("waiting for a pagination token…"); + let _ = timeout( + self.inner.pagination_batch_token_notifier.notified(), + DEFAULT_WAIT_FOR_TOKEN_DURATION, + ) + .await; + trace!("done waiting"); + + self.inner.state.write().await.waited_for_initial_prev_token = true; + + // Retry! + // + // Note: the next call to `load_more_events_backwards` can't return + // `WaitForInitialPrevToken` because we've just set + // `waited_for_initial_prev_token` to `true`, so this is not an infinite loop. + // + // Note 2: not a recursive call, because recursive and async have a bad time + // together. + continue; } - return Ok(Some(BackPaginationOutcome { + LoadMoreEventsBackwardsOutcome::Gap { prev_token } => { + // We have a gap, so resolve it with a network back-pagination. + drop(state_guard); + return self.paginate_backwards_with_network(batch_size, prev_token).await; + } + + LoadMoreEventsBackwardsOutcome::StartOfTimeline => { + return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] })); + } + + LoadMoreEventsBackwardsOutcome::Events { + events, + timeline_event_diffs, reached_start, - // This is a backwards pagination. `BackPaginationOutcome` expects events to - // be in “reverse order”. - events: events.into_iter().rev().collect(), - })); + } => { + if !timeline_event_diffs.is_empty() { + let _ = + self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs: timeline_event_diffs, + origin: EventsOrigin::Pagination, + }); + } + + return Ok(Some(BackPaginationOutcome { + reached_start, + // This is a backwards pagination. `BackPaginationOutcome` expects events to + // be in “reverse order”. + events: events.into_iter().rev().collect(), + })); + } } } - - // Alright, try network. - self.paginate_backwards_with_network(batch_size).await } /// Run a single pagination request (/messages) to the server. 
@@ -221,20 +253,8 @@ impl RoomPagination { async fn paginate_backwards_with_network( &self, batch_size: u16, + prev_token: Option, ) -> Result<Option<BackPaginationOutcome>> { - const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3); - - let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await; - - let prev_token = match prev_token { - PaginationToken::HasMore(token) => Some(token), - PaginationToken::None => None, - PaginationToken::HitEnd => { - debug!("Not back-paginating since we've reached the start of the timeline."); - return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() })); - } - }; - let (events, new_gap) = { let Some(room) = self.inner.weak_room.get() else { // The client is shutting down, return an empty default response. @@ -264,12 +284,12 @@ impl RoomPagination { // Check that the previous token still exists; otherwise it's a sign that the // room's timeline has been cleared. - let prev_gap_id = if let Some(token) = prev_token { - let gap_id = state.events().chunk_identifier(|chunk| { + let prev_gap_chunk_id = if let Some(token) = prev_token { + let gap_chunk_id = state.events().chunk_identifier(|chunk| { matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token) }); - if gap_id.is_none() { + if gap_chunk_id.is_none() { // We got a previous-batch token from the linked chunk *before* running the // request, but it is missing *after* completing the // request. @@ -279,12 +299,14 @@ impl RoomPagination { return Ok(None); } - gap_id + gap_chunk_id } else { None }; - self.handle_network_pagination_result(state, events, new_gap, prev_gap_id).await.map(Some) + self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id) + .await + .map(Some) } /// Handle the result of a successful network back-pagination. 
diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index af740d5c5..94727e18c 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use as_variant::as_variant; use eyeball_im::VectorDiff; pub use matrix_sdk_base::event_cache::{Event, Gap}; use matrix_sdk_base::{ @@ -345,6 +346,15 @@ impl RoomEvents { result } + + /// Return the latest gap, if any. + /// + /// Latest means "closest to the end", or, since events are ordered + /// according to the sync ordering, this means "the most recent one". + pub fn rgap(&self) -> Option<Gap> { + self.rchunks() + .find_map(|chunk| as_variant!(chunk.content(), ChunkContent::Gap(gap) => gap.clone())) + } } // Private implementations, implementation specific. diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index af7da5559..25080ce2e 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -631,11 +631,15 @@ impl RoomEventCacheInner { } /// Internal type to represent the output of -/// `RoomEventCacheState::load_more_events_backwards`. +/// [`RoomEventCacheState::load_more_events_backwards`]. #[derive(Debug)] pub(super) enum LoadMoreEventsBackwardsOutcome { /// A gap has been inserted. - Gap, + Gap { + /// The previous batch token to be used as the "end" parameter in the + /// back-pagination request. + prev_token: Option, + }, /// The start of the timeline has been reached. StartOfTimeline, @@ -646,6 +650,9 @@ pub(super) enum LoadMoreEventsBackwardsOutcome { timeline_event_diffs: Vec>, reached_start: bool, }, + + /// The caller must wait for the initial previous-batch token, and retry. + WaitForInitialPrevToken, } // Use a private module to hide `events` to this parent module. 
@@ -811,21 +818,52 @@ mod private { Ok((deduplication_outcome, all_duplicates)) } + /// Given a fully-loaded linked chunk with no gaps, return the + /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache. + fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome { + // If we never received events for this room, this means we've never + // received a sync for that room, because every room must have at least a + // room creation event. Otherwise, we have reached the start of the + // timeline. + if self.events.events().next().is_some() { + // If there's at least one event, this means we've reached the start of the + // timeline, since the chunk is fully loaded. + LoadMoreEventsBackwardsOutcome::StartOfTimeline + } else if !self.waited_for_initial_prev_token { + // There are no events. Since we haven't waited yet, wait for an initial previous-batch token. + LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken + } else { + // Otherwise, we've already waited, *and* received no previous-batch token from + // the sync, *and* there are still no events in the fully-loaded + // chunk: start back-pagination from the end of the room. + LoadMoreEventsBackwardsOutcome::Gap { prev_token: None } + } + } + /// Load more events backwards if the last chunk is **not** a gap. - #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"] pub(in super::super) async fn load_more_events_backwards( &mut self, ) -> Result { let Some(store) = self.store.get() else { - // No store: no events to insert. Pretend the caller has to act as if a gap was - // present. - return Ok(LoadMoreEventsBackwardsOutcome::Gap); + // No store to reload events from. Pretend the caller has to act as if a gap was + // present. Limited syncs will always clear and push a gap, in this mode. + // There's no lazy-loading. + + // Look for a gap in the in-memory chunk, iterating in reverse so as to get the + // most recent one. 
+ if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) { + return Ok(LoadMoreEventsBackwardsOutcome::Gap { + prev_token: Some(prev_token), + }); + } + + return Ok(self.conclude_load_more_for_fully_loaded_chunk()); }; // If any in-memory chunk is a gap, don't load more events, and let the caller // resolve the gap. - if self.events.chunks().any(|chunk| chunk.is_gap()) { - return Ok(LoadMoreEventsBackwardsOutcome::Gap); + if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) { + return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) }); } // Because `first_chunk` is `not `Send`, get this information before the @@ -833,57 +871,43 @@ mod private { let first_chunk_identifier = self.events.chunks().next().expect("a linked chunk is never empty").identifier(); - let room_id = &self.room; let store = store.lock().await?; // The first chunk is not a gap, we can load its previous chunk. let new_first_chunk = - match store.load_previous_chunk(room_id, first_chunk_identifier).await { + match store.load_previous_chunk(&self.room, first_chunk_identifier).await { Ok(Some(new_first_chunk)) => { // All good, let's continue with this chunk. new_first_chunk } Ok(None) => { - // No previous chunk: no events to insert. This means one of two things: - // - either the linked chunk is at the start of the timeline, - // - or we haven't received any back-pagination token yet, and we should - // wait for one. - if self.waited_for_initial_prev_token { - return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline); - } - // If we haven't waited yet, we request to resolve the gap, once we get the - // previous-batch token from sync. - return Ok(LoadMoreEventsBackwardsOutcome::Gap); + // There's no previous chunk. The chunk is now fully-loaded. Conclude. + return Ok(self.conclude_load_more_for_fully_loaded_chunk()); } Err(err) => { error!("error when loading the previous chunk of a linked chunk: {err}"); // Clear storage for this room. 
- store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?; + store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?; // Return the error. return Err(err.into()); } }; - let events = match &new_first_chunk.content { - ChunkContent::Gap(_) => None, - ChunkContent::Items(events) => { - // We've reached the start on disk, if and only if, there was no chunk prior to - // the one we just loaded. - let reached_start = new_first_chunk.previous.is_none(); + let chunk_content = new_first_chunk.content.clone(); - Some((events.clone(), reached_start)) - } - }; + // We've reached the start on disk, if and only if, there was no chunk prior to + // the one we just loaded. + let reached_start = new_first_chunk.previous.is_none(); if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) { error!("error when inserting the previous chunk into its linked chunk: {err}"); // Clear storage for this room. - store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?; + store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?; // Return the error. return Err(err.into()); @@ -896,9 +920,12 @@ mod private { // However, we want to get updates as `VectorDiff`s. let timeline_event_diffs = self.events.updates_as_vector_diffs(); - Ok(match events { - None => LoadMoreEventsBackwardsOutcome::Gap, - Some((events, reached_start)) => LoadMoreEventsBackwardsOutcome::Events { + Ok(match chunk_content { + ChunkContent::Gap(gap) => { + LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) } + } + + ChunkContent::Items(events) => LoadMoreEventsBackwardsOutcome::Events { events, timeline_event_diffs, reached_start, @@ -2027,7 +2054,7 @@ mod tests { // But if I manually reload more of the chunk, the gap will be present. assert_matches!( state.load_more_events_backwards().await.unwrap(), - LoadMoreEventsBackwardsOutcome::Gap + LoadMoreEventsBackwardsOutcome::Gap { .. 
} ); num_gaps = 0; diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 146992843..b2b7683e9 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -738,8 +738,8 @@ async fn test_backpaginating_without_token() { assert!(reached_start); // And we get notified about the new event. - assert_event_matches_msg(&events[0], "hi"); assert_eq!(events.len(), 1); + assert_event_matches_msg(&events[0], "hi"); assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()