Merge pull request #3491 from Hywan/feat-sdk-event-cache-pagination-through-update-2

feat(sdk): Add `RoomPagination::run_backwards(…, until)`, take 2
This commit is contained in:
Ivan Enderlin
2024-06-03 10:28:49 +02:00
committed by GitHub
4 changed files with 272 additions and 55 deletions

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::ControlFlow;
use async_rx::StreamExt as _;
use async_stream::stream;
use futures_core::Stream;
@@ -67,49 +69,44 @@ impl super::Timeline {
pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
let pagination = self.event_cache.pagination();
loop {
let result = pagination.run_backwards(batch_size).await;
let result = pagination
.run_backwards(
batch_size,
|BackPaginationOutcome { events, reached_start },
_timeline_has_been_reset| async move {
let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");
let event_cache_outcome = match result {
Ok(outcome) => outcome,
// TODO(hywan): Remove, and let spread events via
// `matrix_sdk::event_cache::RoomEventCacheUpdate` from
// `matrix_sdk::event_cache::RoomPagination::run_backwards`.
self.inner
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
.await;
Err(EventCacheError::BackpaginationError(
PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating, ..
},
)) => {
warn!("Another pagination request is already happening, returning early");
return Ok(false);
}
if num_events == 0 && !reached_start {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, retry until we get
// some events or reach the start of the timeline.
return ControlFlow::Continue(());
}
Err(err) => return Err(err),
};
ControlFlow::Break(reached_start)
},
)
.await;
let BackPaginationOutcome { events, reached_start } = event_cache_outcome;
let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");
self.inner
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
.await;
if reached_start {
return Ok(true);
match result {
Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating,
..
})) => {
warn!("Another pagination request is already happening, returning early");
Ok(false)
}
if num_events == 0 {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, retry until we get
// some events or reach the start of the timeline.
continue;
}
// Exit the inner loop, and ask for another limit.
break;
result => result,
}
Ok(false)
}
/// Subscribe to the back-pagination status of a live timeline.

View File

@@ -76,7 +76,7 @@ mod pagination;
mod store;
pub mod paginator;
pub use pagination::RoomPagination;
pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating};
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]

View File

