sdk-ui: use the moved PinnedEventCache instead of the one encapsulated in PinnedEventsLoader

This commit is contained in:
Jorge Martín
2024-08-02 13:17:40 +02:00
committed by Jorge Martin Espinosa
parent f1b20a8ea5
commit 674605aeab
3 changed files with 33 additions and 55 deletions

View File

@@ -150,6 +150,7 @@ impl TimelineBuilder {
let client = room.client();
let event_cache = client.event_cache();
let pinned_event_cache = Arc::new(client.pinned_event_cache().clone());
// Subscribe the event cache to sync responses, in case we hadn't done it yet.
event_cache.subscribe()?;
@@ -171,7 +172,7 @@ impl TimelineBuilder {
)
.with_settings(settings);
let has_events = inner.init_focus(&room_event_cache).await?;
let has_events = inner.init_focus(&room_event_cache, &pinned_event_cache).await?;
let room = inner.room();
let client = room.client();
@@ -180,9 +181,10 @@ impl TimelineBuilder {
let mut pinned_event_ids_stream = room.pinned_event_ids_stream();
Some(spawn({
let inner = inner.clone();
let cache = pinned_event_cache.clone();
async move {
while pinned_event_ids_stream.next().await.is_some() {
if let Ok(events) = inner.pinned_events_load_events().await {
if let Ok(events) = inner.pinned_events_load_events(&cache).await {
inner
.replace_with_initial_remote_events(
events,
@@ -267,7 +269,7 @@ impl TimelineBuilder {
// events, update the pinned events cache with them, reload the list of pinned event ids and reload
// the list of pinned events with this info.
if let TimelineFocus::PinnedEvents { .. } = &*focus.clone() {
if let Ok(events) = inner.pinned_events_load_events().await {
if let Ok(events) = inner.pinned_events_load_events(&pinned_event_cache).await {
inner.replace_with_initial_remote_events(events, RemoteEventOrigin::Sync).await;
}
} else {

View File

@@ -26,6 +26,7 @@ use matrix_sdk::crypto::OlmMachine;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent,
event_cache::{paginator::Paginator, RoomEventCache},
pinned_events_cache::PinnedEventCache,
send_queue::{LocalEcho, RoomSendQueueUpdate, SendHandle},
Result, Room,
};
@@ -282,6 +283,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
pub(super) async fn init_focus(
&self,
room_event_cache: &RoomEventCache,
pinned_event_cache: &PinnedEventCache,
) -> Result<bool, Error> {
let focus_guard = self.focus.read().await;
@@ -319,7 +321,10 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}
TimelineFocusData::PinnedEvents { loader } => {
let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
let loaded_events = loader
.load_events(pinned_event_cache)
.await
.map_err(Error::PinnedEventsError)?;
drop(focus_guard);
@@ -338,11 +343,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {
pub(crate) async fn pinned_events_load_events(
&self,
pinned_event_cache: &PinnedEventCache,
) -> Result<Vec<SyncTimelineEvent>, PinnedEventsLoaderError> {
let focus_guard = self.focus.read().await;
if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
loader.load_events().await
loader.load_events(pinned_event_cache).await
} else {
Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
}

View File

@@ -1,31 +1,27 @@
use std::{collections::BTreeMap, fmt::Formatter, sync::Arc};
use std::{fmt::Formatter, sync::Arc};
use itertools::Itertools;
use matrix_sdk::{event_cache::paginator::PaginatorError, Room, SendOutsideWasm, SyncOutsideWasm};
use matrix_sdk::{
event_cache::paginator::PaginatorError, pinned_events_cache::PinnedEventCache, Room,
SendOutsideWasm, SyncOutsideWasm,
};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
use thiserror::Error;
use tokio::sync::{RwLock, Semaphore};
use tokio::sync::Semaphore;
use tracing::info;
/// Utility to load the pinned events in a room.
pub struct PinnedEventsLoader {
room: Arc<Box<dyn PinnedEventsRoom>>,
max_events_to_load: usize,
max_concurrent_requests: usize,
cache: PinnedEventCache,
}
impl PinnedEventsLoader {
/// Creates a new `PinnedEventsLoader` instance.
pub fn new(room: Box<dyn PinnedEventsRoom>, max_events_to_load: usize) -> Self {
Self {
room: Arc::new(room),
max_events_to_load,
max_concurrent_requests: 10,
cache: PinnedEventCache {
inner: Arc::new(InnerPinnedEventCache { events: Default::default() }),
},
}
Self { room: Arc::new(room), max_events_to_load, max_concurrent_requests: 10 }
}
/// Loads the pinned events in this room, using the cache first and then
@@ -36,7 +32,10 @@ impl PinnedEventsLoader {
/// It returns a `Result` with either a
/// chronologically sorted list of retrieved `SyncTimelineEvent`s or a
/// `PinnedEventsLoaderError`.
pub async fn load_events(&self) -> Result<Vec<SyncTimelineEvent>, PinnedEventsLoaderError> {
pub async fn load_events(
&self,
cache: &PinnedEventCache,
) -> Result<Vec<SyncTimelineEvent>, PinnedEventsLoaderError> {
let pinned_event_ids: Vec<OwnedEventId> = self
.room
.pinned_event_ids()
@@ -49,9 +48,11 @@ impl PinnedEventsLoader {
let mut loaded_events = Vec::new();
let mut event_ids_to_request = Vec::new();
for ev_id in pinned_event_ids {
if let Some(ev) = self.cache.get(&ev_id).await {
if let Some(ev) = cache.get(&ev_id).await {
info!("Loading pinned event {ev_id} from cache");
loaded_events.push(ev.clone());
} else {
info!("Loading pinned event {ev_id} from HS");
event_ids_to_request.push(ev_id);
}
}
@@ -87,7 +88,8 @@ impl PinnedEventsLoader {
}
}
self.cache.set_bulk(&loaded_events).await;
info!("Saving {} pinned events to the cache", loaded_events.len());
cache.set_bulk(&loaded_events).await;
fn timestamp(item: &SyncTimelineEvent) -> MilliSecondsSinceUnixEpoch {
item.event
@@ -113,6 +115,7 @@ impl PinnedEventsLoader {
pub async fn update_if_needed(
&self,
events: Vec<impl Into<SyncTimelineEvent>>,
cache: &PinnedEventCache,
) -> Option<Vec<SyncTimelineEvent>> {
let mut to_update = Vec::new();
for ev in events {
@@ -125,8 +128,8 @@ impl PinnedEventsLoader {
}
if !to_update.is_empty() {
self.cache.set_bulk(&to_update).await;
self.load_events().await.ok()
cache.set_bulk(&to_update).await;
self.load_events(cache).await.ok()
} else {
None
}
@@ -189,36 +192,3 @@ pub enum PinnedEventsLoaderError {
#[error("Timeline focus is not pinned events.")]
TimelineFocusNotPinnedEvents,
}
/// Cache used to store the events associated with pinned event ids.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub(crate) struct PinnedEventCache {
inner: Arc<InnerPinnedEventCache>,
}
impl PinnedEventCache {
/// Gets the event associated with the provided event id, if it exists in
/// the cache.
pub(crate) async fn get(&self, event_id: &EventId) -> Option<SyncTimelineEvent> {
let cache = self.inner.events.read().await;
cache.get(event_id).cloned()
}
/// Adds a list of pinned events to the cache in a performant way.
pub(crate) async fn set_bulk(&self, events: &Vec<SyncTimelineEvent>) {
let mut cache = self.inner.events.write().await;
for ev in events {
if let Some(ev_id) = ev.event_id() {
cache.insert(ev_id.to_owned(), ev.clone());
}
}
}
}
/// The non-cloneable implementation of the cache.
struct InnerPinnedEventCache {
/// The pinned events of the room.
events: RwLock<BTreeMap<OwnedEventId, SyncTimelineEvent>>,
}