refactor(event cache): handle pagination status in a single location

This commit is contained in:
Benjamin Bouvier
2025-02-26 12:02:02 +01:00
parent 86b5cb4dba
commit 061a2f739a
2 changed files with 54 additions and 27 deletions

View File

@@ -126,7 +126,51 @@ impl RoomPagination {
}
}
/// Paginate from either the storage or the network, and let pagination
/// status observers know about updates.
async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
// There is at least one gap that must be resolved; reach the network.
// First, ensure there's no other ongoing back-pagination.
let status_observable = &self.inner.pagination_status;
let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
return Err(EventCacheError::AlreadyBackpaginating);
}
let reset_status_on_drop_guard = ResetStatusOnDrop {
prev_status: Some(prev_status),
pagination_status: status_observable.clone(),
};
match self.paginate_backwards_impl(batch_size).await? {
Some(outcome) => {
// Back-pagination's over and successful, don't reset the status to the previous
// value.
reset_status_on_drop_guard.disarm();
// Notify subscribers that pagination ended.
status_observable
.set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
Ok(Some(outcome))
}
None => {
// We keep the previous status value, because we haven't obtained more
// information about the pagination.
Ok(None)
}
}
}
/// Paginate from either the storage or the network.
///
/// This method isn't concerned with setting the pagination status; only the
/// caller is.
async fn paginate_backwards_impl(
&self,
batch_size: u16,
) -> Result<Option<BackPaginationOutcome>> {
// A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
// to load from storage first, then from network if storage indicated
// there's no previous events chunk to load.
@@ -165,25 +209,18 @@ impl RoomPagination {
self.paginate_backwards_with_network(batch_size).await
}
/// Run a single pagination request (/messages) to the server.
///
/// If there's no previous-batch token, it will wait for one for a short
/// while to get one, or if it's already done so or seen a
/// previous-batch token before, it will immediately indicate
/// it's reached the end of the timeline.
async fn paginate_backwards_with_network(
&self,
batch_size: u16,
) -> Result<Option<BackPaginationOutcome>> {
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
// There is at least one gap that must be resolved; reach the network.
// First, ensure there's no other ongoing back-pagination.
let prev_status = self.inner.pagination_status.set(RoomPaginationStatus::Paginating);
if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
return Err(EventCacheError::AlreadyBackpaginating);
}
let reset_status_on_drop_guard = ResetStatusOnDrop {
prev_status: Some(prev_status),
pagination_status: self.inner.pagination_status.clone(),
};
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
let prev_token = match prev_token {
@@ -222,11 +259,11 @@ impl RoomPagination {
matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
});
// We got a previous-batch token from the linked chunk *before* running the
// request, which is missing from the linked chunk *after*
// completing the request. It may be a sign the linked chunk has
// been reset, and it's an error in any case.
if gap_id.is_none() {
// We got a previous-batch token from the linked chunk *before* running the
// request, which is missing from the linked chunk *after* completing the
// request. It may be a sign the linked chunk has been reset,
// and it's an error in any case.
return Ok(None);
}
@@ -355,14 +392,6 @@ impl RoomPagination {
let backpagination_outcome = BackPaginationOutcome { events, reached_start };
// Back-pagination's over; time to disarm the status guard.
reset_status_on_drop_guard.disarm();
// Notify subscribers that pagination ended.
self.inner
.pagination_status
.set(RoomPaginationStatus::Idle { hit_timeline_start: reached_start });
if !sync_timeline_events_diffs.is_empty() {
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: sync_timeline_events_diffs,

View File

@@ -848,8 +848,6 @@ mod private {
Ok(None) => {
// No previous chunk: no events to insert. Better, it means we've reached
// the start of the timeline!
self.pagination_status
.set(RoomPaginationStatus::Idle { hit_timeline_start: true });
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
}