mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-19 06:04:31 -04:00
refactor(event cache): consolidate logic around returning the previous gap token
This commit is contained in:
@@ -178,38 +178,70 @@ impl RoomPagination {
|
||||
// to load from storage first, then from network if storage indicated
|
||||
// there's no previous events chunk to load.
|
||||
|
||||
match self.inner.state.write().await.load_more_events_backwards().await? {
|
||||
LoadMoreEventsBackwardsOutcome::Gap => {
|
||||
// We have a gap, so resolve it with a network back-pagination.
|
||||
}
|
||||
loop {
|
||||
let mut state_guard = self.inner.state.write().await;
|
||||
|
||||
LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
|
||||
return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }))
|
||||
}
|
||||
match state_guard.load_more_events_backwards().await? {
|
||||
LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
|
||||
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
|
||||
|
||||
LoadMoreEventsBackwardsOutcome::Events {
|
||||
events,
|
||||
timeline_event_diffs,
|
||||
reached_start,
|
||||
} => {
|
||||
if !timeline_event_diffs.is_empty() {
|
||||
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
|
||||
diffs: timeline_event_diffs,
|
||||
origin: EventsOrigin::Pagination,
|
||||
});
|
||||
// Release the state guard while waiting, to not deadlock the sync task.
|
||||
drop(state_guard);
|
||||
|
||||
// Otherwise, wait for a notification that we received a previous-batch token.
|
||||
trace!("waiting for a pagination token…");
|
||||
let _ = timeout(
|
||||
self.inner.pagination_batch_token_notifier.notified(),
|
||||
DEFAULT_WAIT_FOR_TOKEN_DURATION,
|
||||
)
|
||||
.await;
|
||||
trace!("done waiting");
|
||||
|
||||
self.inner.state.write().await.waited_for_initial_prev_token = true;
|
||||
|
||||
// Retry!
|
||||
//
|
||||
// Note: the next call to `load_more_events_backwards` can't return
|
||||
// `WaitForInitialPrevToken` because we've just set to
|
||||
// `waited_for_initial_prev_token`, so this is not an infinite loop.
|
||||
//
|
||||
// Note 2: not a recursive call, because recursive and async have a bad time
|
||||
// together.
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(Some(BackPaginationOutcome {
|
||||
LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
|
||||
// We have a gap, so resolve it with a network back-pagination.
|
||||
drop(state_guard);
|
||||
return self.paginate_backwards_with_network(batch_size, prev_token).await;
|
||||
}
|
||||
|
||||
LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
|
||||
return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
|
||||
}
|
||||
|
||||
LoadMoreEventsBackwardsOutcome::Events {
|
||||
events,
|
||||
timeline_event_diffs,
|
||||
reached_start,
|
||||
// This is a backwards pagination. `BackPaginationOutcome` expects events to
|
||||
// be in “reverse order”.
|
||||
events: events.into_iter().rev().collect(),
|
||||
}));
|
||||
} => {
|
||||
if !timeline_event_diffs.is_empty() {
|
||||
let _ =
|
||||
self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
|
||||
diffs: timeline_event_diffs,
|
||||
origin: EventsOrigin::Pagination,
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(Some(BackPaginationOutcome {
|
||||
reached_start,
|
||||
// This is a backwards pagination. `BackPaginationOutcome` expects events to
|
||||
// be in “reverse order”.
|
||||
events: events.into_iter().rev().collect(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Alright, try network.
|
||||
self.paginate_backwards_with_network(batch_size).await
|
||||
}
|
||||
|
||||
/// Run a single pagination request (/messages) to the server.
|
||||
@@ -221,20 +253,8 @@ impl RoomPagination {
|
||||
async fn paginate_backwards_with_network(
|
||||
&self,
|
||||
batch_size: u16,
|
||||
prev_token: Option<String>,
|
||||
) -> Result<Option<BackPaginationOutcome>> {
|
||||
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
|
||||
|
||||
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
|
||||
|
||||
let prev_token = match prev_token {
|
||||
PaginationToken::HasMore(token) => Some(token),
|
||||
PaginationToken::None => None,
|
||||
PaginationToken::HitEnd => {
|
||||
debug!("Not back-paginating since we've reached the start of the timeline.");
|
||||
return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
|
||||
}
|
||||
};
|
||||
|
||||
let (events, new_gap) = {
|
||||
let Some(room) = self.inner.weak_room.get() else {
|
||||
// The client is shutting down, return an empty default response.
|
||||
@@ -264,12 +284,12 @@ impl RoomPagination {
|
||||
|
||||
// Check that the previous token still exists; otherwise it's a sign that the
|
||||
// room's timeline has been cleared.
|
||||
let prev_gap_id = if let Some(token) = prev_token {
|
||||
let gap_id = state.events().chunk_identifier(|chunk| {
|
||||
let prev_gap_chunk_id = if let Some(token) = prev_token {
|
||||
let gap_chunk_id = state.events().chunk_identifier(|chunk| {
|
||||
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
|
||||
});
|
||||
|
||||
if gap_id.is_none() {
|
||||
if gap_chunk_id.is_none() {
|
||||
// We got a previous-batch token from the linked chunk *before* running the
|
||||
// request, but it is missing *after* completing the
|
||||
// request.
|
||||
@@ -279,12 +299,14 @@ impl RoomPagination {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
gap_id
|
||||
gap_chunk_id
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.handle_network_pagination_result(state, events, new_gap, prev_gap_id).await.map(Some)
|
||||
self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
|
||||
.await
|
||||
.map(Some)
|
||||
}
|
||||
|
||||
/// Handle the result of a successful network back-pagination.
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use as_variant::as_variant;
|
||||
use eyeball_im::VectorDiff;
|
||||
pub use matrix_sdk_base::event_cache::{Event, Gap};
|
||||
use matrix_sdk_base::{
|
||||
@@ -345,6 +346,15 @@ impl RoomEvents {
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Return the latest gap, if any.
|
||||
///
|
||||
/// Latest means "closest to the end", or, since events are ordered
|
||||
/// according to the sync ordering, this means "the most recent one".
|
||||
pub fn rgap(&self) -> Option<Gap> {
|
||||
self.rchunks()
|
||||
.find_map(|chunk| as_variant!(chunk.content(), ChunkContent::Gap(gap) => gap.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
// Private implementations, implementation specific.
|
||||
|
||||
@@ -631,11 +631,15 @@ impl RoomEventCacheInner {
|
||||
}
|
||||
|
||||
/// Internal type to represent the output of
|
||||
/// `RoomEventCacheState::load_more_events_backwards`.
|
||||
/// [`RoomEventCacheState::load_more_events_backwards`].
|
||||
#[derive(Debug)]
|
||||
pub(super) enum LoadMoreEventsBackwardsOutcome {
|
||||
/// A gap has been inserted.
|
||||
Gap,
|
||||
Gap {
|
||||
/// The previous batch token to be used as the "end" parameter in the
|
||||
/// back-pagination request.
|
||||
prev_token: Option<String>,
|
||||
},
|
||||
|
||||
/// The start of the timeline has been reached.
|
||||
StartOfTimeline,
|
||||
@@ -646,6 +650,9 @@ pub(super) enum LoadMoreEventsBackwardsOutcome {
|
||||
timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
|
||||
reached_start: bool,
|
||||
},
|
||||
|
||||
/// The caller must wait for the initial previous-batch token, and retry.
|
||||
WaitForInitialPrevToken,
|
||||
}
|
||||
|
||||
// Use a private module to hide `events` to this parent module.
|
||||
@@ -811,21 +818,52 @@ mod private {
|
||||
Ok((deduplication_outcome, all_duplicates))
|
||||
}
|
||||
|
||||
/// Given a fully-loaded linked chunk with no gaps, return the
|
||||
/// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
|
||||
fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
|
||||
// If we never received events for this room, this means we've never
|
||||
// received a sync for that room, because every room must have at least a
|
||||
// room creation event. Otherwise, we have reached the start of the
|
||||
// timeline.
|
||||
if self.events.events().next().is_some() {
|
||||
// If there's at least one event, this means we've reached the start of the
|
||||
// timeline, since the chunk is fully loaded.
|
||||
LoadMoreEventsBackwardsOutcome::StartOfTimeline
|
||||
} else if !self.waited_for_initial_prev_token {
|
||||
// There's no events. Since we haven't yet, wait for an initial previous-token.
|
||||
LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
|
||||
} else {
|
||||
// Otherwise, we've already waited, *and* received no previous-batch token from
|
||||
// the sync, *and* there are still no events in the fully-loaded
|
||||
// chunk: start back-pagination from the end of the room.
|
||||
LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
|
||||
}
|
||||
}
|
||||
|
||||
/// Load more events backwards if the last chunk is **not** a gap.
|
||||
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
|
||||
pub(in super::super) async fn load_more_events_backwards(
|
||||
&mut self,
|
||||
) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
|
||||
let Some(store) = self.store.get() else {
|
||||
// No store: no events to insert. Pretend the caller has to act as if a gap was
|
||||
// present.
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
|
||||
// No store to reload events from. Pretend the caller has to act as if a gap was
|
||||
// present. Limited syncs will always clear and push a gap, in this mode.
|
||||
// There's no lazy-loading.
|
||||
|
||||
// Look for a gap in the in-memory chunk, iterating in reverse so as to get the
|
||||
// most recent one.
|
||||
if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::Gap {
|
||||
prev_token: Some(prev_token),
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(self.conclude_load_more_for_fully_loaded_chunk());
|
||||
};
|
||||
|
||||
// If any in-memory chunk is a gap, don't load more events, and let the caller
|
||||
// resolve the gap.
|
||||
if self.events.chunks().any(|chunk| chunk.is_gap()) {
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
|
||||
if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
|
||||
}
|
||||
|
||||
// Because `first_chunk` is `not `Send`, get this information before the
|
||||
@@ -833,57 +871,43 @@ mod private {
|
||||
let first_chunk_identifier =
|
||||
self.events.chunks().next().expect("a linked chunk is never empty").identifier();
|
||||
|
||||
let room_id = &self.room;
|
||||
let store = store.lock().await?;
|
||||
|
||||
// The first chunk is not a gap, we can load its previous chunk.
|
||||
let new_first_chunk =
|
||||
match store.load_previous_chunk(room_id, first_chunk_identifier).await {
|
||||
match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
|
||||
Ok(Some(new_first_chunk)) => {
|
||||
// All good, let's continue with this chunk.
|
||||
new_first_chunk
|
||||
}
|
||||
|
||||
Ok(None) => {
|
||||
// No previous chunk: no events to insert. This means one of two things:
|
||||
// - either the linked chunk is at the start of the timeline,
|
||||
// - or we haven't received any back-pagination token yet, and we should
|
||||
// wait for one.
|
||||
if self.waited_for_initial_prev_token {
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
|
||||
}
|
||||
// If we haven't waited yet, we request to resolve the gap, once we get the
|
||||
// previous-batch token from sync.
|
||||
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
|
||||
// There's no previous chunk. The chunk is now fully-loaded. Conclude.
|
||||
return Ok(self.conclude_load_more_for_fully_loaded_chunk());
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("error when loading the previous chunk of a linked chunk: {err}");
|
||||
|
||||
// Clear storage for this room.
|
||||
store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
|
||||
store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
|
||||
|
||||
// Return the error.
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
|
||||
let events = match &new_first_chunk.content {
|
||||
ChunkContent::Gap(_) => None,
|
||||
ChunkContent::Items(events) => {
|
||||
// We've reached the start on disk, if and only if, there was no chunk prior to
|
||||
// the one we just loaded.
|
||||
let reached_start = new_first_chunk.previous.is_none();
|
||||
let chunk_content = new_first_chunk.content.clone();
|
||||
|
||||
Some((events.clone(), reached_start))
|
||||
}
|
||||
};
|
||||
// We've reached the start on disk, if and only if, there was no chunk prior to
|
||||
// the one we just loaded.
|
||||
let reached_start = new_first_chunk.previous.is_none();
|
||||
|
||||
if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
|
||||
error!("error when inserting the previous chunk into its linked chunk: {err}");
|
||||
|
||||
// Clear storage for this room.
|
||||
store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await?;
|
||||
store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
|
||||
|
||||
// Return the error.
|
||||
return Err(err.into());
|
||||
@@ -896,9 +920,12 @@ mod private {
|
||||
// However, we want to get updates as `VectorDiff`s.
|
||||
let timeline_event_diffs = self.events.updates_as_vector_diffs();
|
||||
|
||||
Ok(match events {
|
||||
None => LoadMoreEventsBackwardsOutcome::Gap,
|
||||
Some((events, reached_start)) => LoadMoreEventsBackwardsOutcome::Events {
|
||||
Ok(match chunk_content {
|
||||
ChunkContent::Gap(gap) => {
|
||||
LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
|
||||
}
|
||||
|
||||
ChunkContent::Items(events) => LoadMoreEventsBackwardsOutcome::Events {
|
||||
events,
|
||||
timeline_event_diffs,
|
||||
reached_start,
|
||||
@@ -2027,7 +2054,7 @@ mod tests {
|
||||
// But if I manually reload more of the chunk, the gap will be present.
|
||||
assert_matches!(
|
||||
state.load_more_events_backwards().await.unwrap(),
|
||||
LoadMoreEventsBackwardsOutcome::Gap
|
||||
LoadMoreEventsBackwardsOutcome::Gap { .. }
|
||||
);
|
||||
|
||||
num_gaps = 0;
|
||||
|
||||
@@ -738,8 +738,8 @@ async fn test_backpaginating_without_token() {
|
||||
assert!(reached_start);
|
||||
|
||||
// And we get notified about the new event.
|
||||
assert_event_matches_msg(&events[0], "hi");
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "hi");
|
||||
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
|
||||
|
||||
Reference in New Issue
Block a user