refactor(event cache): call /messages directly in the room pagination

And don'y rely on the `Paginator`. This simplifies the code a bit,
avoids a few methods on the `Paginator`, and makes it more
straightforward the pagination happens.
This commit is contained in:
Benjamin Bouvier
2025-02-27 14:04:36 +01:00
parent 7a0bf9b9b9
commit 55143e1790
2 changed files with 33 additions and 77 deletions

View File

@@ -17,21 +17,23 @@
use std::{sync::Arc, time::Duration};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::{linked_chunk::ChunkIdentifier, timeout::timeout};
use matrix_sdk_base::{
deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
};
use matrix_sdk_common::linked_chunk::ChunkContent;
use ruma::api::Direction;
use tokio::sync::RwLockWriteGuard;
use tracing::{debug, instrument, trace};
use super::{
deduplicator::DeduplicationOutcome,
paginator::{PaginationResult, PaginatorState},
room::{
events::{Gap, RoomEvents},
LoadMoreEventsBackwardsOutcome, RoomEventCacheInner,
},
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
};
use crate::event_cache::{paginator::Paginator, EventCacheError};
use crate::{event_cache::EventCacheError, room::MessagesOptions};
/// Status for the back-pagination on a room event cache.
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -233,20 +235,27 @@ impl RoomPagination {
}
};
let (pagination_result, new_gap) = {
// Use a throw-away paginator instance.
let paginator = Paginator::new(self.inner.weak_room.clone());
let (events, new_gap) = {
let Some(room) = self.inner.weak_room.get() else {
// The client is shutting down, return an empty default response.
return Ok(Some(BackPaginationOutcome {
reached_start: false,
events: Default::default(),
}));
};
paginator
.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)
.expect("a pristine paginator must be in the initial state");
let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
options.limit = batch_size.into();
// The network request happens here:
let result = paginator.paginate_backward(batch_size.into()).await?;
let response = room.messages(options).await.map_err(|err| {
EventCacheError::BackpaginationError(
crate::event_cache::paginator::PaginatorError::SdkError(Box::new(err)),
)
})?;
let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });
let new_gap = response.end.map(|prev_token| Gap { prev_token });
(result, new_gap)
(response.chunk, new_gap)
};
// Make sure the `RoomEvents` isn't updated while we are saving events from
@@ -275,20 +284,19 @@ impl RoomPagination {
None
};
self.handle_network_pagination_result(state, pagination_result, new_gap, prev_gap_id)
.await
.map(Some)
self.handle_network_pagination_result(state, events, new_gap, prev_gap_id).await.map(Some)
}
/// Handle the result of a successful network back-pagination.
async fn handle_network_pagination_result(
&self,
mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
pagination_result: PaginationResult,
events: Vec<TimelineEvent>,
new_gap: Option<Gap>,
prev_gap_id: Option<ChunkIdentifier>,
) -> Result<BackPaginationOutcome> {
let PaginationResult { events, hit_end_of_timeline } = pagination_result;
// If there's no new previous gap, then we've reached the start of the timeline.
let reached_start = new_gap.is_none();
let (
DeduplicationOutcome {
@@ -405,7 +413,7 @@ impl RoomPagination {
state.events()
.chunks()
.next()
.map_or(hit_end_of_timeline, |chunk| chunk.is_definitive_head())
.map_or(reached_start, |chunk| chunk.is_definitive_head())
};
let backpagination_outcome = BackPaginationOutcome { events, reached_start };

View File

@@ -192,57 +192,6 @@ impl<PR: PaginableRoom> Paginator<PR> {
self.state.subscribe()
}
/// Prepares the paginator to be in the idle state, ready for backwards- and
/// forwards- pagination.
///
/// Will return an `InvalidPreviousState` error if the paginator is busy
/// (running /context or /messages).
pub(super) fn set_idle_state(
&self,
next_state: PaginatorState,
prev_batch_token: Option<String>,
next_batch_token: Option<String>,
) -> Result<(), PaginatorError> {
let prev_state = self.state.get();
match next_state {
PaginatorState::Initial | PaginatorState::Idle => {}
PaginatorState::FetchingTargetEvent | PaginatorState::Paginating => {
panic!("internal error: set_idle_state only accept Initial|Idle next states");
}
}
match prev_state {
PaginatorState::Initial | PaginatorState::Idle => {}
PaginatorState::FetchingTargetEvent | PaginatorState::Paginating => {
// The paginator was busy. Don't interrupt it.
return Err(PaginatorError::InvalidPreviousState {
// Technically it's initial OR idle, but we don't really care here.
expected: PaginatorState::Idle,
actual: prev_state,
});
}
}
self.state.set_if_not_eq(next_state);
{
let mut tokens = self.tokens.lock().unwrap();
tokens.previous = prev_batch_token.into();
tokens.next = next_batch_token.into();
}
Ok(())
}
/// Returns the current previous batch token, as stored in this paginator.
pub(super) fn prev_batch_token(&self) -> Option<String> {
match &self.tokens.lock().unwrap().previous {
PaginationToken::HitEnd | PaginationToken::None => None,
PaginationToken::HasMore(token) => Some(token.clone()),
}
}
/// Starts the pagination from the initial event, requesting `num_events`
/// additional context events.
///
@@ -1050,6 +999,7 @@ mod tests {
mod aborts {
use super::*;
use crate::event_cache::{paginator::PaginationTokens, PaginationToken};
#[derive(Clone, Default)]
struct AbortingRoom {
@@ -1128,13 +1078,11 @@ mod tests {
// Assuming a paginator ready to back- or forward- paginate,
let paginator = Paginator::new(room.clone());
paginator
.set_idle_state(
PaginatorState::Idle,
Some("prev".to_owned()),
Some("next".to_owned()),
)
.unwrap();
paginator.state.set(PaginatorState::Idle);
*paginator.tokens.lock().unwrap() = PaginationTokens {
previous: PaginationToken::HasMore("prev".to_owned()),
next: PaginationToken::HasMore("next".to_owned()),
};
let paginator = Arc::new(paginator);