refactor(event cache): don't hold onto a live instance of the paginator in RoomEventCache

Instead of keeping state for the `Paginator` instance, we create one
when needs be, in the `run_backwards_impl` method, and initialize it
with a previous-batch token. This is simpler than keeping one alive, and
making sure that we reset it in the right places.
This commit is contained in:
Benjamin Bouvier
2025-02-26 10:42:42 +01:00
parent 7841ed8637
commit 74bc3dfb6e
8 changed files with 149 additions and 118 deletions

View File

@@ -25,11 +25,12 @@ use matrix_sdk::{
BaseVideoInfo, Thumbnail,
},
deserialized_responses::{ShieldState as SdkShieldState, ShieldStateCode},
event_cache::RoomPaginationStatus,
room::edit::EditedContent as SdkEditedContent,
Error,
};
use matrix_sdk_ui::timeline::{
self, EventItemOrigin, LiveBackPaginationStatus, Profile, RepliedToEvent, TimelineDetails,
self, EventItemOrigin, Profile, RepliedToEvent, TimelineDetails,
TimelineUniqueId as SdkTimelineUniqueId,
};
use mime::Mime;
@@ -757,7 +758,7 @@ pub trait TimelineListener: Sync + Send {
#[matrix_sdk_ffi_macros::export(callback_interface)]
pub trait PaginationStatusListener: Sync + Send {
fn on_update(&self, status: LiveBackPaginationStatus);
fn on_update(&self, status: RoomPaginationStatus);
}
#[derive(Clone, uniffi::Object)]

View File

@@ -92,7 +92,6 @@ pub use self::{
},
event_type_filter::TimelineEventTypeFilter,
item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
pagination::LiveBackPaginationStatus,
traits::RoomExt,
virtual_item::VirtualTimelineItem,
};

View File

