From 392fd004d9986bfd0d757ba755638f3f5ab52d76 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 28 May 2024 18:12:53 +0200 Subject: [PATCH] =?UTF-8?q?Revert=20"feat(sdk):=20Add=20`RoomPagination::r?= =?UTF-8?q?un=5Fbackwards(=E2=80=A6,=20until)`"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../matrix-sdk-ui/src/timeline/pagination.rs | 69 +++---- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- .../matrix-sdk/src/event_cache/pagination.rs | 78 +------- .../tests/integration/event_cache.rs | 178 ++---------------- 4 files changed, 55 insertions(+), 272 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index dd18392f6..7d4595b2e 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::ControlFlow; - use async_rx::StreamExt as _; use async_stream::stream; use futures_core::Stream; @@ -69,44 +67,49 @@ impl super::Timeline { pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result { let pagination = self.event_cache.pagination(); - let result = pagination - .run_backwards( - batch_size, - |BackPaginationOutcome { events, reached_start }, - _timeline_has_been_reset| async move { - let num_events = events.len(); - trace!("Back-pagination succeeded with {num_events} events"); + loop { + let result = pagination.run_backwards(batch_size).await; - // TODO(hywan): Remove, and let spread events via - // `matrix_sdk::event_cache::RoomEventCacheUpdate` from - // `matrix_sdk::event_cache::RoomPagination::run_backwards`. - self.inner - .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) - .await; + let event_cache_outcome = match result { + Ok(outcome) => outcome, - if num_events == 0 && !reached_start { - // As an exceptional contract: if there were no events in the response, - // and we've not hit the start of the timeline, retry until we get - // some events or reach the start of the timeline. - return ControlFlow::Continue(()); - } + Err(EventCacheError::BackpaginationError( + PaginatorError::InvalidPreviousState { + actual: PaginatorState::Paginating, .. + }, + )) => { + warn!("Another pagination request is already happening, returning early"); + return Ok(false); + } - ControlFlow::Break(reached_start) - }, - ) - .await; + Err(err) => return Err(err), + }; - match result { - Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState { - actual: PaginatorState::Paginating, - .. - })) => { - warn!("Another pagination request is already happening, returning early"); - Ok(false) + let BackPaginationOutcome { events, reached_start } = event_cache_outcome; + + let num_events = events.len(); + trace!("Back-pagination succeeded with {num_events} events"); + + self.inner + .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) + .await; + + if reached_start { + return Ok(true); } - result => result, + if num_events == 0 { + // As an exceptional contract: if there were no events in the response, + // and we've not hit the start of the timeline, retry until we get + // some events or reach the start of the timeline. + continue; + } + + // Exit the inner loop, and ask for another limit. + break; } + + Ok(false) } /// Subscribe to the back-pagination status of a live timeline. diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index e59d4be39..b4349737e 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -76,7 +76,7 @@ mod pagination; mod store; pub mod paginator; -pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating}; +pub use pagination::RoomPagination; /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 2d178e655..9477dada8 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -14,7 +14,7 @@ //! A sub-object for running pagination tasks on a given room. -use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use eyeball::Subscriber; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; @@ -59,75 +59,17 @@ impl RoomPagination { /// This automatically takes care of waiting for a pagination token from /// sync, if we haven't done that before. /// - /// The `until` argument is an async closure that returns a [`ControlFlow`] - /// to decide whether a new pagination must be run or not. It's helpful when - /// the server replies with e.g. a certain set of events, but we would like - /// more, or the event we are looking for isn't part of this set: in this - /// case, `until` returns [`Control::Continue`], otherwise it returns - /// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as - /// its sole argument. - /// /// # Errors /// /// It may return an error if the pagination token used during /// back-pagination has disappeared while we started the pagination. In /// that case, it's desirable to call the method again. - /// - /// # Example - /// - /// To do a single run: - /// - /// ```rust - /// use std::ops::ControlFlow; - /// - /// use matrix_sdk::event_cache::{ - /// BackPaginationOutcome, - /// RoomPagination, - /// TimelineHasBeenResetWhilePaginating - /// }; - /// - /// # async fn foo(room_pagination: RoomPagination) { - /// let result = room_pagination.run_backwards( - /// 42, - /// |BackPaginationOutcome { events, reached_start }, - /// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move { - /// // Do something with `events` and `reached_start` maybe? - /// let _ = events; - /// let _ = reached_start; - /// - /// ControlFlow::Break(()) - /// } - /// ).await; - /// # } - #[instrument(skip(self, until))] - pub async fn run_backwards( - &self, - batch_size: u16, - mut until: Until, - ) -> Result - where - Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture, - UntilFuture: Future>, - { - let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; - + #[instrument(skip(self))] + pub async fn run_backwards(&self, batch_size: u16) -> Result { loop { - if let Some(outcome) = self.run_backwards_impl(batch_size).await? { - match until(outcome, timeline_has_been_reset).await { - ControlFlow::Continue(()) => { - trace!("back-pagination continues"); - - timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; - - continue; - } - - ControlFlow::Break(value) => return Ok(value), - } + if let Some(result) = self.run_backwards_impl(batch_size).await? { + return Ok(result); } - - timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes; - debug!("back-pagination has been internally restarted because of a timeline reset."); } } @@ -317,16 +259,6 @@ impl RoomPagination { } } -/// A type representing whether the timeline has been reset. -#[derive(Debug)] -pub enum TimelineHasBeenResetWhilePaginating { - /// The timeline has been reset. - Yes, - - /// The timeline has not been reset. - No, -} - #[cfg(test)] mod tests { // Those tests require time to work, and it does not on wasm32. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index a3e3891f0..5d6124f47 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -1,11 +1,8 @@ -use std::{future::ready, ops::ControlFlow, time::Duration}; +use std::time::Duration; use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ - event_cache::{ - BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, - TimelineHasBeenResetWhilePaginating, - }, + event_cache::{BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate}, test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client_with_server}, }; use matrix_sdk_test::{ @@ -27,13 +24,6 @@ use wiremock::{ use crate::mock_sync; -async fn once( - outcome: BackPaginationOutcome, - _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating, -) -> ControlFlow { - ControlFlow::Break(outcome) -} - #[async_test] async fn test_must_explicitly_subscribe() { let (client, server) = logged_in_client_with_server().await; @@ -372,7 +362,7 @@ async fn test_backpaginate_once() { assert!(pagination.get_or_wait_for_token().await.is_some()); - pagination.run_backwards(20, once).await.unwrap() + pagination.run_backwards(20).await.unwrap() }; // I'll get all the previous events, in "reverse" order (same as the response). @@ -387,7 +377,7 @@ async fn test_backpaginate_once() { } #[async_test] -async fn test_backpaginate_many_times_with_many_iterations() { +async fn test_backpaginate_multiple_iterations() { let (client, server) = logged_in_client_with_server().await; let event_cache = client.event_cache(); @@ -435,7 +425,6 @@ async fn test_backpaginate_many_times_with_many_iterations() { } let mut num_iterations = 0; - let mut num_paginations = 0; let mut global_events = Vec::new(); let mut global_reached_start = false; @@ -460,149 +449,19 @@ async fn test_backpaginate_many_times_with_many_iterations() { // Then if I backpaginate in a loop, let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { - pagination - .run_backwards(20, |outcome, timeline_has_been_reset| { - num_paginations += 1; + let BackPaginationOutcome { reached_start, events } = + pagination.run_backwards(20).await.unwrap(); - assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); - - if !global_reached_start { - global_reached_start = outcome.reached_start; - } - - global_events.extend(outcome.events); - - ready(ControlFlow::Break(())) - }) - .await - .unwrap(); + if !global_reached_start { + global_reached_start = reached_start; + } + global_events.extend(events); num_iterations += 1; } // I'll get all the previous events, - assert_eq!(num_iterations, 2); // in two iterations… - assert_eq!(num_paginations, 2); // … we get two paginations. - assert!(global_reached_start); - - assert_event_matches_msg(&global_events[0], "world"); - assert_event_matches_msg(&global_events[1], "hello"); - assert_event_matches_msg(&global_events[2], "oh well"); - assert_eq!(global_events.len(), 3); - - // And next time I'll open the room, I'll get the events in the right order. - let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); - - assert_event_matches_msg(&events[0], "oh well"); - assert_event_matches_msg(&events[1], "hello"); - assert_event_matches_msg(&events[2], "world"); - assert_event_matches_msg(&events[3], "heyo"); - assert_eq!(events.len(), 4); - - assert!(room_stream.is_empty()); -} - -#[async_test] -async fn test_backpaginate_many_times_with_one_iteration() { - let (client, server) = logged_in_client_with_server().await; - - let event_cache = client.event_cache(); - - // Immediately subscribe the event cache to sync updates. - event_cache.subscribe().unwrap(); - - // If I sync and get informed I've joined The Room, and get a previous batch - // token, - let room_id = room_id!("!omelette:fromage.fr"); - - let event_builder = EventBuilder::new(); - let mut sync_builder = SyncResponseBuilder::new(); - - { - sync_builder.add_joined_room( - JoinedRoomBuilder::new(room_id) - // Note to self: a timeline must have at least single event to be properly - // serialized. - .add_timeline_event(event_builder.make_sync_message_event( - user_id!("@a:b.c"), - RoomMessageEventContent::text_plain("heyo"), - )) - .set_timeline_prev_batch("prev_batch".to_owned()), - ); - let response_body = sync_builder.build_json_sync_response(); - - mock_sync(&server, response_body, None).await; - client.sync_once(Default::default()).await.unwrap(); - server.reset().await; - } - - let (room_event_cache, _drop_handles) = - client.get_room(room_id).unwrap().event_cache().await.unwrap(); - - let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); - - // This is racy: either the initial message has been processed by the event - // cache (and no room updates will happen in this case), or it hasn't, and - // the stream will return the next message soon. - if events.is_empty() { - let _ = room_stream.recv().await.expect("read error"); - } else { - assert_eq!(events.len(), 1); - } - - let mut num_iterations = 0; - let mut num_paginations = 0; - let mut global_events = Vec::new(); - let mut global_reached_start = false; - - // The first back-pagination will return these two. - mock_messages( - &server, - "prev_batch", - Some("prev_batch2"), - non_sync_events!(event_builder, [ (room_id, "$2": "world"), (room_id, "$3": "hello") ]), - ) - .await; - - // The second round of back-pagination will return this one. - mock_messages( - &server, - "prev_batch2", - None, - non_sync_events!(event_builder, [ (room_id, "$4": "oh well"), ]), - ) - .await; - - // Then if I backpaginate in a loop, - let pagination = room_event_cache.pagination(); - while pagination.get_or_wait_for_token().await.is_some() { - pagination - .run_backwards(20, |outcome, timeline_has_been_reset| { - num_paginations += 1; - - assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); - - if !global_reached_start { - global_reached_start = outcome.reached_start; - } - - global_events.extend(outcome.events); - - ready(if outcome.reached_start { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - }) - }) - .await - .unwrap(); - - num_iterations += 1; - } - - // I'll get all the previous events, - assert_eq!(num_iterations, 1); // in one iteration… - assert_eq!(num_paginations, 2); // … we get two paginations! + assert_eq!(num_iterations, 2); assert!(global_reached_start); assert_event_matches_msg(&global_events[0], "world"); @@ -727,18 +586,7 @@ async fn test_reset_while_backpaginating() { let backpagination = spawn({ let pagination = room_event_cache.pagination(); - async move { - pagination - .run_backwards(20, |outcome, timeline_has_been_reset| { - assert_matches!( - timeline_has_been_reset, - TimelineHasBeenResetWhilePaginating::Yes - ); - - ready(ControlFlow::Break(outcome)) - }) - .await - } + async move { pagination.run_backwards(20).await } }); // Receive the sync response (which clears the timeline). @@ -808,7 +656,7 @@ async fn test_backpaginating_without_token() { // If we try to back-paginate with a token, it will hit the end of the timeline // and give us the resulting event. let BackPaginationOutcome { events, reached_start } = - pagination.run_backwards(20, once).await.unwrap(); + pagination.run_backwards(20).await.unwrap(); assert!(reached_start);