fix(event cache): wait for the initial previous-batch token, if there wasn't any

This commit is contained in:
Benjamin Bouvier
2025-02-26 13:27:51 +01:00
parent f9f389d9ec
commit 4742aa298a
2 changed files with 87 additions and 4 deletions

View File

@@ -845,9 +845,16 @@ mod private {
}
Ok(None) => {
// No previous chunk: no events to insert. Better, it means we've reached
// the start of the timeline!
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
// No previous chunk: no events to insert. This means one of two things:
// - either the linked chunk is at the start of the timeline,
// - or we haven't received any back-pagination token yet, and we should
// wait for one.
if self.waited_for_initial_prev_token {
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
}
// If we haven't waited yet, we request to resolve the gap, once we get the
// previous-batch token from sync.
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
}
Err(err) => {
@@ -1072,7 +1079,6 @@ mod private {
#[instrument(skip_all)]
async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
let updates = self.events.store_updates().take();
self.send_updates_to_store(updates).await
}
@@ -1190,8 +1196,19 @@ mod private {
func: F,
) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
func(&mut self.events);
self.propagate_changes().await?;
// If we've never waited for an initial previous-batch token, and we now have at
// least one gap in the chunk, no need to wait for a previous-batch token later.
if !self.waited_for_initial_prev_token
&& self.events.chunks().any(|chunk| chunk.is_gap())
{
self.waited_for_initial_prev_token = true;
}
let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
Ok(updates_as_vector_diffs)
}
}

View File

@@ -833,6 +833,72 @@ async fn test_limited_timeline_resets_pagination() {
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_persistent_storage_waits_for_pagination_token() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
// TODO: remove this test when persistent storage is enabled by default, as it's
// doing the same as the one above.
event_cache.enable_storage().unwrap();
// If I sync and get informed I've joined The Room, without a previous batch
// token,
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await;
assert!(events.is_empty());
assert!(room_stream.is_empty());
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()]))
.mock_once()
.mount()
.await;
// At the beginning, the paginator is in the initial state.
let pagination = room_event_cache.pagination();
let mut pagination_status = pagination.status();
assert_eq!(pagination_status.get(), RoomPaginationStatus::Idle { hit_timeline_start: false });
// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards_once(20).await.unwrap();
assert_eq!(events.len(), 1);
assert!(reached_start);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
});
// And the paginator state delivers this as an update, and is internally
// consistent with it:
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: true }
);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_limited_timeline_with_storage() {
let server = MatrixMockServer::new().await;