ui: Allow waiting for token before starting pagination

This commit is contained in:
Jonas Platte
2023-06-06 13:06:36 +02:00
committed by Jonas Platte
parent 1e10a54d3d
commit af870fcff3
5 changed files with 51 additions and 11 deletions

1
Cargo.lock generated
View File

@@ -2946,6 +2946,7 @@ dependencies = [
"assert-json-diff",
"assert_matches",
"async-once-cell",
"async-std",
"async-stream",
"async-trait",
"chrono",

View File

@@ -18,6 +18,7 @@ testing = ["matrix-sdk/testing", "dep:eyeball-im-util"]
[dependencies]
async-once-cell = "0.5.2"
async-std = { version = "1.12.0", features = ["unstable"] }
async-stream = { workspace = true, optional = true }
async-trait = { workspace = true }
chrono = "0.4.23"

View File

@@ -14,12 +14,13 @@
use std::sync::Arc;
use async_std::sync::Mutex;
use imbl::Vector;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate,
};
use ruma::events::receipt::{ReceiptThread, ReceiptType};
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use tracing::{error, warn};
#[cfg(feature = "e2e-encryption")]
@@ -123,9 +124,12 @@ impl TimelineBuilder {
let room = inner.room();
let client = room.client();
let start_token = Arc::new(Mutex::new(prev_token));
let mut room_update_rx = room.subscribe_to_updates();
let room_update_join_handle = spawn({
let inner = inner.clone();
let start_token = start_token.clone();
async move {
loop {
let update = match room_update_rx.recv().await {
@@ -138,11 +142,23 @@ impl TimelineBuilder {
}
};
let update_start_token = |prev_batch: &Option<_>| {
// Only update start_token if it's not currently locked.
// If it is locked, pagination is currently in progress.
if let Some(mut start_token) = start_token.try_lock() {
if start_token.is_none() && prev_batch.is_some() {
*start_token = prev_batch.clone();
}
}
};
match update {
RoomUpdate::Left { updates, .. } => {
update_start_token(&updates.timeline.prev_batch);
inner.handle_sync_timeline(updates.timeline).await;
}
RoomUpdate::Joined { updates, .. } => {
update_start_token(&updates.timeline.prev_batch);
inner.handle_joined_room_update(updates).await;
}
RoomUpdate::Invited { .. } => {
@@ -174,7 +190,8 @@ impl TimelineBuilder {
let timeline = Timeline {
inner,
start_token: Mutex::new(prev_token),
start_token,
start_token_condvar: Default::default(),
_end_token: Mutex::new(None),
drop_handle: Arc::new(TimelineDropHandle {
client,

View File

@@ -18,6 +18,7 @@
use std::{fs, path::Path, pin::Pin, sync::Arc, task::Poll};
use async_std::sync::{Condvar, Mutex};
use eyeball_im::VectorDiff;
use futures_core::Stream;
use futures_util::TryFutureExt;
@@ -42,8 +43,7 @@ use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId,
};
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{error, instrument, warn};
use tracing::{error, info, instrument, warn};
mod builder;
mod event_handler;
@@ -87,7 +87,8 @@ const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat;
#[derive(Debug)]
pub struct Timeline {
inner: Arc<TimelineInner<room::Common>>,
start_token: Mutex<Option<String>>,
start_token: Arc<Mutex<Option<String>>>,
start_token_condvar: Arc<Condvar>,
_end_token: Mutex<Option<String>>,
drop_handle: Arc<TimelineDropHandle>,
}
@@ -117,11 +118,17 @@ impl Timeline {
#[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))]
pub async fn paginate_backwards(&self, mut options: PaginationOptions<'_>) -> Result<()> {
let mut start_lock = self.start_token.lock().await;
if start_lock.is_none()
&& self.inner.items().await.front().map_or(false, |item| item.is_timeline_start())
{
warn!("Start of timeline reached, ignoring backwards-pagination request");
return Ok(());
if start_lock.is_none() {
if self.inner.items().await.front().map_or(false, |item| item.is_timeline_start()) {
warn!("Start of timeline reached, ignoring backwards-pagination request");
return Ok(());
}
if options.wait_for_token {
info!("No prev_batch token, waiting");
start_lock =
self.start_token_condvar.wait_until(start_lock, |tok| tok.is_some()).await;
}
}
self.inner.add_loading_indicator().await;

View File

@@ -17,6 +17,7 @@ use std::{fmt, ops::ControlFlow};
/// Options for pagination.
pub struct PaginationOptions<'a> {
inner: PaginationOptionsInner<'a>,
pub(super) wait_for_token: bool,
}
impl<'a> PaginationOptions<'a> {
@@ -60,6 +61,19 @@ impl<'a> PaginationOptions<'a> {
})
}
/// Whether to wait for a pagination token to be set before starting.
///
/// This is not something you should normally do since it can lead to very
/// long wait times, however in the specific case of using sliding sync with
/// the current proxy and subscribing to the room in a way that you know a
/// sync will be coming in soon, it can be useful to reduce unnecessary
/// traffic from duplicated events and avoid ordering issues from the sync
/// proxy returning older data than pagination.
pub fn wait_for_token(mut self) -> Self {
self.wait_for_token = true;
self
}
pub(super) fn next_event_limit(
&mut self,
pagination_outcome: PaginationOutcome,
@@ -81,7 +95,7 @@ impl<'a> PaginationOptions<'a> {
}
fn new(inner: PaginationOptionsInner<'a>) -> Self {
Self { inner }
Self { inner, wait_for_token: false }
}
}