@@ -16,11 +16,7 @@ use async_rx::StreamExt as _;
use async_stream::stream;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::event_cache::{
self,
paginator::{PaginatorError, PaginatorState},
EventCacheError, RoomPagination,
};
use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
use tracing::{instrument, warn};
use super::Error;
@@ -83,11 +79,7 @@ impl super::Timeline {
}
}
Err(EventCacheError::BackpaginationError(
PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating, ..
},
)) => {
Err(EventCacheError::AlreadyBackpaginating) => {
// Treat an already running pagination exceptionally, returning false so that
// the caller retries later.
warn!("Another pagination request is already happening, returning early");
@@ -108,7 +100,7 @@ impl super::Timeline {
/// call to [`Self::paginate_backwards()`].
pub async fn live_back_pagination_status(
&self,
) -> Option<(LiveBackPaginationStatus, impl Stream<Item = LiveBackPaginationStatus>)> {
) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
if !self.controller.is_live().await {
return None;
}
@@ -117,53 +109,16 @@ impl super::Timeline {
let mut status = pagination.status();
let current_value =
LiveBackPaginationStatus::from_paginator_status(&pagination, status.next_now());
let current_value = status.next_now();
let stream = Box::pin(stream! {
let status_stream = status.dedup();
pin_mut!(status_stream);
while let Some(state) = status_stream.next().await {
yield LiveBackPaginationStatus::from_paginator_status(&pagination, state);
yield state;
}
});
Some((current_value, stream))
}
}
/// Status for the back-pagination on a live timeline.
#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum LiveBackPaginationStatus {
/// No back-pagination is happening right now.
Idle {
/// Have we hit the start of the timeline, i.e. back-paginating wouldn't
/// have any effect?
hit_start_of_timeline: bool,
},
/// Back-pagination is already running in the background.
Paginating,
}
impl LiveBackPaginationStatus {
/// Converts from a [`PaginatorState`] into the live back-pagination status.
///
/// Private method instead of `From`/`Into` impl, to avoid making it public
/// API.
fn from_paginator_status(pagination: &RoomPagination, state: PaginatorState) -> Self {
match state {
PaginatorState::Initial => Self::Idle { hit_start_of_timeline: false },
PaginatorState::FetchingTargetEvent => {
panic!("unexpected paginator state for a live backpagination")
}
PaginatorState::Idle => {
Self::Idle { hit_start_of_timeline: pagination.hit_timeline_start() }
}
PaginatorState::Paginating => Self::Paginating,
}
}
}

View File

@@ -23,6 +23,7 @@ use futures_util::{
};
use matrix_sdk::{
config::SyncSettings,
event_cache::RoomPaginationStatus,
test_utils::{
logged_in_client_with_server,
mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
@@ -32,9 +33,7 @@ use matrix_sdk_test::{
async_test, event_factory::EventFactory, mocks::mock_encryption_state, JoinedRoomBuilder,
StateTestEvent, SyncResponseBuilder, ALICE, BOB,
};
use matrix_sdk_ui::timeline::{
AnyOtherFullStateEventContent, LiveBackPaginationStatus, RoomExt, TimelineItemContent,
};
use matrix_sdk_ui::timeline::{AnyOtherFullStateEventContent, RoomExt, TimelineItemContent};
use once_cell::sync::Lazy;
use ruma::{
events::{room::message::MessageType, FullStateEventContent},
@@ -87,7 +86,7 @@ async fn test_back_pagination() {
server.reset().await;
};
let observe_paginating = async {
assert_eq!(back_pagination_status.next().await, Some(LiveBackPaginationStatus::Paginating));
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
};
join(paginate, observe_paginating).await;
@@ -156,7 +155,7 @@ async fn test_back_pagination() {
assert!(hit_start);
assert_next_eq!(
back_pagination_status,
LiveBackPaginationStatus::Idle { hit_start_of_timeline: true }
RoomPaginationStatus::Idle { hit_timeline_start: true }
);
assert_pending!(timeline_stream);
@@ -299,10 +298,10 @@ async fn test_wait_for_token() {
timeline.paginate_backwards(10).await.unwrap();
};
let observe_paginating = async {
assert_eq!(back_pagination_status.next().await, Some(LiveBackPaginationStatus::Paginating));
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
assert_eq!(
back_pagination_status.next().await,
Some(LiveBackPaginationStatus::Idle { hit_start_of_timeline: false })
Some(RoomPaginationStatus::Idle { hit_timeline_start: false })
);
};
let sync = async {
@@ -462,7 +461,7 @@ async fn test_timeline_reset_while_paginating() {
{
match update {
Some(state) => {
if state == LiveBackPaginationStatus::Paginating {
if state == RoomPaginationStatus::Paginating {
seen_paginating = true;
}
}
@@ -476,7 +475,7 @@ async fn test_timeline_reset_while_paginating() {
// Timeline start reached because second pagination response contains no end
// field.
assert_eq!(status, LiveBackPaginationStatus::Idle { hit_start_of_timeline: true });
assert_eq!(status, RoomPaginationStatus::Idle { hit_timeline_start: true });
};
let sync = async {
@@ -619,7 +618,7 @@ async fn test_empty_chunk() {
server.reset().await;
};
let observe_paginating = async {
assert_eq!(back_pagination_status.next().await, Some(LiveBackPaginationStatus::Paginating));
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
};
join(paginate, observe_paginating).await;
@@ -729,7 +728,7 @@ async fn test_until_num_items_with_empty_chunk() {
timeline.paginate_backwards(10).await.unwrap();
};
let observe_paginating = async {
assert_eq!(back_pagination_status.next().await, Some(LiveBackPaginationStatus::Paginating));
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
};
join(paginate, observe_paginating).await;
@@ -832,7 +831,7 @@ async fn test_back_pagination_aborted() {
}
});
assert_eq!(back_pagination_status.next().await, Some(LiveBackPaginationStatus::Paginating));
assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating));
// Abort the pagination!
paginate.abort();
@@ -843,7 +842,7 @@ async fn test_back_pagination_aborted() {
// The timeline should automatically reset to idle.
assert_next_eq!(
back_pagination_status,
LiveBackPaginationStatus::Idle { hit_start_of_timeline: false }
RoomPaginationStatus::Idle { hit_timeline_start: false }
);
// And there should be no other pending pagination status updates.

View File

@@ -33,7 +33,7 @@ use std::{
sync::{Arc, OnceLock},
};
use eyeball::Subscriber;
use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, TimelineEvent},
@@ -69,7 +69,7 @@ mod pagination;
mod room;
pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination};
pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
pub use room::RoomEventCache;
/// An error observed in the [`EventCache`].
@@ -97,6 +97,11 @@ pub enum EventCacheError {
#[error("Error observed while back-paginating: {0}")]
BackpaginationError(#[from] PaginatorError),
/// Back-pagination was already happening in a given room, where we tried to
/// back-paginate again.
#[error("We were already back-paginating.")]
AlreadyBackpaginating,
/// An error happening when interacting with storage.
#[error(transparent)]
Storage(#[from] EventCacheStoreError),
@@ -720,13 +725,21 @@ impl EventCacheInner {
return Ok(room.clone());
}
let room_state =
RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;
let pagination_status =
SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
let room_state = RoomEventCacheState::new(
room_id.to_owned(),
self.store.clone(),
pagination_status.clone(),
)
.await?;
let room_version = self
.client
.get()
.and_then(|client| client.get_room(room_id))
.as_ref()
.map(|room| room.clone_info().room_version_or_default())
.unwrap_or_else(|| {
warn!("unknown room version for {room_id}, using default V1");
@@ -743,6 +756,7 @@ impl EventCacheInner {
let room_event_cache = RoomEventCache::new(
self.client.clone(),
room_state,
pagination_status,
room_id.to_owned(),
room_version,
self.all_events.clone(),

View File

@@ -16,7 +16,7 @@
use std::{sync::Arc, time::Duration};
use eyeball::Subscriber;
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::timeout::timeout;
use matrix_sdk_common::linked_chunk::ChunkContent;
use tracing::{debug, instrument, trace};
@@ -30,6 +30,44 @@ use super::{
},
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
};
use crate::event_cache::{paginator::Paginator, EventCacheError};
/// Status for the back-pagination on a room event cache.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
/// No back-pagination is happening right now.
Idle {
/// Have we hit the start of the timeline, i.e. back-paginating wouldn't
/// have any effect?
hit_timeline_start: bool,
},
/// Back-pagination is already running in the background.
Paginating,
}
/// Small RAII guard to reset the pagination status on drop, if not disarmed in
/// the meanwhile.
struct ResetStatusOnDrop {
prev_status: Option<RoomPaginationStatus>,
pagination_status: SharedObservable<RoomPaginationStatus>,
}
impl ResetStatusOnDrop {
/// Make the RAII guard have no effect.
fn disarm(mut self) {
self.prev_status = None;
}
}
impl Drop for ResetStatusOnDrop {
fn drop(&mut self) {
if let Some(status) = self.prev_status.take() {
let _ = self.pagination_status.set(status);
}
}
}
/// An API object to run pagination queries on a [`super::RoomEventCache`].
///
@@ -130,7 +168,18 @@ impl RoomPagination {
}
}
// There is at least one gap that must be resolved. Let's reach the network!
// There is at least one gap that must be resolved; reach the network.
// First, ensure there's no other ongoing back-pagination.
let prev_status = self.inner.pagination_status.set(RoomPaginationStatus::Paginating);
if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
return Err(EventCacheError::AlreadyBackpaginating);
}
let reset_status_on_drop_guard = ResetStatusOnDrop {
prev_status: Some(prev_status),
pagination_status: self.inner.pagination_status.clone(),
};
let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
@@ -143,9 +192,11 @@ impl RoomPagination {
}
};
let paginator = &self.inner.paginator;
let paginator = Paginator::new(self.inner.weak_room.clone());
paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
paginator
.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)
.expect("a pristine paginator must be in the initial state");
// Run the actual pagination.
let PaginationResult { events, hit_end_of_timeline: reached_start } =
@@ -298,6 +349,14 @@ impl RoomPagination {
let backpagination_outcome = BackPaginationOutcome { events, reached_start };
// Back-pagination's over; time to disarm the status guard.
reset_status_on_drop_guard.disarm();
// Notify subscribers that pagination ended.
self.inner
.pagination_status
.set(RoomPaginationStatus::Idle { hit_timeline_start: reached_start });
if !sync_timeline_events_diffs.is_empty() {
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: sync_timeline_events_diffs,
@@ -376,16 +435,8 @@ impl RoomPagination {
/// Returns a subscriber to the pagination status used for the
/// back-pagination integrated to the event cache.
pub fn status(&self) -> Subscriber<PaginatorState> {
self.inner.paginator.state()
}
/// Returns whether we've hit the start of the timeline.
///
/// This is true if, and only if, we didn't have a previous-batch token and
/// running backwards pagination would be useless.
pub fn hit_timeline_start(&self) -> bool {
self.inner.paginator.hit_timeline_start()
pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
self.inner.pagination_status.subscribe()
}
}

View File

@@ -25,6 +25,7 @@ use std::{
};
use events::{sort_positions_descending, Gap};
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, TimelineEvent},
@@ -42,10 +43,8 @@ use tokio::sync::{
use tracing::{error, trace, warn};
use super::{
deduplicator::DeduplicationOutcome,
paginator::{Paginator, PaginatorState},
AllEventsCache, AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheUpdate,
RoomPagination,
deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin,
Result, RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{client::WeakClient, room::WeakRoom};
@@ -143,6 +142,7 @@ impl RoomEventCache {
pub(super) fn new(
client: WeakClient,
state: RoomEventCacheState,
pagination_status: SharedObservable<RoomPaginationStatus>,
room_id: OwnedRoomId,
room_version: RoomVersionId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
@@ -152,6 +152,7 @@ impl RoomEventCache {
inner: Arc::new(RoomEventCacheInner::new(
client,
state,
pagination_status,
room_id,
room_version,
all_events_cache,
@@ -237,10 +238,6 @@ impl RoomEventCache {
// Clear the (temporary) events mappings.
self.inner.all_events.write().await.clear();
// Reset the paginator.
// TODO: properly stop any ongoing back-pagination.
let _ = self.inner.paginator.set_idle_state(PaginatorState::Initial, None, None);
// Notify observers about the update.
let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: updates_as_vector_diffs,
@@ -296,8 +293,10 @@ pub(super) struct RoomEventCacheInner {
/// The room id for this room.
room_id: OwnedRoomId,
pub weak_room: WeakRoom,
/// The room version for this room.
pub(crate) room_version: RoomVersionId,
pub room_version: RoomVersionId,
/// Sender part for subscribers to this room.
pub sender: Sender<RoomEventCacheUpdate>,
@@ -314,15 +313,7 @@ pub(super) struct RoomEventCacheInner {
/// A notifier that we received a new pagination token.
pub pagination_batch_token_notifier: Notify,
/// A paginator instance, that's configured to run back-pagination on our
/// behalf.
///
/// Note: forward-paginations are still run "out-of-band", that is,
/// disconnected from the event cache, as we don't implement matching
/// events received from those kinds of pagination with the cache. This
/// paginator is only used for queries that interact with the actual event
/// cache.
pub paginator: Paginator<WeakRoom>,
pub pagination_status: SharedObservable<RoomPaginationStatus>,
/// Sender to the auto-shrink channel.
///
@@ -337,6 +328,7 @@ impl RoomEventCacheInner {
fn new(
client: WeakClient,
state: RoomEventCacheState,
pagination_status: SharedObservable<RoomPaginationStatus>,
room_id: OwnedRoomId,
room_version: RoomVersionId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
@@ -346,13 +338,14 @@ impl RoomEventCacheInner {
let weak_room = WeakRoom::new(client, room_id);
Self {
room_id: weak_room.room_id().to_owned(),
weak_room,
room_version,
state: RwLock::new(state),
all_events: all_events_cache,
sender,
pagination_batch_token_notifier: Default::default(),
paginator: Paginator::new(weak_room),
auto_shrink_sender,
pagination_status,
}
}
@@ -502,9 +495,6 @@ impl RoomEventCacheInner {
)
.await?;
// Reset the paginator status to initial.
self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;
Ok(())
}
@@ -663,6 +653,7 @@ pub(super) enum LoadMoreEventsBackwardsOutcome {
mod private {
use std::sync::{atomic::AtomicUsize, Arc};
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
deserialized_responses::{TimelineEvent, TimelineEventKind},
@@ -682,6 +673,7 @@ mod private {
events::RoomEvents,
sort_positions_descending, LoadMoreEventsBackwardsOutcome,
};
use crate::event_cache::RoomPaginationStatus;
/// State for a single room's event cache.
///
@@ -709,6 +701,8 @@ mod private {
/// that upon clearing the timeline events.
pub waited_for_initial_prev_token: bool,
pagination_status: SharedObservable<RoomPaginationStatus>,
/// An atomic count of the current number of listeners of the
/// [`super::RoomEventCache`].
pub(super) listener_count: Arc<AtomicUsize>,
@@ -727,6 +721,7 @@ mod private {
pub async fn new(
room_id: OwnedRoomId,
store: Arc<OnceCell<EventCacheStoreLock>>,
pagination_status: SharedObservable<RoomPaginationStatus>,
) -> Result<Self, EventCacheError> {
let (events, deduplicator) = if let Some(store) = store.get() {
let store_lock = store.lock().await?;
@@ -769,6 +764,7 @@ mod private {
deduplicator,
waited_for_initial_prev_token: false,
listener_count: Default::default(),
pagination_status,
})
}
@@ -848,11 +844,15 @@ mod private {
// All good, let's continue with this chunk.
new_first_chunk
}
Ok(None) => {
// No previous chunk: no events to insert. Better, it means we've reached
// the start of the timeline!
self.pagination_status
.set(RoomPaginationStatus::Idle { hit_timeline_start: true });
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
}
Err(err) => {
error!("error when loading the previous chunk of a linked chunk: {err}");
@@ -948,6 +948,11 @@ mod private {
return self.reset().await.map(Some);
}
// Let pagination observers know that we may have not reached the start of the
// timeline.
// TODO: likely need to cancel any ongoing pagination.
self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
// Don't propagate those updates to the store; this is only for the in-memory
// representation that we're doing this. Let's drain those store updates.
let _ = self.events.store_updates().take();
@@ -1132,7 +1137,13 @@ mod private {
pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
self.events.reset();
self.propagate_changes().await?;
// Reset the pagination state too: pretend we never waited for the initial
// prev-batch token, and indicate that we're not at the start of the
// timeline, since we don't.
self.waited_for_initial_prev_token = false;
// TODO: likely must cancel any ongoing back-paginations too
self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
Ok(self.events.updates_as_vector_diffs())
}

View File

@@ -8,8 +8,8 @@ use matrix_sdk::{
assert_let_timeout, assert_next_matches_with_timeout,
deserialized_responses::TimelineEvent,
event_cache::{
paginator::PaginatorState, BackPaginationOutcome, EventCacheError, PaginationToken,
RoomEventCacheUpdate,
BackPaginationOutcome, EventCacheError, PaginationToken, RoomEventCacheUpdate,
RoomPaginationStatus,
},
linked_chunk::{ChunkIdentifier, Position, Update},
test_utils::{
@@ -784,10 +784,10 @@ async fn test_limited_timeline_resets_pagination() {
.mount()
.await;
// At the beginning, the paginator is in the initial state.
// At the beginning, the paginator is in the idle state.
let pagination = room_event_cache.pagination();
let mut pagination_status = pagination.status();
assert_eq!(pagination_status.get(), PaginatorState::Initial);
assert_eq!(pagination_status.get(), RoomPaginationStatus::Idle { hit_timeline_start: false });
// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
@@ -808,8 +808,10 @@ async fn test_limited_timeline_resets_pagination() {
// And the paginator state delivers this as an update, and is internally
// consistent with it:
assert_next_matches_with_timeout!(pagination_status, PaginatorState::Idle);
assert!(pagination.hit_timeline_start());
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: true }
);
// When a limited sync comes back from the server,
server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await;
@@ -821,13 +823,12 @@ async fn test_limited_timeline_resets_pagination() {
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
// The paginator state is reset: status set to Initial, hasn't hit the timeline
// The paginator state is reset: status set to Idle, hasn't hit the timeline
// start.
assert!(!pagination.hit_timeline_start());
assert_eq!(pagination_status.get(), PaginatorState::Initial);
// We receive an update about the paginator status.
assert_next_matches_with_timeout!(pagination_status, PaginatorState::Initial);
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: false }
);
assert!(room_stream.is_empty());
}