@@ -14,7 +14,7 @@
//! A sub-object for running pagination tasks on a given room.
use std::{sync::Arc, time::Duration};
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
use eyeball::Subscriber;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
@@ -59,17 +59,75 @@ impl RoomPagination {
/// This automatically takes care of waiting for a pagination token from
/// sync, if we haven't done that before.
///
/// The `until` argument is an async closure that returns a [`ControlFlow`]
/// to decide whether a new pagination must be run or not. It's helpful when
/// the server replies with e.g. a certain set of events, but we would like
/// more, or the event we are looking for isn't part of this set: in this
/// case, `until` returns [`ControlFlow::Continue`], otherwise it returns
/// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as
/// its sole argument.
///
/// # Errors
///
/// It may return an error if the pagination token used during
/// back-pagination has disappeared while we started the pagination. In
/// that case, it's desirable to call the method again.
#[instrument(skip(self))]
pub async fn run_backwards(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
///
/// # Example
///
/// To do a single run:
///
/// ```rust
/// use std::ops::ControlFlow;
///
/// use matrix_sdk::event_cache::{
/// BackPaginationOutcome,
/// RoomPagination,
/// TimelineHasBeenResetWhilePaginating
/// };
///
/// # async fn foo(room_pagination: RoomPagination) {
/// let result = room_pagination.run_backwards(
/// 42,
/// |BackPaginationOutcome { events, reached_start },
/// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move {
/// // Do something with `events` and `reached_start` maybe?
/// let _ = events;
/// let _ = reached_start;
///
/// ControlFlow::Break(())
/// }
/// ).await;
/// # }
#[instrument(skip(self, until))]
pub async fn run_backwards<Until, Break, UntilFuture>(
&self,
batch_size: u16,
mut until: Until,
) -> Result<Break>
where
Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture,
UntilFuture: Future<Output = ControlFlow<Break, ()>>,
{
let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
loop {
if let Some(result) = self.run_backwards_impl(batch_size).await? {
return Ok(result);
if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
match until(outcome, timeline_has_been_reset).await {
ControlFlow::Continue(()) => {
trace!("back-pagination continues");
timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
continue;
}
ControlFlow::Break(value) => return Ok(value),
}
}
timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes;
debug!("back-pagination has been internally restarted because of a timeline reset.");
}
}
@@ -259,6 +317,16 @@ impl RoomPagination {
}
}
/// A type representing whether the timeline has been reset.
#[derive(Debug)]
pub enum TimelineHasBeenResetWhilePaginating {
/// The timeline has been reset.
Yes,
/// The timeline has not been reset.
No,
}
#[cfg(test)]
mod tests {
// Those tests require time to work, and it does not on wasm32.

View File

@@ -1,8 +1,11 @@
use std::time::Duration;
use std::{future::ready, ops::ControlFlow, time::Duration};
use assert_matches2::{assert_let, assert_matches};
use matrix_sdk::{
event_cache::{BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate},
event_cache::{
BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate,
TimelineHasBeenResetWhilePaginating,
},
test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client_with_server},
};
use matrix_sdk_test::{
@@ -24,6 +27,13 @@ use wiremock::{
use crate::mock_sync;
async fn once(
outcome: BackPaginationOutcome,
_timeline_has_been_reset: TimelineHasBeenResetWhilePaginating,
) -> ControlFlow<BackPaginationOutcome, ()> {
ControlFlow::Break(outcome)
}
#[async_test]
async fn test_must_explicitly_subscribe() {
let (client, server) = logged_in_client_with_server().await;
@@ -362,7 +372,7 @@ async fn test_backpaginate_once() {
assert!(pagination.get_or_wait_for_token().await.is_some());
pagination.run_backwards(20).await.unwrap()
pagination.run_backwards(20, once).await.unwrap()
};
// I'll get all the previous events, in "reverse" order (same as the response).
@@ -377,7 +387,7 @@ async fn test_backpaginate_once() {
}
#[async_test]
async fn test_backpaginate_multiple_iterations() {
async fn test_backpaginate_many_times_with_many_iterations() {
let (client, server) = logged_in_client_with_server().await;
let event_cache = client.event_cache();
@@ -425,6 +435,7 @@ async fn test_backpaginate_multiple_iterations() {
}
let mut num_iterations = 0;
let mut num_paginations = 0;
let mut global_events = Vec::new();
let mut global_reached_start = false;
@@ -449,19 +460,149 @@ async fn test_backpaginate_multiple_iterations() {
// Then if I backpaginate in a loop,
let pagination = room_event_cache.pagination();
while pagination.get_or_wait_for_token().await.is_some() {
let BackPaginationOutcome { reached_start, events } =
pagination.run_backwards(20).await.unwrap();
pagination
.run_backwards(20, |outcome, timeline_has_been_reset| {
num_paginations += 1;
if !global_reached_start {
global_reached_start = reached_start;
}
global_events.extend(events);
assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No);
if !global_reached_start {
global_reached_start = outcome.reached_start;
}
global_events.extend(outcome.events);
ready(ControlFlow::Break(()))
})
.await
.unwrap();
num_iterations += 1;
}
// I'll get all the previous events,
assert_eq!(num_iterations, 2);
assert_eq!(num_iterations, 2); // in two iterations…
assert_eq!(num_paginations, 2); // … we get two paginations.
assert!(global_reached_start);
assert_event_matches_msg(&global_events[0], "world");
assert_event_matches_msg(&global_events[1], "hello");
assert_event_matches_msg(&global_events[2], "oh well");
assert_eq!(global_events.len(), 3);
// And next time I'll open the room, I'll get the events in the right order.
let (events, _receiver) = room_event_cache.subscribe().await.unwrap();
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
assert_event_matches_msg(&events[3], "heyo");
assert_eq!(events.len(), 4);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_backpaginate_many_times_with_one_iteration() {
let (client, server) = logged_in_client_with_server().await;
let event_cache = client.event_cache();
// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
// If I sync and get informed I've joined The Room, and get a previous batch
// token,
let room_id = room_id!("!omelette:fromage.fr");
let event_builder = EventBuilder::new();
let mut sync_builder = SyncResponseBuilder::new();
{
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
// Note to self: a timeline must have at least single event to be properly
// serialized.
.add_timeline_event(event_builder.make_sync_message_event(
user_id!("@a:b.c"),
RoomMessageEventContent::text_plain("heyo"),
))
.set_timeline_prev_batch("prev_batch".to_owned()),
);
let response_body = sync_builder.build_json_sync_response();
mock_sync(&server, response_body, None).await;
client.sync_once(Default::default()).await.unwrap();
server.reset().await;
}
let (room_event_cache, _drop_handles) =
client.get_room(room_id).unwrap().event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
// This is racy: either the initial message has been processed by the event
// cache (and no room updates will happen in this case), or it hasn't, and
// the stream will return the next message soon.
if events.is_empty() {
let _ = room_stream.recv().await.expect("read error");
} else {
assert_eq!(events.len(), 1);
}
let mut num_iterations = 0;
let mut num_paginations = 0;
let mut global_events = Vec::new();
let mut global_reached_start = false;
// The first back-pagination will return these two.
mock_messages(
&server,
"prev_batch",
Some("prev_batch2"),
non_sync_events!(event_builder, [ (room_id, "$2": "world"), (room_id, "$3": "hello") ]),
)
.await;
// The second round of back-pagination will return this one.
mock_messages(
&server,
"prev_batch2",
None,
non_sync_events!(event_builder, [ (room_id, "$4": "oh well"), ]),
)
.await;
// Then if I backpaginate in a loop,
let pagination = room_event_cache.pagination();
while pagination.get_or_wait_for_token().await.is_some() {
pagination
.run_backwards(20, |outcome, timeline_has_been_reset| {
num_paginations += 1;
assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No);
if !global_reached_start {
global_reached_start = outcome.reached_start;
}
global_events.extend(outcome.events);
ready(if outcome.reached_start {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
})
})
.await
.unwrap();
num_iterations += 1;
}
// I'll get all the previous events,
assert_eq!(num_iterations, 1); // in one iteration…
assert_eq!(num_paginations, 2); // … we get two paginations!
assert!(global_reached_start);
assert_event_matches_msg(&global_events[0], "world");
@@ -586,7 +727,18 @@ async fn test_reset_while_backpaginating() {
let backpagination = spawn({
let pagination = room_event_cache.pagination();
async move { pagination.run_backwards(20).await }
async move {
pagination
.run_backwards(20, |outcome, timeline_has_been_reset| {
assert_matches!(
timeline_has_been_reset,
TimelineHasBeenResetWhilePaginating::Yes
);
ready(ControlFlow::Break(outcome))
})
.await
}
});
// Receive the sync response (which clears the timeline).
@@ -656,7 +808,7 @@ async fn test_backpaginating_without_token() {
// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards(20).await.unwrap();
pagination.run_backwards(20, once).await.unwrap();
assert!(reached_start);