mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-02-15 18:12:57 -05:00
feat(event cache): don't restart back-pagination from the end if we had no prev-batch token
This commit is contained in:
@@ -1054,21 +1054,24 @@ async fn test_pending_edit_from_backpagination_doesnt_override_pending_edit_from
|
||||
let mut h = PendingEditHelper::new().await;
|
||||
let f = EventFactory::new();
|
||||
|
||||
let (_, mut timeline_stream) = h.timeline.subscribe().await;
|
||||
|
||||
// When I receive an edit live from a sync for an event I don't know about…
|
||||
let original_event_id = event_id!("$original");
|
||||
let edit_event_id = event_id!("$edit");
|
||||
h.handle_sync(
|
||||
JoinedRoomBuilder::new(&h.room_id).add_timeline_event(
|
||||
f.text_msg("* hello")
|
||||
.sender(&ALICE)
|
||||
.event_id(edit_event_id)
|
||||
.edit(original_event_id, RoomMessageEventContent::text_plain("[edit]").into()),
|
||||
),
|
||||
JoinedRoomBuilder::new(&h.room_id)
|
||||
.add_timeline_event(
|
||||
f.text_msg("* hello")
|
||||
.sender(&ALICE)
|
||||
.event_id(edit_event_id)
|
||||
.edit(original_event_id, RoomMessageEventContent::text_plain("[edit]").into()),
|
||||
)
|
||||
.set_timeline_prev_batch("prev-batch-token".to_owned())
|
||||
.set_timeline_limited(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let (_, mut timeline_stream) = h.timeline.subscribe().await;
|
||||
|
||||
// And then I receive an edit from a back-pagination for the same event…
|
||||
let edit_event_id2 = event_id!("$edit2");
|
||||
h.handle_backpagination(
|
||||
@@ -1084,7 +1087,7 @@ async fn test_pending_edit_from_backpagination_doesnt_override_pending_edit_from
|
||||
.await;
|
||||
|
||||
// Nothing happens.
|
||||
assert!(timeline_stream.next().now_or_never().is_none());
|
||||
assert_matches!(timeline_stream.next().now_or_never(), None);
|
||||
|
||||
// And then I receive the original event after a bit…
|
||||
h.handle_sync(
|
||||
|
||||
@@ -62,7 +62,7 @@ mod pagination;
|
||||
mod room;
|
||||
|
||||
pub mod paginator;
|
||||
pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating};
|
||||
pub use pagination::{PaginationToken, RoomPagination, TimelineHasBeenResetWhilePaginating};
|
||||
pub use room::RoomEventCache;
|
||||
|
||||
/// An error observed in the [`EventCache`].
|
||||
|
||||
@@ -35,6 +35,7 @@ use super::{
|
||||
///
|
||||
/// Can be created with [`super::RoomEventCache::pagination()`].
|
||||
#[allow(missing_debug_implementations)]
|
||||
#[derive(Clone)]
|
||||
pub struct RoomPagination {
|
||||
pub(super) inner: Arc<RoomEventCacheInner>,
|
||||
}
|
||||
@@ -123,6 +124,15 @@ impl RoomPagination {
|
||||
|
||||
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
|
||||
|
||||
let prev_token = match prev_token {
|
||||
PaginationToken::HasMore(token) => Some(token),
|
||||
PaginationToken::None => None,
|
||||
PaginationToken::HitEnd => {
|
||||
debug!("Not back-paginating since we've reached the start of the timeline.");
|
||||
return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
|
||||
}
|
||||
};
|
||||
|
||||
let paginator = &self.inner.paginator;
|
||||
|
||||
paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
|
||||
@@ -221,7 +231,7 @@ impl RoomPagination {
|
||||
/// It will only wait if we *never* saw an initial previous-batch token.
|
||||
/// Otherwise, it will immediately skip.
|
||||
#[doc(hidden)]
|
||||
pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> Option<String> {
|
||||
pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> PaginationToken {
|
||||
fn get_latest(events: &RoomEvents) -> Option<String> {
|
||||
events.rchunks().find_map(|chunk| match chunk.content() {
|
||||
ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
|
||||
@@ -232,19 +242,35 @@ impl RoomPagination {
|
||||
{
|
||||
// Scope for the lock guard.
|
||||
let state = self.inner.state.read().await;
|
||||
|
||||
// Check if the linked chunk contains any events. If so, absence of a gap means
|
||||
// we've hit the start of the timeline. If not, absence of a gap
|
||||
// means we've never received a pagination token from sync, and we
|
||||
// should wait for one.
|
||||
let has_events = state.events().events().next().is_some();
|
||||
|
||||
// Fast-path: we do have a previous-batch token already.
|
||||
if let Some(found) = get_latest(state.events()) {
|
||||
return Some(found);
|
||||
return PaginationToken::HasMore(found);
|
||||
}
|
||||
|
||||
// If we had events, and there was no gap, then we've hit the end of the
|
||||
// timeline.
|
||||
if has_events {
|
||||
return PaginationToken::HitEnd;
|
||||
}
|
||||
|
||||
// If we've already waited for an initial previous-batch token before,
|
||||
// immediately abort.
|
||||
if state.waited_for_initial_prev_token {
|
||||
return None;
|
||||
return PaginationToken::None;
|
||||
}
|
||||
}
|
||||
|
||||
// If the caller didn't set a wait time, return none early.
|
||||
let wait_time = wait_time?;
|
||||
let Some(wait_time) = wait_time else {
|
||||
return PaginationToken::None;
|
||||
};
|
||||
|
||||
// Otherwise, wait for a notification that we received a previous-batch token.
|
||||
// Note the state lock is released while doing so, allowing other tasks to write
|
||||
@@ -252,9 +278,17 @@ impl RoomPagination {
|
||||
let _ = timeout(wait_time, self.inner.pagination_batch_token_notifier.notified()).await;
|
||||
|
||||
let mut state = self.inner.state.write().await;
|
||||
let token = get_latest(state.events());
|
||||
|
||||
state.waited_for_initial_prev_token = true;
|
||||
token
|
||||
|
||||
if let Some(token) = get_latest(state.events()) {
|
||||
PaginationToken::HasMore(token)
|
||||
} else if state.events().events().next().is_some() {
|
||||
// See logic above, in the read lock guard scope.
|
||||
PaginationToken::HitEnd
|
||||
} else {
|
||||
PaginationToken::None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a subscriber to the pagination status used for the
|
||||
@@ -281,8 +315,8 @@ impl RoomPagination {
|
||||
}
|
||||
|
||||
/// Pagination token data, indicating in which state is the current pagination.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) enum PaginationToken {
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum PaginationToken {
|
||||
/// We never had a pagination token, so we'll start back-paginating from the
|
||||
/// end, or forward-paginating from the start.
|
||||
None,
|
||||
@@ -320,6 +354,7 @@ mod tests {
|
||||
mod time_tests {
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use matrix_sdk_base::RoomState;
|
||||
use matrix_sdk_test::{
|
||||
async_test, event_factory::EventFactory, sync_timeline_event, ALICE,
|
||||
@@ -328,7 +363,8 @@ mod tests {
|
||||
use tokio::{spawn, time::sleep};
|
||||
|
||||
use crate::{
|
||||
deserialized_responses::SyncTimelineEvent, event_cache::room::events::Gap,
|
||||
deserialized_responses::SyncTimelineEvent,
|
||||
event_cache::{pagination::PaginationToken, room::events::Gap},
|
||||
test_utils::logged_in_client,
|
||||
};
|
||||
|
||||
@@ -344,32 +380,15 @@ mod tests {
|
||||
|
||||
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
|
||||
|
||||
// When I only have events in a room,
|
||||
room_event_cache
|
||||
.inner
|
||||
.state
|
||||
.write()
|
||||
.await
|
||||
.with_events_mut(|events| {
|
||||
events.push_events([SyncTimelineEvent::new(sync_timeline_event!({
|
||||
"sender": "b@z.h",
|
||||
"type": "m.room.message",
|
||||
"event_id": "$ida",
|
||||
"origin_server_ts": 12344446,
|
||||
"content": { "body":"yolo", "msgtype": "m.text" },
|
||||
}))])
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let pagination = room_event_cache.pagination();
|
||||
|
||||
// If I don't wait for the backpagination token,
|
||||
// If I have a room with no events, and try to get a pagination token without
|
||||
// waiting,
|
||||
let found = pagination.get_or_wait_for_token(None).await;
|
||||
// Then I don't find it.
|
||||
assert!(found.is_none());
|
||||
// Then I don't get any pagination token.
|
||||
assert_matches!(found, PaginationToken::None);
|
||||
|
||||
// Reset waited_for_initial_prev_token state.
|
||||
// Reset waited_for_initial_prev_token and event state.
|
||||
pagination.inner.state.write().await.reset().await.unwrap();
|
||||
|
||||
// If I wait for a back-pagination token for 0 seconds,
|
||||
@@ -377,7 +396,7 @@ mod tests {
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
|
||||
let waited = before.elapsed();
|
||||
// then I don't get any,
|
||||
assert!(found.is_none());
|
||||
assert_matches!(found, PaginationToken::None);
|
||||
// and I haven't waited long.
|
||||
assert!(waited.as_secs() < 1);
|
||||
|
||||
@@ -389,12 +408,96 @@ mod tests {
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
|
||||
let waited = before.elapsed();
|
||||
// then I still don't get any.
|
||||
assert!(found.is_none());
|
||||
assert_matches!(found, PaginationToken::None);
|
||||
// and I've waited a bit.
|
||||
assert!(waited.as_secs() < 2);
|
||||
assert!(waited.as_secs() >= 1);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_wait_hit_end_of_timeline() {
|
||||
let client = logged_in_client(None).await;
|
||||
let room_id = room_id!("!galette:saucisse.bzh");
|
||||
client.base_client().get_or_create_room(room_id, RoomState::Joined);
|
||||
|
||||
let event_cache = client.event_cache();
|
||||
|
||||
event_cache.subscribe().unwrap();
|
||||
|
||||
let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
|
||||
|
||||
let f = EventFactory::new().room(room_id).sender(*ALICE);
|
||||
let pagination = room_event_cache.pagination();
|
||||
|
||||
// Add a previous event.
|
||||
room_event_cache
|
||||
.inner
|
||||
.state
|
||||
.write()
|
||||
.await
|
||||
.with_events_mut(|events| {
|
||||
events
|
||||
.push_events([f.text_msg("this is the start of the timeline").into_sync()]);
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// If I have a room with events, and try to get a pagination token without
|
||||
// waiting,
|
||||
let found = pagination.get_or_wait_for_token(None).await;
|
||||
// I've reached the start of the timeline.
|
||||
assert_matches!(found, PaginationToken::HitEnd);
|
||||
|
||||
// If I wait for a back-pagination token for 0 seconds,
|
||||
let before = Instant::now();
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
|
||||
let waited = before.elapsed();
|
||||
// Then I still have reached the start of the timeline.
|
||||
assert_matches!(found, PaginationToken::HitEnd);
|
||||
// and I've waited very little.
|
||||
assert!(waited.as_secs() < 1);
|
||||
|
||||
// If I wait for a back-pagination token for 1 second,
|
||||
let before = Instant::now();
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
|
||||
let waited = before.elapsed();
|
||||
// then I still don't get any.
|
||||
assert_matches!(found, PaginationToken::HitEnd);
|
||||
// and I've waited very little (there's no point in waiting in this case).
|
||||
assert!(waited.as_secs() < 1);
|
||||
|
||||
// Now, reset state. We'll add an event *after* we've started waiting, this
|
||||
// time.
|
||||
room_event_cache.clear().await.unwrap();
|
||||
|
||||
spawn(async move {
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
|
||||
room_event_cache
|
||||
.inner
|
||||
.state
|
||||
.write()
|
||||
.await
|
||||
.with_events_mut(|events| {
|
||||
events.push_events([f
|
||||
.text_msg("this is the start of the timeline")
|
||||
.into_sync()]);
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
// If I wait for a pagination token,
|
||||
let before = Instant::now();
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(2))).await;
|
||||
let waited = before.elapsed();
|
||||
// since sync has returned all events, and no prior gap, I've hit the end.
|
||||
assert_matches!(found, PaginationToken::HitEnd);
|
||||
// and I've waited for the whole duration.
|
||||
assert!(waited.as_secs() >= 2);
|
||||
assert!(waited.as_secs() < 3);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_wait_for_pagination_token_already_present() {
|
||||
let client = logged_in_client(None).await;
|
||||
@@ -435,14 +538,14 @@ mod tests {
|
||||
// If I don't wait for a back-pagination token,
|
||||
let found = pagination.get_or_wait_for_token(None).await;
|
||||
// Then I get it.
|
||||
assert_eq!(found.as_ref(), Some(&expected_token));
|
||||
assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
|
||||
|
||||
// If I wait for a back-pagination token for 0 seconds,
|
||||
let before = Instant::now();
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
|
||||
let waited = before.elapsed();
|
||||
// then I do get one.
|
||||
assert_eq!(found.as_ref(), Some(&expected_token));
|
||||
assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
|
||||
// and I haven't waited long.
|
||||
assert!(waited.as_millis() < 100);
|
||||
|
||||
@@ -451,7 +554,7 @@ mod tests {
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
|
||||
let waited = before.elapsed();
|
||||
// then I do get one.
|
||||
assert_eq!(found, Some(expected_token));
|
||||
assert_eq!(found, PaginationToken::HasMore(expected_token));
|
||||
// and I haven't waited long.
|
||||
assert!(waited.as_millis() < 100);
|
||||
}
|
||||
@@ -493,14 +596,14 @@ mod tests {
|
||||
|
||||
// Then first I don't get it (if I'm not waiting,)
|
||||
let found = pagination.get_or_wait_for_token(None).await;
|
||||
assert!(found.is_none());
|
||||
assert_matches!(found, PaginationToken::None);
|
||||
|
||||
// And if I wait for the back-pagination token for 600ms,
|
||||
let found = pagination.get_or_wait_for_token(Some(Duration::from_millis(600))).await;
|
||||
let waited = before.elapsed();
|
||||
|
||||
// then I do get one eventually.
|
||||
assert_eq!(found, Some(expected_token));
|
||||
assert_eq!(found, PaginationToken::HasMore(expected_token));
|
||||
// and I have waited between ~400 and ~1000 milliseconds.
|
||||
assert!(waited.as_secs() < 1);
|
||||
assert!(waited.as_millis() >= 400);
|
||||
@@ -551,7 +654,7 @@ mod tests {
|
||||
// Retrieving the pagination token will return the most recent one, not the old
|
||||
// one.
|
||||
let found = pagination.get_or_wait_for_token(None).await;
|
||||
assert_eq!(found, Some(new_token));
|
||||
assert_eq!(found, PaginationToken::HasMore(new_token));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use std::{future::ready, ops::ControlFlow, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use assert_matches2::assert_let;
|
||||
use futures_util::FutureExt as _;
|
||||
use matrix_sdk::{
|
||||
assert_let_timeout, assert_next_matches_with_timeout,
|
||||
deserialized_responses::SyncTimelineEvent,
|
||||
event_cache::{
|
||||
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate,
|
||||
TimelineHasBeenResetWhilePaginating,
|
||||
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, PaginationToken,
|
||||
RoomEventCacheUpdate, TimelineHasBeenResetWhilePaginating,
|
||||
},
|
||||
test_utils::{assert_event_matches_msg, mocks::MatrixMockServer},
|
||||
};
|
||||
@@ -16,7 +17,7 @@ use matrix_sdk_test::{
|
||||
};
|
||||
use ruma::{event_id, events::AnyTimelineEvent, room_id, serde::Raw, user_id};
|
||||
use serde_json::json;
|
||||
use tokio::{spawn, sync::broadcast};
|
||||
use tokio::{spawn, sync::broadcast, time::sleep};
|
||||
use wiremock::ResponseTemplate;
|
||||
|
||||
async fn once(
|
||||
@@ -259,7 +260,7 @@ async fn test_backpaginate_once() {
|
||||
// Then if I backpaginate,
|
||||
let pagination = room_event_cache.pagination();
|
||||
|
||||
assert!(pagination.get_or_wait_for_token(None).await.is_some());
|
||||
assert_matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_));
|
||||
|
||||
pagination.run_backwards(20, once).await.unwrap()
|
||||
};
|
||||
@@ -351,7 +352,7 @@ async fn test_backpaginate_many_times_with_many_iterations() {
|
||||
|
||||
// Then if I backpaginate in a loop,
|
||||
let pagination = room_event_cache.pagination();
|
||||
while pagination.get_or_wait_for_token(None).await.is_some() {
|
||||
while matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_)) {
|
||||
pagination
|
||||
.run_backwards(20, |outcome, timeline_has_been_reset| {
|
||||
num_paginations += 1;
|
||||
@@ -470,7 +471,7 @@ async fn test_backpaginate_many_times_with_one_iteration() {
|
||||
|
||||
// Then if I backpaginate in a loop,
|
||||
let pagination = room_event_cache.pagination();
|
||||
while pagination.get_or_wait_for_token(None).await.is_some() {
|
||||
while matches!(pagination.get_or_wait_for_token(None).await, PaginationToken::HasMore(_)) {
|
||||
pagination
|
||||
.run_backwards(20, |outcome, timeline_has_been_reset| {
|
||||
num_paginations += 1;
|
||||
@@ -606,8 +607,9 @@ async fn test_reset_while_backpaginating() {
|
||||
// Run the pagination!
|
||||
let pagination = room_event_cache.pagination();
|
||||
|
||||
let first_token = pagination.get_or_wait_for_token(None).await;
|
||||
assert!(first_token.is_some());
|
||||
assert_let!(
|
||||
PaginationToken::HasMore(first_token) = pagination.get_or_wait_for_token(None).await
|
||||
);
|
||||
|
||||
let backpagination = spawn({
|
||||
let pagination = room_event_cache.pagination();
|
||||
@@ -645,8 +647,10 @@ async fn test_reset_while_backpaginating() {
|
||||
assert!(!events.is_empty());
|
||||
|
||||
// Now if we retrieve the oldest token, it's set to something else.
|
||||
let second_token = pagination.get_or_wait_for_token(None).await.unwrap();
|
||||
assert!(first_token.unwrap() != second_token);
|
||||
assert_let!(
|
||||
PaginationToken::HasMore(second_token) = pagination.get_or_wait_for_token(None).await
|
||||
);
|
||||
assert!(first_token != second_token);
|
||||
assert_eq!(second_token, "third_backpagination");
|
||||
}
|
||||
|
||||
@@ -687,7 +691,7 @@ async fn test_backpaginating_without_token() {
|
||||
|
||||
// We don't have a token.
|
||||
let pagination = room_event_cache.pagination();
|
||||
assert!(pagination.get_or_wait_for_token(None).await.is_none());
|
||||
assert_eq!(pagination.get_or_wait_for_token(None).await, PaginationToken::None);
|
||||
|
||||
// If we try to back-paginate with a token, it will hit the end of the timeline
|
||||
// and give us the resulting event.
|
||||
@@ -850,19 +854,10 @@ async fn test_backpaginate_with_no_initial_events() {
|
||||
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
|
||||
|
||||
// Start with a room with an event, but no prev-batch token.
|
||||
let room = server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(room_id)
|
||||
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))),
|
||||
)
|
||||
.await;
|
||||
let room = server.sync_joined_room(&client, room_id).await;
|
||||
|
||||
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
|
||||
|
||||
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
|
||||
wait_for_initial_events(events, &mut stream).await;
|
||||
|
||||
// The first back-pagination will return these two events.
|
||||
//
|
||||
// Note: it's important to return the same event that came from sync: since we
|
||||
@@ -870,16 +865,37 @@ async fn test_backpaginate_with_no_initial_events() {
|
||||
// from the end of the timeline, which must include the event we got from
|
||||
// sync.
|
||||
|
||||
// We need to trigger the following conditions:
|
||||
// - a back-pagination starts,
|
||||
// - but then we get events from sync, before the back-pagination is done.
|
||||
//
|
||||
// The following things will happen:
|
||||
// - We don't have a prev-batch token to start with, so the first
|
||||
// back-pagination doesn't start
|
||||
// before DEFAULT_WAIT_FOR_TOKEN_DURATION seconds.
|
||||
// - While the back-pagination is actually running, we need a sync adding events
|
||||
// to happen
|
||||
// (after DEFAULT_WAIT_FOR_TOKEN_DURATION + 500 milliseconds).
|
||||
// - The back-pagination finishes after this sync (after
|
||||
// DEFAULT_WAIT_FOR_TOKEN_DURATION + 1
|
||||
// second).
|
||||
|
||||
let wait_time = Duration::from_millis(500);
|
||||
server
|
||||
.mock_room_messages()
|
||||
.ok(
|
||||
"start-token-unused1".to_owned(),
|
||||
Some("prev_batch".to_owned()),
|
||||
vec![
|
||||
f.text_msg("world").event_id(event_id!("$2")),
|
||||
f.text_msg("hello").event_id(event_id!("$3")),
|
||||
],
|
||||
Vec::new(),
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.set_body_json(json!({
|
||||
"chunk": vec![
|
||||
f.text_msg("world").event_id(event_id!("$2")).into_raw_timeline(),
|
||||
f.text_msg("hello").event_id(event_id!("$3")).into_raw_timeline(),
|
||||
],
|
||||
"start": "start-token-unused1",
|
||||
"end": "prev_batch"
|
||||
}))
|
||||
// This is why we don't use `server.mock_room_messages()`.
|
||||
// This delay has to be greater than the one used to return the sync response.
|
||||
.set_delay(2 * wait_time),
|
||||
)
|
||||
.mock_once()
|
||||
.mount()
|
||||
@@ -904,17 +920,33 @@ async fn test_backpaginate_with_no_initial_events() {
|
||||
// Run pagination: since there's no token, we'll wait a bit for a sync to return
|
||||
// one, and since there's none, we'll end up starting from the end of the
|
||||
// timeline.
|
||||
pagination.run_backwards(20, once).await.unwrap();
|
||||
let pagination_clone = pagination.clone();
|
||||
|
||||
let first_pagination = spawn(async move { pagination_clone.run_backwards(20, once).await });
|
||||
|
||||
// Make sure we've waited for the initial token long enough (3 seconds, as of
|
||||
// 2024-12-16).
|
||||
sleep(Duration::from_millis(3000) + wait_time).await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(room_id)
|
||||
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$3"))),
|
||||
)
|
||||
.await;
|
||||
|
||||
first_pagination.await.expect("joining must work").expect("first backpagination must work");
|
||||
|
||||
// Second pagination will be instant.
|
||||
pagination.run_backwards(20, once).await.unwrap();
|
||||
|
||||
// The linked chunk should contain the events in the correct order.
|
||||
let (events, _stream) = room_event_cache.subscribe().await.unwrap();
|
||||
|
||||
assert_eq!(events.len(), 3, "{events:?}");
|
||||
assert_event_matches_msg(&events[0], "oh well");
|
||||
assert_event_matches_msg(&events[1], "hello");
|
||||
assert_event_matches_msg(&events[2], "world");
|
||||
assert_eq!(events.len(), 3);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
|
||||
Reference in New Issue
Block a user