Mirror of https://github.com/matrix-org/matrix-rust-sdk.git
refactor(event cache): don't make the store optional in the event cache
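The shape of the change: `EventCacheInner::store` (and its per-room counterpart in `mod private`) goes from a lazily initialized `Arc<OnceCell<EventCacheStoreLock>>` to a plain `EventCacheStoreLock`, passed in when the client constructs the `EventCache`. Every call site can then drop its `let Some(store) = self.store.get() else { ... }` guard and lock the store directly. A minimal, self-contained sketch of that pattern with toy types (`StoreLock` and `Cache` below are illustrative stand-ins, not the SDK's definitions; the real `EventCacheStoreLock::lock()` additionally returns a `Result`, since it takes a cross-process lock):

use std::sync::Arc;

use tokio::sync::Mutex;

/// Toy stand-in for `EventCacheStoreLock`: a cloneable, lockable handle.
#[derive(Clone, Default)]
struct StoreLock {
    inner: Arc<Mutex<Vec<String>>>,
}

struct Cache {
    // Before this commit (schematically): `Arc<OnceCell<StoreLock>>`, so every
    // method had to start with `let Some(store) = self.store.get() else { ... }`.
    // After: the store is always present.
    store: StoreLock,
}

impl Cache {
    // The store becomes a constructor argument, mirroring the new
    // `EventCache::new(client, event_cache_store)` signature in the diff below.
    fn new(store: StoreLock) -> Self {
        Self { store }
    }

    async fn save(&self, event: String) {
        // No optionality left to check: lock and use.
        self.store.inner.lock().await.push(event);
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(StoreLock::default());
    cache.save("$event_id:example.org".to_owned()).await;
}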
@@ -392,7 +392,12 @@ impl ClientInner {
         let _ = client
             .event_cache
-            .get_or_init(|| async { EventCache::new(WeakClient::from_inner(&client)) })
+            .get_or_init(|| async {
+                EventCache::new(
+                    WeakClient::from_inner(&client),
+                    client.base_client.event_cache_store().clone(),
+                )
+            })
             .await;

         client
@@ -44,7 +44,6 @@ use matrix_sdk_base::{
     sync::RoomUpdates,
 };
 use matrix_sdk_common::executor::{spawn, JoinHandle};
-use once_cell::sync::OnceCell;
 use room::RoomEventCacheState;
 use ruma::{
     events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
@@ -155,11 +154,11 @@ impl Debug for EventCache {

 impl EventCache {
     /// Create a new [`EventCache`] for the given client.
-    pub(crate) fn new(client: WeakClient) -> Self {
+    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
         Self {
             inner: Arc::new(EventCacheInner {
                 client,
-                store: Default::default(),
+                store: event_cache_store,
                 multiple_room_updates_lock: Default::default(),
                 by_room: Default::default(),
                 drop_handles: Default::default(),
@@ -176,12 +175,6 @@ impl EventCache {
     pub fn subscribe(&self) -> Result<()> {
         let client = self.inner.client()?;

-        // Initialize storage.
-        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
-            let client = self.inner.client()?;
-            Ok(client.event_cache_store().clone())
-        })?;
-
         // Initialize the drop handles.
         let _ = self.inner.drop_handles.get_or_init(|| {
             // Spawn the task that will listen to all the room updates at once.
@@ -366,10 +359,7 @@ struct EventCacheInner {
     client: WeakClient,

     /// Reference to the underlying store.
-    ///
-    /// Set to none if we shouldn't use storage for reading / writing linked
-    /// chunks.
-    store: Arc<OnceCell<EventCacheStoreLock>>,
+    store: EventCacheStoreLock,

     /// A lock used when many rooms must be updated at once.
     ///
@@ -452,10 +442,7 @@ impl EventCacheInner {
             .await;

         // Clear the storage for all the rooms, using the storage facility.
-        if let Some(store) = self.store.get() {
-            let store_guard = store.lock().await?;
-            store_guard.clear_all_rooms_chunks().await?;
-        }
+        self.store.lock().await?.clear_all_rooms_chunks().await?;

         // At this point, all the in-memory linked chunks are desynchronized from the
         // storage. Resynchronize them manually by calling reset(), and
@@ -537,7 +537,6 @@ mod private {
         linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update},
     };
     use matrix_sdk_common::executor::spawn;
-    use once_cell::sync::OnceCell;
     use ruma::{
         events::{
             relation::RelationType, room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent,
@@ -570,10 +569,7 @@ mod private {
         room_version: RoomVersionId,

         /// Reference to the underlying backing store.
-        ///
-        /// Set to none if the room shouldn't read the linked chunk from
-        /// storage, and shouldn't store updates to storage.
-        store: Arc<OnceCell<EventCacheStoreLock>>,
+        store: EventCacheStoreLock,

         /// The events of the room.
         events: RoomEvents,
@@ -607,43 +603,35 @@ mod private {
         pub async fn new(
             room_id: OwnedRoomId,
             room_version: RoomVersionId,
-            store: Arc<OnceCell<EventCacheStoreLock>>,
+            store: EventCacheStoreLock,
             pagination_status: SharedObservable<RoomPaginationStatus>,
         ) -> Result<Self, EventCacheError> {
-            let (events, deduplicator) = if let Some(store) = store.get() {
-                let store_lock = store.lock().await?;
+            let store_lock = store.lock().await?;

-                let linked_chunk = match store_lock
-                    .load_last_chunk(&room_id)
-                    .await
-                    .map_err(EventCacheError::from)
-                    .and_then(|(last_chunk, chunk_identifier_generator)| {
-                        lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
-                            .map_err(EventCacheError::from)
-                    }) {
-                    Ok(linked_chunk) => linked_chunk,
+            let linked_chunk = match store_lock
+                .load_last_chunk(&room_id)
+                .await
+                .map_err(EventCacheError::from)
+                .and_then(|(last_chunk, chunk_identifier_generator)| {
+                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
+                        .map_err(EventCacheError::from)
+                }) {
+                Ok(linked_chunk) => linked_chunk,

-                    Err(err) => {
-                        error!("error when reloading a linked chunk from memory: {err}");
+                Err(err) => {
+                    error!("error when reloading a linked chunk from memory: {err}");

-                        // Clear storage for this room.
-                        store_lock
-                            .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
-                            .await?;
+                    // Clear storage for this room.
+                    store_lock.handle_linked_chunk_updates(&room_id, vec![Update::Clear]).await?;

-                        // Restart with an empty linked chunk.
-                        None
-                    }
-                };
+                    // Restart with an empty linked chunk.
+                    None
+                }
+            };

-                (
-                    RoomEvents::with_initial_linked_chunk(linked_chunk),
-                    Deduplicator::new_store_based(room_id.clone(), store.clone()),
-                )
-            } else {
-                (RoomEvents::default(), Deduplicator::new_memory_based())
-            };
+            let events = RoomEvents::with_initial_linked_chunk(linked_chunk);
+            let deduplicator = Deduplicator::new_store_based(room_id.clone(), store.clone());

             Ok(Self {
                 room: room_id,
                 room_version,
@@ -727,22 +715,6 @@ mod private {
         pub(in super::super) async fn load_more_events_backwards(
             &mut self,
         ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
-            let Some(store) = self.store.get() else {
-                // No store to reload events from. Pretend the caller has to act as if a gap was
-                // present. Limited syncs will always clear and push a gap, in this mode.
-                // There's no lazy-loading.
-
-                // Look for a gap in the in-memory chunk, iterating in reverse so as to get the
-                // most recent one.
-                if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
-                    return Ok(LoadMoreEventsBackwardsOutcome::Gap {
-                        prev_token: Some(prev_token),
-                    });
-                }
-
-                return Ok(self.conclude_load_more_for_fully_loaded_chunk());
-            };
-
             // If any in-memory chunk is a gap, don't load more events, and let the caller
             // resolve the gap.
             if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
@@ -754,7 +726,7 @@ mod private {
             let first_chunk_identifier =
                 self.events.chunks().next().expect("a linked chunk is never empty").identifier();

-            let store = store.lock().await?;
+            let store = self.store.lock().await?;

             // The first chunk is not a gap, we can load its previous chunk.
             let new_first_chunk =
@@ -835,13 +807,7 @@ mod private {
         pub(super) async fn shrink_to_last_chunk(
             &mut self,
         ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
-            let Some(store) = self.store.get() else {
-                // No need to do anything if there's no storage; we'll already reset the
-                // timeline after a limited response.
-                return Ok(None);
-            };
-
-            let store_lock = store.lock().await?;
+            let store_lock = self.store.lock().await?;

             // Attempt to load the last chunk.
             let (last_chunk, chunk_identifier_generator) = match store_lock
@@ -1014,10 +980,6 @@ mod private {
             &mut self,
             mut updates: Vec<Update<TimelineEvent, Gap>>,
         ) -> Result<(), EventCacheError> {
-            let Some(store) = self.store.get() else {
-                return Ok(());
-            };
-
             if updates.is_empty() {
                 return Ok(());
             }
@@ -1045,7 +1007,7 @@ mod private {
             // The store cross-process locking involves an actual mutex, which ensures that
             // storing updates happens in the expected order.

-            let store = store.clone();
+            let store = self.store.clone();
             let room_id = self.room.clone();

             spawn(async move {
@@ -1111,12 +1073,7 @@ mod private {
                 }
             }

-            let Some(store) = self.store.get() else {
-                // No store, event is not present.
-                return Ok(None);
-            };
-
-            let store = store.lock().await?;
+            let store = self.store.lock().await?;

             Ok(store
                 .find_event(&self.room, event_id)
@@ -1134,12 +1091,7 @@ mod private {
             event_id: &EventId,
             filters: Option<Vec<RelationType>>,
         ) -> Result<Option<(TimelineEvent, Vec<TimelineEvent>)>, EventCacheError> {
-            let Some(store) = self.store.get() else {
-                // No store, event is not present.
-                return Ok(None);
-            };
-
-            let store = store.lock().await?;
+            let store = self.store.lock().await?;

             // First, hit storage to get the target event and its related events.
             let found = store.find_event(&self.room, event_id).await?;
@@ -1315,9 +1267,7 @@ mod private {
             &self,
             events: impl IntoIterator<Item = TimelineEvent>,
         ) -> Result<(), EventCacheError> {
-            let Some(store) = self.store.get() else { return Ok(()) };
-
-            let store = store.clone();
+            let store = self.store.clone();
             let room_id = self.room.clone();
             let events = events.into_iter().collect::<Vec<_>>();