mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-10 08:53:00 -04:00
refactor(timeline): don't reload a pinned event timeline that hasn't changed since the previous time
This commit is contained in:
@@ -326,8 +326,9 @@ where
|
||||
|
||||
while pinned_event_ids_stream.next().await.is_some() {
|
||||
trace!("received a pinned events update");
|
||||
|
||||
match timeline_controller.reload_pinned_events().await {
|
||||
Ok(events) => {
|
||||
Ok(Some(events)) => {
|
||||
trace!("successfully reloaded pinned events");
|
||||
timeline_controller
|
||||
.replace_with_initial_remote_events(
|
||||
@@ -336,6 +337,12 @@ where
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(None) => {
|
||||
// The list of pinned events hasn't changed since the previous
|
||||
// time.
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
warn!("Failed to reload pinned events: {err}");
|
||||
}
|
||||
|
||||
@@ -397,7 +397,12 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
|
||||
}
|
||||
|
||||
TimelineFocusData::PinnedEvents { loader } => {
|
||||
let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
|
||||
let Some(loaded_events) =
|
||||
loader.load_events().await.map_err(Error::PinnedEventsError)?
|
||||
else {
|
||||
// There wasn't any events.
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
drop(focus_guard);
|
||||
|
||||
@@ -447,7 +452,7 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
|
||||
|
||||
pub(crate) async fn reload_pinned_events(
|
||||
&self,
|
||||
) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
|
||||
) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
|
||||
let focus_guard = self.focus.read().await;
|
||||
|
||||
if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
|
||||
|
||||
@@ -22,6 +22,7 @@ use matrix_sdk::{
|
||||
use matrix_sdk_base::deserialized_responses::TimelineEvent;
|
||||
use ruma::{events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
/// Utility to load the pinned events in a room.
|
||||
@@ -29,6 +30,12 @@ pub struct PinnedEventsLoader {
|
||||
/// Backend to load pinned events.
|
||||
room: Arc<dyn PinnedEventsRoom>,
|
||||
|
||||
/// A list of pinned event ids we've observed previously.
|
||||
///
|
||||
/// Starts as an empty vector and is updated when the list of pinned events
|
||||
/// is updated.
|
||||
previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
|
||||
|
||||
/// Maximum number of pinned events to load (either from network or the
|
||||
/// cache).
|
||||
max_events_to_load: usize,
|
||||
@@ -46,7 +53,12 @@ impl PinnedEventsLoader {
|
||||
max_events_to_load: usize,
|
||||
max_concurrent_requests: usize,
|
||||
) -> Self {
|
||||
Self { room, max_events_to_load, max_concurrent_requests }
|
||||
Self {
|
||||
room,
|
||||
max_events_to_load,
|
||||
max_concurrent_requests,
|
||||
previous_pinned_event_ids: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the pinned events in this room, using the cache first and then
|
||||
@@ -54,10 +66,10 @@ impl PinnedEventsLoader {
|
||||
/// This method will perform as many concurrent requests for events as
|
||||
/// `max_concurrent_requests` allows, to avoid overwhelming the server.
|
||||
///
|
||||
/// It returns a `Result` with either a
|
||||
/// chronologically sorted list of retrieved [`TimelineEvent`]s
|
||||
/// or a [`PinnedEventsLoaderError`].
|
||||
pub async fn load_events(&self) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
|
||||
/// Returns `None` if the list of pinned events hasn't changed since the
|
||||
/// previous time we loaded them. May return an error if there was an
|
||||
/// issue fetching the full events.
|
||||
pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
|
||||
let pinned_event_ids: Vec<OwnedEventId> = self
|
||||
.room
|
||||
.pinned_event_ids()
|
||||
@@ -68,14 +80,20 @@ impl PinnedEventsLoader {
|
||||
.rev()
|
||||
.collect();
|
||||
|
||||
// Check if the list of pinned events has changed since the last time.
|
||||
if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if pinned_event_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
*self.previous_pinned_event_ids.lock().await = Vec::new();
|
||||
return Ok(Some(Vec::new()));
|
||||
}
|
||||
|
||||
let request_config = Some(RequestConfig::default().retry_limit(3));
|
||||
|
||||
let mut loaded_events: Vec<TimelineEvent> =
|
||||
stream::iter(pinned_event_ids.into_iter().map(|event_id| {
|
||||
stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
|
||||
let provider = self.room.clone();
|
||||
let relations_filter =
|
||||
Some(vec![RelationType::Annotation, RelationType::Replacement]);
|
||||
@@ -116,7 +134,13 @@ impl PinnedEventsLoader {
|
||||
.unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
|
||||
});
|
||||
|
||||
Ok(loaded_events)
|
||||
tracing::warn!("loaded pinned events: {:#?}", loaded_events);
|
||||
|
||||
// We've successfully loaded *some* pinned events, so we can update the list of
|
||||
// previously seen pinned events.
|
||||
*self.previous_pinned_event_ids.lock().await = pinned_event_ids;
|
||||
|
||||
Ok(Some(loaded_events))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -99,26 +99,8 @@ async fn test_new_pinned_events_are_not_added_on_sync() {
|
||||
.await
|
||||
.expect("Room should be synced");
|
||||
|
||||
// If the test runs fast, we receive 1 update, then 2 updates. If the test runs
|
||||
// slow, we receive 3 updates directly. Let's solve this flakiness with a
|
||||
// `sleep`.
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
|
||||
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
|
||||
assert_eq!(timeline_updates.len(), 3);
|
||||
|
||||
// The list is reloaded, so it's reset
|
||||
assert_let!(VectorDiff::Clear = &timeline_updates[0]);
|
||||
|
||||
// The loaded list items are added
|
||||
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[1]);
|
||||
assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$1"));
|
||||
|
||||
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
|
||||
assert!(value.is_date_divider());
|
||||
|
||||
// Event $2 was received through sync, but it wasn't added to the pinned event
|
||||
// timeline
|
||||
// timeline.
|
||||
assert_pending!(timeline_stream);
|
||||
}
|
||||
|
||||
@@ -516,20 +498,6 @@ async fn test_edited_events_are_not_reflected_in_sync() {
|
||||
.await
|
||||
.expect("Sync failed");
|
||||
|
||||
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
|
||||
assert_eq!(timeline_updates.len(), 3);
|
||||
|
||||
// The list is reloaded, so it's reset
|
||||
assert_let!(VectorDiff::Clear = &timeline_updates[0]);
|
||||
|
||||
// Then the loaded list items are added
|
||||
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[1]);
|
||||
let event = value.as_event().unwrap();
|
||||
assert_eq!(event.event_id().unwrap(), event_id!("$1"));
|
||||
|
||||
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
|
||||
assert!(value.is_date_divider());
|
||||
|
||||
// The edit does not replace the original event
|
||||
assert_pending!(timeline_stream);
|
||||
}
|
||||
@@ -589,20 +557,6 @@ async fn test_redacted_events_are_not_reflected_in_sync() {
|
||||
.await
|
||||
.expect("Sync failed");
|
||||
|
||||
assert_let!(Some(timeline_updates) = timeline_stream.next().await);
|
||||
assert_eq!(timeline_updates.len(), 3);
|
||||
|
||||
// The list is reloaded, so it's reset
|
||||
assert_let!(VectorDiff::Clear = &timeline_updates[0]);
|
||||
|
||||
// Then the loaded list items are added
|
||||
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[1]);
|
||||
let event = value.as_event().unwrap();
|
||||
assert_eq!(event.event_id().unwrap(), event_id!("$1"));
|
||||
|
||||
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
|
||||
assert!(value.is_date_divider());
|
||||
|
||||
// The redaction does not replace the original event
|
||||
assert_pending!(timeline_stream);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user