From 6c90f7aafffc8f51bc89373a94f68653f6711185 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 May 2024 08:57:59 +0200 Subject: [PATCH 1/4] =?UTF-8?q?feat(sdk):=20Add=20`RoomPagination::run=5Fb?= =?UTF-8?q?ackwards(=E2=80=A6,=20until)`.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds a new argument to `RoomPagination::run_backwards`: `until`. It becomes: pub async fn run_backwards(&self, batch_size: u16, mut until: F ) -> Result where F: FnMut(BackPaginationOutcome) -> Fut, Fut: Future>, The idea behind `until` is to run pagination _until_ `until` returns `ControlFlow::Break`, otherwise it continues paginating. This is useful is many scenearii (cf. the documentation). This is also and primarily the first step to stop adding events directly from the pagination, and starts adding events only and strictly only from `event_cache::RoomEventCacheUpdate` (again, see the `TODO` in the documentation). This is not done in this patch for the sake of ease of review. --- .../matrix-sdk-ui/src/timeline/pagination.rs | 68 +++++++++---------- .../matrix-sdk/src/event_cache/pagination.rs | 57 ++++++++++++++-- .../tests/integration/event_cache.rs | 14 ++-- 3 files changed, 93 insertions(+), 46 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index 7d4595b2e..e9ac6a425 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::ControlFlow; + use async_rx::StreamExt as _; use async_stream::stream; use futures_core::Stream; @@ -67,49 +69,43 @@ impl super::Timeline { pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result { let pagination = self.event_cache.pagination(); - loop { - let result = pagination.run_backwards(batch_size).await; + let result = pagination + .run_backwards( + batch_size, + |BackPaginationOutcome { events, reached_start }| async move { + let num_events = events.len(); + trace!("Back-pagination succeeded with {num_events} events"); - let event_cache_outcome = match result { - Ok(outcome) => outcome, + // TODO(hywan): Remove, and let spread events via + // `matrix_sdk::event_cache::RoomEventCacheUpdate` from + // `matrix_sdk::event_cache::RoomPagination::run_backwards`. + self.inner + .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) + .await; - Err(EventCacheError::BackpaginationError( - PaginatorError::InvalidPreviousState { - actual: PaginatorState::Paginating, .. - }, - )) => { - warn!("Another pagination request is already happening, returning early"); - return Ok(false); - } + if num_events == 0 && !reached_start { + // As an exceptional contract: if there were no events in the response, + // and we've not hit the start of the timeline, retry until we get + // some events or reach the start of the timeline. + return ControlFlow::Continue(()); + } - Err(err) => return Err(err), - }; + ControlFlow::Break(reached_start) + }, + ) + .await; - let BackPaginationOutcome { events, reached_start } = event_cache_outcome; - - let num_events = events.len(); - trace!("Back-pagination succeeded with {num_events} events"); - - self.inner - .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) - .await; - - if reached_start { - return Ok(true); + match result { + Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState { + actual: PaginatorState::Paginating, + .. + })) => { + warn!("Another pagination request is already happening, returning early"); + Ok(false) } - if num_events == 0 { - // As an exceptional contract: if there were no events in the response, - // and we've not hit the start of the timeline, retry until we get - // some events or reach the start of the timeline. - continue; - } - - // Exit the inner loop, and ask for another limit. - break; + result => result, } - - Ok(false) } /// Subscribe to the back-pagination status of a live timeline. diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 9477dada8..e93cf4b05 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -14,7 +14,7 @@ //! A sub-object for running pagination tasks on a given room. -use std::{sync::Arc, time::Duration}; +use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration}; use eyeball::Subscriber; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; @@ -59,17 +59,64 @@ impl RoomPagination { /// This automatically takes care of waiting for a pagination token from /// sync, if we haven't done that before. /// + /// The `until` argument is an async closure that returns a [`ControlFlow`] + /// to decide whether a new pagination must be run or not. It's helpful when + /// the server replies with e.g. a certain set of events, but we would like + /// more, or the event we are looking for isn't part of this set: in this + /// case, `until` returns [`Control::Continue`], otherwise it returns + /// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as + /// its sole argument. + /// /// # Errors /// /// It may return an error if the pagination token used during /// back-pagination has disappeared while we started the pagination. In /// that case, it's desirable to call the method again. - #[instrument(skip(self))] - pub async fn run_backwards(&self, batch_size: u16) -> Result { + /// + /// # Example + /// + /// To do a single run: + /// + /// ```rust + /// use std::ops::ControlFlow; + /// + /// use matrix_sdk::event_cache::{BackPaginationOutcome, RoomPagination}; + /// + /// # async fn foo(room_pagination: RoomPagination) { + /// let result = room_pagination.run_backwards( + /// 42, + /// |BackPaginationOutcome { events, reached_start }| async move { + /// // Do something with `events` and `reached_start` maybe? + /// let _ = events; + /// let _ = reached_start; + /// + /// ControlFlow::Break(()) + /// } + /// ).await; + /// # } + #[instrument(skip(self, until))] + pub async fn run_backwards( + &self, + batch_size: u16, + mut until: Until, + ) -> Result + where + Until: FnMut(BackPaginationOutcome) -> UntilFuture, + UntilFuture: Future>, + { loop { - if let Some(result) = self.run_backwards_impl(batch_size).await? { - return Ok(result); + if let Some(outcome) = self.run_backwards_impl(batch_size).await? { + match until(outcome).await { + ControlFlow::Continue(()) => { + debug!("back-pagination continues"); + + continue; + } + + ControlFlow::Break(value) => return Ok(value), + } } + debug!("back-pagination has been internally restarted because of a timeline reset."); } } diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 72029127f..8708e9ade 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{ops::ControlFlow, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ @@ -24,6 +24,10 @@ use wiremock::{ use crate::mock_sync; +pub async fn once(outcome: BackPaginationOutcome) -> ControlFlow { + ControlFlow::Break(outcome) +} + #[async_test] async fn test_must_explicitly_subscribe() { let (client, server) = logged_in_client_with_server().await; @@ -362,7 +366,7 @@ async fn test_backpaginate_once() { assert!(pagination.get_or_wait_for_token().await.is_some()); - pagination.run_backwards(20).await.unwrap() + pagination.run_backwards(20, once).await.unwrap() }; // I'll get all the previous events, in "reverse" order (same as the response). @@ -450,7 +454,7 @@ async fn test_backpaginate_multiple_iterations() { let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { let BackPaginationOutcome { reached_start, events } = - pagination.run_backwards(20).await.unwrap(); + pagination.run_backwards(20, once).await.unwrap(); if !global_reached_start { global_reached_start = reached_start; @@ -586,7 +590,7 @@ async fn test_reset_while_backpaginating() { let backpagination = spawn({ let pagination = room_event_cache.pagination(); - async move { pagination.run_backwards(20).await } + async move { pagination.run_backwards(20, once).await } }); // Receive the sync response (which clears the timeline). @@ -656,7 +660,7 @@ async fn test_backpaginating_without_token() { // If we try to back-paginate with a token, it will hit the end of the timeline // and give us the resulting event. let BackPaginationOutcome { events, reached_start } = - pagination.run_backwards(20).await.unwrap(); + pagination.run_backwards(20, once).await.unwrap(); assert!(reached_start); From 13df5d9db8bb16b1adeb57cbd41f6b8463d6a736 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 May 2024 11:32:24 +0200 Subject: [PATCH 2/4] feat(sdk): Add `TimelineHasBeenResetWhilePaginating`. This patch adds a new argument to the `until` argument closure of `RoomPagination::run_backwards`: `timeline_has_been_reset`, which designates when the timeline has been reset. A test has been updated accordingly. --- .../matrix-sdk-ui/src/timeline/pagination.rs | 3 +- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- .../matrix-sdk/src/event_cache/pagination.rs | 29 ++++++++++++++++--- .../tests/integration/event_cache.rs | 25 +++++++++++++--- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index e9ac6a425..dd18392f6 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -72,7 +72,8 @@ impl super::Timeline { let result = pagination .run_backwards( batch_size, - |BackPaginationOutcome { events, reached_start }| async move { + |BackPaginationOutcome { events, reached_start }, + _timeline_has_been_reset| async move { let num_events = events.len(); trace!("Back-pagination succeeded with {num_events} events"); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c4226a2bc..8b859b959 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -76,7 +76,7 @@ mod pagination; mod store; pub mod paginator; -pub use pagination::RoomPagination; +pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating}; /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index e93cf4b05..186e73d69 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -80,12 +80,17 @@ impl RoomPagination { /// ```rust /// use std::ops::ControlFlow; /// - /// use matrix_sdk::event_cache::{BackPaginationOutcome, RoomPagination}; + /// use matrix_sdk::event_cache::{ + /// BackPaginationOutcome, + /// RoomPagination, + /// TimelineHasBeenResetWhilePaginating + /// }; /// /// # async fn foo(room_pagination: RoomPagination) { /// let result = room_pagination.run_backwards( /// 42, - /// |BackPaginationOutcome { events, reached_start }| async move { + /// |BackPaginationOutcome { events, reached_start }, + /// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move { /// // Do something with `events` and `reached_start` maybe? /// let _ = events; /// let _ = reached_start; @@ -101,15 +106,19 @@ impl RoomPagination { mut until: Until, ) -> Result where - Until: FnMut(BackPaginationOutcome) -> UntilFuture, + Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture, UntilFuture: Future>, { + let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; + loop { if let Some(outcome) = self.run_backwards_impl(batch_size).await? { - match until(outcome).await { + match until(outcome, timeline_has_been_reset).await { ControlFlow::Continue(()) => { debug!("back-pagination continues"); + timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; + continue; } @@ -117,6 +126,8 @@ impl RoomPagination { } } + timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes; + debug!("back-pagination has been internally restarted because of a timeline reset."); } } @@ -306,6 +317,16 @@ impl RoomPagination { } } +/// A type representing whether the timeline has been reset. +#[derive(Debug)] +pub enum TimelineHasBeenResetWhilePaginating { + /// The timeline has been reset. + Yes, + + /// The timeline has not been reset. + No, +} + #[cfg(test)] mod tests { // Those tests require time to work, and it does not on wasm32. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 8708e9ade..dac1ead35 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -1,8 +1,11 @@ -use std::{ops::ControlFlow, time::Duration}; +use std::{future::ready, ops::ControlFlow, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ - event_cache::{BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate}, + event_cache::{ + BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, + TimelineHasBeenResetWhilePaginating, + }, test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client_with_server}, }; use matrix_sdk_test::{ @@ -24,7 +27,10 @@ use wiremock::{ use crate::mock_sync; -pub async fn once(outcome: BackPaginationOutcome) -> ControlFlow { +async fn once( + outcome: BackPaginationOutcome, + _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating, +) -> ControlFlow { ControlFlow::Break(outcome) } @@ -590,7 +596,18 @@ async fn test_reset_while_backpaginating() { let backpagination = spawn({ let pagination = room_event_cache.pagination(); - async move { pagination.run_backwards(20, once).await } + async move { + pagination + .run_backwards(20, |outcome, timeline_has_been_reset| { + assert_matches!( + timeline_has_been_reset, + TimelineHasBeenResetWhilePaginating::Yes + ); + + ready(ControlFlow::Break(outcome)) + }) + .await + } }); // Receive the sync response (which clears the timeline). From 7377971da838e74ea7b37896a28268af9c8efa3c Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 May 2024 11:34:05 +0200 Subject: [PATCH 3/4] test(sdk): Ensure that `until` can trigger multiple iterations. This patch adds a test for `until` when it returns `ControlFlow::Continue` multiple times instead of `ControlFlow::Break` immediately. --- .../tests/integration/event_cache.rs | 116 +++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index dac1ead35..180e497cc 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -387,7 +387,7 @@ async fn test_backpaginate_once() { } #[async_test] -async fn test_backpaginate_multiple_iterations() { +async fn test_backpaginate_many_times_with_many_iterations() { let (client, server) = logged_in_client_with_server().await; let event_cache = client.event_cache(); @@ -491,6 +491,120 @@ async fn test_backpaginate_multiple_iterations() { assert!(room_stream.is_empty()); } +#[async_test] +async fn test_backpaginate_many_times_with_one_iteration() { + let (client, server) = logged_in_client_with_server().await; + + let event_cache = client.event_cache(); + + // Immediately subscribe the event cache to sync updates. + event_cache.subscribe().unwrap(); + + // If I sync and get informed I've joined The Room, and get a previous batch + // token, + let room_id = room_id!("!omelette:fromage.fr"); + + let event_builder = EventBuilder::new(); + let mut sync_builder = SyncResponseBuilder::new(); + + { + sync_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + // Note to self: a timeline must have at least single event to be properly + // serialized. + .add_timeline_event(event_builder.make_sync_message_event( + user_id!("@a:b.c"), + RoomMessageEventContent::text_plain("heyo"), + )) + .set_timeline_prev_batch("prev_batch".to_owned()), + ); + let response_body = sync_builder.build_json_sync_response(); + + mock_sync(&server, response_body, None).await; + client.sync_once(Default::default()).await.unwrap(); + server.reset().await; + } + + let (room_event_cache, _drop_handles) = + client.get_room(room_id).unwrap().event_cache().await.unwrap(); + + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + + // This is racy: either the initial message has been processed by the event + // cache (and no room updates will happen in this case), or it hasn't, and + // the stream will return the next message soon. + if events.is_empty() { + let _ = room_stream.recv().await.expect("read error"); + } else { + assert_eq!(events.len(), 1); + } + + let mut num_iterations = 0; + let mut global_events = Vec::new(); + let mut global_reached_start = false; + + // The first back-pagination will return these two. + mock_messages( + &server, + "prev_batch", + Some("prev_batch2"), + non_sync_events!(event_builder, [ (room_id, "$2": "world"), (room_id, "$3": "hello") ]), + ) + .await; + + // The second round of back-pagination will return this one. + mock_messages( + &server, + "prev_batch2", + None, + non_sync_events!(event_builder, [ (room_id, "$4": "oh well"), ]), + ) + .await; + + // Then if I backpaginate in a loop, + let pagination = room_event_cache.pagination(); + while pagination.get_or_wait_for_token().await.is_some() { + pagination + .run_backwards(20, |outcome, _timeline_has_been_reset| { + if !global_reached_start { + global_reached_start = outcome.reached_start; + } + + global_events.extend(outcome.events); + + ready(if outcome.reached_start { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + }) + }) + .await + .unwrap(); + + num_iterations += 1; + } + + // I'll get all the previous events, + assert_eq!(num_iterations, 1); // in one iteration! + assert!(global_reached_start); + + assert_event_matches_msg(&global_events[0], "world"); + assert_event_matches_msg(&global_events[1], "hello"); + assert_event_matches_msg(&global_events[2], "oh well"); + assert_eq!(global_events.len(), 3); + + // And next time I'll open the room, I'll get the events in the right order. + let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + + assert_event_matches_msg(&events[0], "oh well"); + assert_event_matches_msg(&events[1], "hello"); + assert_event_matches_msg(&events[2], "world"); + assert_event_matches_msg(&events[3], "heyo"); + assert_eq!(events.len(), 4); + + assert!(room_stream.is_empty()); +} + #[async_test] async fn test_reset_while_backpaginating() { let (client, server) = logged_in_client_with_server().await; From 9a1de1d9e80f0f806351ef4d7cfd66326a1eb68f Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 May 2024 15:43:01 +0200 Subject: [PATCH 4/4] test(sdk): Improve tests around `RoomPagination::run_backwards`. --- .../matrix-sdk/src/event_cache/pagination.rs | 2 +- .../tests/integration/event_cache.rs | 35 ++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 186e73d69..2d178e655 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -115,7 +115,7 @@ impl RoomPagination { if let Some(outcome) = self.run_backwards_impl(batch_size).await? { match until(outcome, timeline_has_been_reset).await { ControlFlow::Continue(()) => { - debug!("back-pagination continues"); + trace!("back-pagination continues"); timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 180e497cc..2e8070904 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -435,6 +435,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { } let mut num_iterations = 0; + let mut num_paginations = 0; let mut global_events = Vec::new(); let mut global_reached_start = false; @@ -459,19 +460,29 @@ async fn test_backpaginate_many_times_with_many_iterations() { // Then if I backpaginate in a loop, let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { - let BackPaginationOutcome { reached_start, events } = - pagination.run_backwards(20, once).await.unwrap(); + pagination + .run_backwards(20, |outcome, timeline_has_been_reset| { + num_paginations += 1; - if !global_reached_start { - global_reached_start = reached_start; - } - global_events.extend(events); + assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); + + if !global_reached_start { + global_reached_start = outcome.reached_start; + } + + global_events.extend(outcome.events); + + ready(ControlFlow::Break(())) + }) + .await + .unwrap(); num_iterations += 1; } // I'll get all the previous events, - assert_eq!(num_iterations, 2); + assert_eq!(num_iterations, 2); // in two iterations… + assert_eq!(num_paginations, 2); // … we get two paginations. assert!(global_reached_start); assert_event_matches_msg(&global_events[0], "world"); @@ -540,6 +551,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { } let mut num_iterations = 0; + let mut num_paginations = 0; let mut global_events = Vec::new(); let mut global_reached_start = false; @@ -565,7 +577,11 @@ async fn test_backpaginate_many_times_with_one_iteration() { let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { pagination - .run_backwards(20, |outcome, _timeline_has_been_reset| { + .run_backwards(20, |outcome, timeline_has_been_reset| { + num_paginations += 1; + + assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); + if !global_reached_start { global_reached_start = outcome.reached_start; } @@ -585,7 +601,8 @@ async fn test_backpaginate_many_times_with_one_iteration() { } // I'll get all the previous events, - assert_eq!(num_iterations, 1); // in one iteration! + assert_eq!(num_iterations, 1); // in one iteration… + assert_eq!(num_paginations, 2); // … we get two paginations! assert!(global_reached_start); assert_event_matches_msg(&global_events[0], "world");