From af870fcff3c71d2c614a48fe19cb7ef36368fcdc Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 6 Jun 2023 13:06:36 +0200 Subject: [PATCH] ui: Allow waiting for token before starting pagination --- Cargo.lock | 1 + crates/matrix-sdk-ui/Cargo.toml | 1 + crates/matrix-sdk-ui/src/timeline/builder.rs | 21 +++++++++++++++-- crates/matrix-sdk-ui/src/timeline/mod.rs | 23 ++++++++++++------- .../matrix-sdk-ui/src/timeline/pagination.rs | 16 ++++++++++++- 5 files changed, 51 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 807aa3b14..56ba8ed40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2946,6 +2946,7 @@ dependencies = [ "assert-json-diff", "assert_matches", "async-once-cell", + "async-std", "async-stream", "async-trait", "chrono", diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index ff89ec8d3..1b6b34c6c 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -18,6 +18,7 @@ testing = ["matrix-sdk/testing", "dep:eyeball-im-util"] [dependencies] async-once-cell = "0.5.2" +async-std = { version = "1.12.0", features = ["unstable"] } async-stream = { workspace = true, optional = true } async-trait = { workspace = true } chrono = "0.4.23" diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 18e547c8e..1fdffd351 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -14,12 +14,13 @@ use std::sync::Arc; +use async_std::sync::Mutex; use imbl::Vector; use matrix_sdk::{ deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate, }; use ruma::events::receipt::{ReceiptThread, ReceiptType}; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use tracing::{error, warn}; #[cfg(feature = "e2e-encryption")] @@ -123,9 +124,12 @@ impl TimelineBuilder { let room = inner.room(); let client = room.client(); + let start_token = Arc::new(Mutex::new(prev_token)); + let mut room_update_rx = room.subscribe_to_updates(); let room_update_join_handle = spawn({ let inner = inner.clone(); + let start_token = start_token.clone(); async move { loop { let update = match room_update_rx.recv().await { @@ -138,11 +142,23 @@ impl TimelineBuilder { } }; + let update_start_token = |prev_batch: &Option<_>| { + // Only update start_token if it's not currently locked. + // If it is locked, pagination is currently in progress. + if let Some(mut start_token) = start_token.try_lock() { + if start_token.is_none() && prev_batch.is_some() { + *start_token = prev_batch.clone(); + } + } + }; + match update { RoomUpdate::Left { updates, .. } => { + update_start_token(&updates.timeline.prev_batch); inner.handle_sync_timeline(updates.timeline).await; } RoomUpdate::Joined { updates, .. } => { + update_start_token(&updates.timeline.prev_batch); inner.handle_joined_room_update(updates).await; } RoomUpdate::Invited { .. } => { @@ -174,7 +190,8 @@ impl TimelineBuilder { let timeline = Timeline { inner, - start_token: Mutex::new(prev_token), + start_token, + start_token_condvar: Default::default(), _end_token: Mutex::new(None), drop_handle: Arc::new(TimelineDropHandle { client, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 757e3eee7..14307e1be 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -18,6 +18,7 @@ use std::{fs, path::Path, pin::Pin, sync::Arc, task::Poll}; +use async_std::sync::{Condvar, Mutex}; use eyeball_im::VectorDiff; use futures_core::Stream; use futures_util::TryFutureExt; @@ -42,8 +43,7 @@ use ruma::{ EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId, }; use thiserror::Error; -use tokio::sync::Mutex; -use tracing::{error, instrument, warn}; +use tracing::{error, info, instrument, warn}; mod builder; mod event_handler; @@ -87,7 +87,8 @@ const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat; #[derive(Debug)] pub struct Timeline { inner: Arc>, - start_token: Mutex>, + start_token: Arc>>, + start_token_condvar: Arc, _end_token: Mutex>, drop_handle: Arc, } @@ -117,11 +118,17 @@ impl Timeline { #[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))] pub async fn paginate_backwards(&self, mut options: PaginationOptions<'_>) -> Result<()> { let mut start_lock = self.start_token.lock().await; - if start_lock.is_none() - && self.inner.items().await.front().map_or(false, |item| item.is_timeline_start()) - { - warn!("Start of timeline reached, ignoring backwards-pagination request"); - return Ok(()); + if start_lock.is_none() { + if self.inner.items().await.front().map_or(false, |item| item.is_timeline_start()) { + warn!("Start of timeline reached, ignoring backwards-pagination request"); + return Ok(()); + } + + if options.wait_for_token { + info!("No prev_batch token, waiting"); + start_lock = + self.start_token_condvar.wait_until(start_lock, |tok| tok.is_some()).await; + } } self.inner.add_loading_indicator().await; diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index ef0f10ab5..f4bfdc1ac 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -17,6 +17,7 @@ use std::{fmt, ops::ControlFlow}; /// Options for pagination. pub struct PaginationOptions<'a> { inner: PaginationOptionsInner<'a>, + pub(super) wait_for_token: bool, } impl<'a> PaginationOptions<'a> { @@ -60,6 +61,19 @@ impl<'a> PaginationOptions<'a> { }) } + /// Whether to wait for a pagination token to be set before starting. + /// + /// This is not something you should normally do since it can lead to very + /// long wait times, however in the specific case of using sliding sync with + /// the current proxy and subscribing to the room in a way that you know a + /// sync will be coming in soon, it can be useful to reduce unnecessary + /// traffic from duplicated events and avoid ordering issues from the sync + /// proxy returning older data than pagination. + pub fn wait_for_token(mut self) -> Self { + self.wait_for_token = true; + self + } + pub(super) fn next_event_limit( &mut self, pagination_outcome: PaginationOutcome, @@ -81,7 +95,7 @@ impl<'a> PaginationOptions<'a> { } fn new(inner: PaginationOptionsInner<'a>) -> Self { - Self { inner } + Self { inner, wait_for_token: false } } }