feat(event cache): don't restart back-pagination from the end if we had no prev-batch token

This commit is contained in:
Benjamin Bouvier
2024-12-16 16:44:36 +01:00
parent 36a3d3a8c4
commit 944a922001
4 changed files with 218 additions and 80 deletions

View File

@@ -1054,21 +1054,24 @@ async fn test_pending_edit_from_backpagination_doesnt_override_pending_edit_from
let mut h = PendingEditHelper::new().await;
let f = EventFactory::new();
let (_, mut timeline_stream) = h.timeline.subscribe().await;
// When I receive an edit live from a sync for an event I don't know about…
let original_event_id = event_id!("$original");
let edit_event_id = event_id!("$edit");
h.handle_sync(
JoinedRoomBuilder::new(&h.room_id).add_timeline_event(
f.text_msg("* hello")
.sender(&ALICE)
.event_id(edit_event_id)
.edit(original_event_id, RoomMessageEventContent::text_plain("[edit]").into()),
),
JoinedRoomBuilder::new(&h.room_id)
.add_timeline_event(
f.text_msg("* hello")
.sender(&ALICE)
.event_id(edit_event_id)
.edit(original_event_id, RoomMessageEventContent::text_plain("[edit]").into()),
)
.set_timeline_prev_batch("prev-batch-token".to_owned())
.set_timeline_limited(),
)
.await;
let (_, mut timeline_stream) = h.timeline.subscribe().await;
// And then I receive an edit from a back-pagination for the same event…
let edit_event_id2 = event_id!("$edit2");
h.handle_backpagination(
@@ -1084,7 +1087,7 @@ async fn test_pending_edit_from_backpagination_doesnt_override_pending_edit_from
.await;
// Nothing happens.
assert!(timeline_stream.next().now_or_never().is_none());
assert_matches!(timeline_stream.next().now_or_never(), None);
// And then I receive the original event after a bit…
h.handle_sync(

View File

@@ -62,7 +62,7 @@ mod pagination;
mod room;
pub mod paginator;
pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating};
pub use pagination::{PaginationToken, RoomPagination, TimelineHasBeenResetWhilePaginating};
pub use room::RoomEventCache;
/// An error observed in the [`EventCache`].

View File

@@ -35,6 +35,7 @@ use super::{
///
/// Can be created with [`super::RoomEventCache::pagination()`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination {
pub(super) inner: Arc<RoomEventCacheInner>,
}
@@ -123,6 +124,15 @@ impl RoomPagination {
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
let prev_token = match prev_token {
PaginationToken::HasMore(token) => Some(token),
PaginationToken::None => None,
PaginationToken::HitEnd => {
debug!("Not back-paginating since we've reached the start of the timeline.");
return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
}
};
let paginator = &self.inner.paginator;
paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
@@ -221,7 +231,7 @@ impl RoomPagination {
/// It will only wait if we *never* saw an initial previous-batch token.
/// Otherwise, it will immediately skip.
#[doc(hidden)]
pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> Option<String> {
pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> PaginationToken {
fn get_latest(events: &RoomEvents) -> Option<String> {
events.rchunks().find_map(|chunk| match chunk.content() {
ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
@@ -232,19 +242,35 @@ impl RoomPagination {
{
// Scope for the lock guard.
let state = self.inner.state.read().await;
// Check if the linked chunk contains any events. If so, absence of a gap means
// we've hit the start of the timeline. If not, absence of a gap
// means we've never received a pagination token from sync, and we
// should wait for one.
let has_events = state.events().events().next().is_some();
// Fast-path: we do have a previous-batch token already.
if let Some(found) = get_latest(state.events()) {
return Some(found);
return PaginationToken::HasMore(found);
}
// If we had events, and there was no gap, then we've hit the end of the
// timeline.
if has_events {
return PaginationToken::HitEnd;
}
// If we've already waited for an initial previous-batch token before,
// immediately abort.
if state.waited_for_initial_prev_token {
return None;
return PaginationToken::None;
}
}
// If the caller didn't set a wait time, return none early.
let wait_time = wait_time?;
let Some(wait_time) = wait_time else {
return PaginationToken::None;
};
// Otherwise, wait for a notification that we received a previous-batch token.
// Note the state lock is released while doing so, allowing other tasks to write
@@ -252,9 +278,17 @@ impl RoomPagination {
let _ = timeout(wait_time, self.inner.pagination_batch_token_notifier.notified()).await;
let mut state = self.inner.state.write().await;
let token = get_latest(state.events());
state.waited_for_initial_prev_token = true;
token
if let Some(token) = get_latest(state.events()) {
PaginationToken::HasMore(token)
} else if state.events().events().next().is_some() {
// See logic above, in the read lock guard scope.
PaginationToken::HitEnd
} else {
PaginationToken::None
}
}
/// Returns a subscriber to the pagination status used for the
@@ -281,8 +315,8 @@ impl RoomPagination {
}
/// Pagination token data, indicating in which state is the current pagination.
#[derive(Clone, Debug)]
pub(super) enum PaginationToken {
#[derive(Clone, Debug, PartialEq)]
pub enum PaginationToken {
/// We never had a pagination token, so we'll start back-paginating from the
/// end, or forward-paginating from the start.
None,
@@ -320,6 +354,7 @@ mod tests {
mod time_tests {
use std::time::{Duration, Instant};
use assert_matches::assert_matches;
use matrix_sdk_base::RoomState;
use matrix_sdk_test::{
async_test, event_factory::EventFactory, sync_timeline_event, ALICE,
@@ -328,7 +363,8 @@ mod tests {
use tokio::{spawn, time::sleep};
use crate::{
deserialized_responses::SyncTimelineEvent, event_cache::room::events::Gap,
deserialized_responses::SyncTimelineEvent,
event_cache::{pagination::PaginationToken, room::events::Gap},
test_utils::logged_in_client,
};
@@ -344,32 +380,15 @@ mod tests {
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
// When I only have events in a room,
room_event_cache
.inner
.state
.write()
.await
.with_events_mut(|events| {
events.push_events([SyncTimelineEvent::new(sync_timeline_event!({
"sender": "b@z.h",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"yolo", "msgtype": "m.text" },
}))])
})
.await
.unwrap();
let pagination = room_event_cache.pagination();
// If I don't wait for the backpagination token,
// If I have a room with no events, and try to get a pagination token without
// waiting,
let found = pagination.get_or_wait_for_token(None).await;
// Then I don't find it.
assert!(found.is_none());
// Then I don't get any pagination token.
assert_matches!(found, PaginationToken::None);
// Reset waited_for_initial_prev_token state.
// Reset waited_for_initial_prev_token and event state.
pagination.inner.state.write().await.reset().await.unwrap();
// If I wait for a back-pagination token for 0 seconds,
@@ -377,7 +396,7 @@ mod tests {
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
let waited = before.elapsed();
// then I don't get any,
assert!(found.is_none());
assert_matches!(found, PaginationToken::None);
// and I haven't waited long.
assert!(waited.as_secs() < 1);
@@ -389,12 +408,96 @@ mod tests {
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
let waited = before.elapsed();
// then I still don't get any.
assert!(found.is_none());
assert_matches!(found, PaginationToken::None);
// and I've waited a bit.
assert!(waited.as_secs() < 2);
assert!(waited.as_secs() >= 1);
}
#[async_test]
async fn test_wait_hit_end_of_timeline() {
let client = logged_in_client(None).await;
let room_id = room_id!("!galette:saucisse.bzh");
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
let f = EventFactory::new().room(room_id).sender(*ALICE);
let pagination = room_event_cache.pagination();
// Add a previous event.
room_event_cache
.inner
.state
.write()
.await
.with_events_mut(|events| {
events
.push_events([f.text_msg("this is the start of the timeline").into_sync()]);
})
.await
.unwrap();
// If I have a room with events, and try to get a pagination token without
// waiting,
let found = pagination.get_or_wait_for_token(None).await;
// I've reached the start of the timeline.
assert_matches!(found, PaginationToken::HitEnd);
// If I wait for a back-pagination token for 0 seconds,
let before = Instant::now();
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
let waited = before.elapsed();
// Then I still have reached the start of the timeline.
assert_matches!(found, PaginationToken::HitEnd);
// and I've waited very little.
assert!(waited.as_secs() < 1);
// If I wait for a back-pagination token for 1 second,
let before = Instant::now();
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
let waited = before.elapsed();
// then I still don't get any.
assert_matches!(found, PaginationToken::HitEnd);
// and I've waited very little (there's no point in waiting in this case).
assert!(waited.as_secs() < 1);
// Now, reset state. We'll add an event *after* we've started waiting, this
// time.
room_event_cache.clear().await.unwrap();
spawn(async move {
sleep(Duration::from_secs(1)).await;
room_event_cache
.inner
.state
.write()
.await
.with_events_mut(|events| {
events.push_events([f
.text_msg("this is the start of the timeline")
.into_sync()]);
})
.await
.unwrap();
});
// If I wait for a pagination token,
let before = Instant::now();
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(2))).await;
let waited = before.elapsed();
// since sync has returned all events, and no prior gap, I've hit the end.
assert_matches!(found, PaginationToken::HitEnd);
// and I've waited for the whole duration.
assert!(waited.as_secs() >= 2);
assert!(waited.as_secs() < 3);
}
#[async_test]
async fn test_wait_for_pagination_token_already_present() {
let client = logged_in_client(None).await;
@@ -435,14 +538,14 @@ mod tests {
// If I don't wait for a back-pagination token,
let found = pagination.get_or_wait_for_token(None).await;
// Then I get it.
assert_eq!(found.as_ref(), Some(&expected_token));
assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
// If I wait for a back-pagination token for 0 seconds,
let before = Instant::now();
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
let waited = before.elapsed();
// then I do get one.
assert_eq!(found.as_ref(), Some(&expected_token));
assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
// and I haven't waited long.
assert!(waited.as_millis() < 100);
@@ -451,7 +554,7 @@ mod tests {
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
let waited = before.elapsed();
// then I do get one.
assert_eq!(found, Some(expected_token));
assert_eq!(found, PaginationToken::HasMore(expected_token));
// and I haven't waited long.
assert!(waited.as_millis() < 100);
}
@@ -493,14 +596,14 @@ mod tests {
// Then first I don't get it (if I'm not waiting,)
let found = pagination.get_or_wait_for_token(None).await;
assert!(found.is_none());
assert_matches!(found, PaginationToken::None);
// And if I wait for the back-pagination token for 600ms,
let found = pagination.get_or_wait_for_token(Some(Duration::from_millis(600))).await;
let waited = before.elapsed();
// then I do get one eventually.
assert_eq!(found, Some(expected_token));
assert_eq!(found, PaginationToken::HasMore(expected_token));
// and I have waited between ~400 and ~1000 milliseconds.
assert!(waited.as_secs() < 1);
assert!(waited.as_millis() >= 400);
@@ -551,7 +654,7 @@ mod tests {
// Retrieving the pagination token will return the most recent one, not the old
// one.
let found = pagination.get_or_wait_for_token(None).await;
assert_eq!(found, Some(new_token));
assert_eq!(found, PaginationToken::HasMore(new_token));
}
}
}

View File

@@ -1,13 +1,14 @@
use std::{future::ready, ops::ControlFlow, time::Duration};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use futures_util::FutureExt as _;
use matrix_sdk::{
assert_let_timeout, assert_next_matches_with_timeout,
deserialized_responses::SyncTimelineEvent,
event_cache::{
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate,
TimelineHasBeenResetWhilePaginating,
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, PaginationToken,
RoomEventCacheUpdate, TimelineHasBeenResetWhilePaginating,
},
test_utils::{assert_event_matches_msg, mocks::MatrixMockServer},
};
@@ -16,7 +17,7 @@ use matrix_sdk_test::{
};
use ruma::{event_id, events::AnyTimelineEvent, room_id, serde::Raw, user_id};
use serde_json::json;
use tokio::{spawn, sync::broadcast};
use tokio::{spawn, sync::broadcast, time::sleep};
use wiremock::ResponseTemplate;
async fn once(
@@ -259,7 +260,7 @@ async fn test_backpaginate_once() {
// Then if I backpaginate,
let pagination = room_event_cache.pagination();
assert!(pagination.get_or_wait_for_token(None).await.is_some());
assert_matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_));
pagination.run_backwards(20, once).await.unwrap()
};
@@ -351,7 +352,7 @@ async fn test_backpaginate_many_times_with_many_iterations() {
// Then if I backpaginate in a loop,
let pagination = room_event_cache.pagination();
while pagination.get_or_wait_for_token(None).await.is_some() {
while matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_)) {
pagination
.run_backwards(20, |outcome, timeline_has_been_reset| {
num_paginations += 1;
@@ -470,7 +471,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
// Then if I backpaginate in a loop,
let pagination = room_event_cache.pagination();
while pagination.get_or_wait_for_token(None).await.is_some() {
while matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_)) {
pagination
.run_backwards(20, |outcome, timeline_has_been_reset| {
num_paginations += 1;
@@ -606,8 +607,9 @@ async fn test_reset_while_backpaginating() {
// Run the pagination!
let pagination = room_event_cache.pagination();
let first_token = pagination.get_or_wait_for_token(None).await;
assert!(first_token.is_some());
assert_let!(
PaginationToken::HasMore(first_token) = pagination.get_or_wait_for_token(None).await
);
let backpagination = spawn({
let pagination = room_event_cache.pagination();
@@ -645,8 +647,10 @@ async fn test_reset_while_backpaginating() {
assert!(!events.is_empty());
// Now if we retrieve the oldest token, it's set to something else.
let second_token = pagination.get_or_wait_for_token(None).await.unwrap();
assert!(first_token.unwrap() != second_token);
assert_let!(
PaginationToken::HasMore(second_token) = pagination.get_or_wait_for_token(None).await
);
assert!(first_token != second_token);
assert_eq!(second_token, "third_backpagination");
}
@@ -687,7 +691,7 @@ async fn test_backpaginating_without_token() {
// We don't have a token.
let pagination = room_event_cache.pagination();
assert!(pagination.get_or_wait_for_token(None).await.is_none());
assert_eq!(pagination.get_or_wait_for_token(None).await, PaginationToken::None);
// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
@@ -850,19 +854,10 @@ async fn test_backpaginate_with_no_initial_events() {
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
// Start with a room with an event, but no prev-batch token.
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))),
)
.await;
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut stream).await;
// The first back-pagination will return these two events.
//
// Note: it's important to return the same event that came from sync: since we
@@ -870,16 +865,37 @@ async fn test_backpaginate_with_no_initial_events() {
// from the end of the timeline, which must include the event we got from
// sync.
// We need to trigger the following conditions:
// - a back-pagination starts,
// - but then we get events from sync, before the back-pagination is done.
//
// The following things will happen:
// - We don't have a prev-batch token to start with, so the first
// back-pagination doesn't start
// before DEFAULT_WAIT_FOR_TOKEN_DURATION seconds.
// - While the back-pagination is actually running, we need a sync adding events
// to happen
// (after DEFAULT_WAIT_FOR_TOKEN_DURATION + 500 milliseconds).
// - The back-pagination finishes after this sync (after
// DEFAULT_WAIT_FOR_TOKEN_DURATION + 1
// second).
let wait_time = Duration::from_millis(500);
server
.mock_room_messages()
.ok(
"start-token-unused1".to_owned(),
Some("prev_batch".to_owned()),
vec![
f.text_msg("world").event_id(event_id!("$2")),
f.text_msg("hello").event_id(event_id!("$3")),
],
Vec::new(),
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({
"chunk": vec![
f.text_msg("world").event_id(event_id!("$2")).into_raw_timeline(),
f.text_msg("hello").event_id(event_id!("$3")).into_raw_timeline(),
],
"start": "start-token-unused1",
"end": "prev_batch"
}))
// This is why we don't use `server.mock_room_messages()`.
// This delay has to be greater than the one used to return the sync response.
.set_delay(2 * wait_time),
)
.mock_once()
.mount()
@@ -904,17 +920,33 @@ async fn test_backpaginate_with_no_initial_events() {
// Run pagination: since there's no token, we'll wait a bit for a sync to return
// one, and since there's none, we'll end up starting from the end of the
// timeline.
pagination.run_backwards(20, once).await.unwrap();
let pagination_clone = pagination.clone();
let first_pagination = spawn(async move { pagination_clone.run_backwards(20, once).await });
// Make sure we've waited for the initial token long enough (3 seconds, as of
// 2024-12-16).
sleep(Duration::from_millis(3000) + wait_time).await;
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))),
)
.await;
first_pagination.await.expect("joining must work").expect("first backpagination must work");
// Second pagination will be instant.
pagination.run_backwards(20, once).await.unwrap();
// The linked chunk should contain the events in the correct order.
let (events, _stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events.len(), 3, "{events:?}");
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
assert_eq!(events.len(), 3);
}
#[async_test]