paginator: select how many events to retrieve from the /messages query

This commit is contained in:
Benjamin Bouvier
2024-04-22 16:39:17 +02:00
parent 8e2fdd9200
commit c471ee42ab

View File

@@ -209,30 +209,42 @@ impl Paginator {
Ok(StartFromResult { events, has_prev, has_next })
}
/// Runs a backward pagination, from the current state of the object.
/// Runs a backward pagination (requesting `num_events` to the server), from
/// the current state of the object.
///
/// Will return immediately if we have already hit the start of the
/// timeline.
///
/// May return an error if it's already paginating, or if the call to
/// /messages failed.
pub async fn paginate_backward(&self) -> Result<PaginationResult, PaginatorError> {
self.paginate(Direction::Backward, &self.prev_batch_token).await
pub async fn paginate_backward(
&self,
num_events: UInt,
) -> Result<PaginationResult, PaginatorError> {
self.paginate(Direction::Backward, num_events, &self.prev_batch_token).await
}
/// Runs a forward pagination, from the current state of the object.
/// Runs a forward pagination (requesting `num_events` to the server), from
/// the current state of the object.
///
/// Will return immediately if we have already hit the end of the timeline.
///
/// May return an error if it's already paginating, or if the call to
/// /messages failed.
pub async fn paginate_forward(&self) -> Result<PaginationResult, PaginatorError> {
self.paginate(Direction::Forward, &self.next_batch_token).await
pub async fn paginate_forward(
&self,
num_events: UInt,
) -> Result<PaginationResult, PaginatorError> {
self.paginate(Direction::Forward, num_events, &self.next_batch_token).await
}
/// Paginate in the given direction, requesting `num_events` events to the
/// server, using the `token_lock` to read from and write the pagination
/// token.
async fn paginate(
&self,
dir: Direction,
num_events: UInt,
token_lock: &Mutex<Option<String>>,
) -> Result<PaginationResult, PaginatorError> {
self.check_state(PaginatorState::Idle)?;
@@ -255,8 +267,10 @@ impl Paginator {
});
}
let response_result =
self.room.messages(MessagesOptions::new(dir).from(token.as_deref())).await;
let mut options = MessagesOptions::new(dir).from(token.as_deref());
options.limit = num_events;
let response_result = self.room.messages(options).await;
// In case of error, reset the state to idle.
let response = match response_result {
@@ -461,15 +475,31 @@ mod tests {
self.room_ready.notified().await;
}
let limit = u64::from(opts.limit) as usize;
let (end, events) = match opts.dir {
Direction::Backward => (
self.prev_batch_token.lock().await.clone(),
self.prev_events.lock().await.clone(),
),
Direction::Forward => (
self.next_batch_token.lock().await.clone(),
self.next_events.lock().await.clone(),
),
Direction::Backward => {
let events = self.prev_events.lock().await;
let events = if events.is_empty() {
Vec::new()
} else {
let len = events.len();
let take_before = limit.min(len);
// Subtract is safe because take_before <= len
events[len - take_before..len].to_vec()
};
(self.prev_batch_token.lock().await.clone(), events)
}
Direction::Forward => {
let events = self.next_events.lock().await;
let events = if events.is_empty() {
Vec::new()
} else {
events[0..limit.min(events.len())].to_vec()
};
(self.next_batch_token.lock().await.clone(), events)
}
};
return Ok(Messages {
@@ -605,7 +635,8 @@ mod tests {
*room.prev_batch_token.lock().await = Some("prev2".to_owned());
// When I backpaginate, I get the events I expect.
let prev = paginator.paginate_backward().await.expect("paginate backward should work");
let prev =
paginator.paginate_backward(uint!(100)).await.expect("paginate backward should work");
assert!(!prev.hit_end_of_timeline);
assert_eq!(prev.events.len(), 1);
assert_event_matches_msg(&prev.events[0], "previous");
@@ -616,7 +647,7 @@ mod tests {
*room.prev_batch_token.lock().await = None;
let prev = paginator
.paginate_backward()
.paginate_backward(uint!(100))
.await
.expect("paginate backward the second time should work");
assert!(prev.hit_end_of_timeline);
@@ -626,13 +657,57 @@ mod tests {
// I've hit the start of the timeline, but back-paginating again will
// return immediately.
let prev = paginator
.paginate_backward()
.paginate_backward(uint!(100))
.await
.expect("paginate backward the third time should work");
assert!(prev.hit_end_of_timeline);
assert!(prev.events.is_empty());
}
#[async_test]
async fn test_paginate_backward_with_limit() {
// Prepare test data.
let room = Box::new(TestRoom::new(false, *ROOM_ID, *USER_ID));
let event_id = event_id!("$yoyoyo");
let event_factory = &room.event_factory;
*room.target_event_text.lock().await = "initial".to_owned();
*room.prev_batch_token.lock().await = Some("prev".to_owned());
// When I call `Paginator::start_from`, it works,
let paginator = Arc::new(Paginator::new(room.clone()));
let context =
paginator.start_from(event_id, uint!(100)).await.expect("start_from should work");
// And I get the events I expected.
assert_eq!(context.events.len(), 1);
assert_event_matches_msg(&context.events[0], "initial");
assert_eq!(context.events[0].event.deserialize().unwrap().event_id(), event_id);
// There's a previous batch.
assert!(context.has_prev);
assert!(!context.has_next);
// Preparing data for the next back-pagination.
*room.prev_events.lock().await = (0..100)
.rev()
.map(|i| {
TimelineEvent::new(event_factory.text_msg(format!("prev{i}")).into_raw_timeline())
})
.collect();
*room.prev_batch_token.lock().await = None;
// When I backpaginate and request 100 events, I get only 10 events.
let prev =
paginator.paginate_backward(uint!(10)).await.expect("paginate backward should work");
assert!(prev.hit_end_of_timeline);
assert_eq!(prev.events.len(), 10);
for i in 0..10 {
assert_event_matches_msg(&prev.events[i], &format!("prev{}", 9 - i));
}
}
#[async_test]
async fn test_paginate_forward() {
// Prepare test data.
@@ -663,7 +738,8 @@ mod tests {
*room.next_batch_token.lock().await = Some("next2".to_owned());
// When I forward-paginate, I get the events I expect.
let next = paginator.paginate_forward().await.expect("paginate forward should work");
let next =
paginator.paginate_forward(uint!(100)).await.expect("paginate forward should work");
assert!(!next.hit_end_of_timeline);
assert_eq!(next.events.len(), 1);
assert_event_matches_msg(&next.events[0], "next");
@@ -674,7 +750,7 @@ mod tests {
*room.next_batch_token.lock().await = None;
let next = paginator
.paginate_forward()
.paginate_forward(uint!(100))
.await
.expect("paginate forward the second time should work");
assert!(next.hit_end_of_timeline);
@@ -684,7 +760,7 @@ mod tests {
// I've hit the start of the timeline, but back-paginating again will
// return immediately.
let next = paginator
.paginate_forward()
.paginate_forward(uint!(100))
.await
.expect("paginate forward the third time should work");
assert!(next.hit_end_of_timeline);
@@ -709,7 +785,7 @@ mod tests {
// Attempting to run pagination must fail and not change the state.
assert_invalid_state(
paginator.paginate_backward(),
paginator.paginate_backward(uint!(100)),
PaginatorState::Idle,
PaginatorState::Initial,
)
@@ -733,7 +809,7 @@ mod tests {
.await;
assert_invalid_state(
paginator.paginate_backward(),
paginator.paginate_backward(uint!(100)),
PaginatorState::Idle,
PaginatorState::FetchingTargetEvent,
)
@@ -752,7 +828,7 @@ mod tests {
assert!(state.next().now_or_never().is_none());
let p = paginator.clone();
let join_handle = spawn(async move { p.paginate_backward().await });
let join_handle = spawn(async move { p.paginate_backward(uint!(100)).await });
assert_eq!(state.next().await, Some(PaginatorState::Paginating));
@@ -765,14 +841,14 @@ mod tests {
.await;
assert_invalid_state(
paginator.paginate_backward(),
paginator.paginate_backward(uint!(100)),
PaginatorState::Idle,
PaginatorState::Paginating,
)
.await;
assert_invalid_state(
paginator.paginate_forward(),
paginator.paginate_forward(uint!(100)),
PaginatorState::Idle,
PaginatorState::Paginating,
)