From 54729ce32bcf8fae27795e2a7930cf309ad2cb1d Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 12:57:45 +0100 Subject: [PATCH] feat(sdk): Start disabling the global store in `EventCache`. --- crates/matrix-sdk/src/event_cache/mod.rs | 229 ++++++++++++----------- 1 file changed, 124 insertions(+), 105 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 1087bdd48..d1162c1e6 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -68,10 +68,11 @@ use tokio::{ }; use tracing::{error, instrument, trace, warn}; -use self::store::{EventCacheStore, MemoryStore, TimelineEntry}; -use crate::{ - client::ClientInner, event_cache::store::PaginationToken, room::MessagesOptions, Client, Room, +use self::{ + linked_chunk::ChunkContent, + store::{EventCacheStore, Gap, MemoryStore, PaginationToken, RoomEvents, TimelineEntry}, }; +use crate::{client::ClientInner, room::MessagesOptions, Client, Room}; mod linked_chunk; mod store; @@ -149,15 +150,14 @@ impl Debug for EventCache { impl EventCache { /// Create a new [`EventCache`] for the given client. pub(crate) fn new(client: &Arc) -> Self { - let store = Arc::new(MemoryStore::new()); - let inner = Arc::new(EventCacheInner { - client: Arc::downgrade(client), - by_room: Default::default(), - store: Arc::new(Mutex::new(store)), - drop_handles: Default::default(), - }); - - Self { inner } + Self { + inner: Arc::new(EventCacheInner { + client: Arc::downgrade(client), + multiple_room_updates_lock: Default::default(), + by_room: Default::default(), + drop_handles: Default::default(), + }), + } } /// Starts subscribing the [`EventCache`] to sync responses, if not done @@ -205,6 +205,8 @@ impl EventCache { // Forget everything we know; we could have missed events, and we have // no way to reconcile at the moment! // TODO: implement Smart Matching™, + todo!(); + /* let store = inner.store.lock().await; let mut by_room = inner.by_room.write().await; for room_id in by_room.keys() { @@ -213,6 +215,7 @@ impl EventCache { } } by_room.clear(); + */ } Err(RecvError::Closed) => { @@ -256,15 +259,17 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. + todo!(); + /* let store = self.inner.store.lock().await; store.clear_room(room_id).await?; + */ let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache .inner .append_events( - &**store, events, prev_batch, Default::default(), @@ -282,20 +287,14 @@ struct EventCacheInner { /// on the owning client. client: Weak, + multiple_room_updates_lock: Mutex<()>, + /// Lazily-filled cache of live [`RoomEventCache`], once per room. by_room: RwLock>, - /// Backend used for storage. - /// - /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to - /// ensure that multiple updates will be applied in the correct order, which - /// is enforced by taking the store lock when handling an update. - /// - /// TODO: replace with a cross-process lock - store: Arc>>, - /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, + // TODO: that's the place to add a cross-process lock! } impl EventCacheInner { @@ -308,7 +307,7 @@ impl EventCacheInner { async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> { // First, take the lock that indicates we're processing updates, to avoid // handling multiple updates concurrently. - let store = self.store.lock().await; + let lock = self.multiple_room_updates_lock.lock().await; // Left rooms. for (room_id, left_room_update) in updates.leave { @@ -317,7 +316,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await { + if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -330,9 +329,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = - room.inner.handle_joined_room_update(&**store, joined_room_update).await - { + if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling joined room update: {err}"); } @@ -371,7 +368,7 @@ impl EventCacheInner { return Ok(None); }; - let room_event_cache = RoomEventCache::new(room, self.store.clone()); + let room_event_cache = RoomEventCache::new(room); by_room_guard.insert(room_id.to_owned(), room_event_cache.clone()); @@ -397,8 +394,8 @@ impl Debug for RoomEventCache { impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. - fn new(room: Room, store: Arc>>) -> Self { - Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) } + fn new(room: Room) -> Self { + Self { inner: Arc::new(RoomEventCacheInner::new(room)) } } /// Subscribe to room updates for this room, after getting the initial list @@ -408,9 +405,12 @@ impl RoomEventCache { pub async fn subscribe( &self, ) -> Result<(Vec, Receiver)> { + /* let store = self.inner.store.lock().await; Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe())) + */ + todo!() } /// Returns the oldest back-pagination token, that is, the one closest to @@ -447,14 +447,12 @@ struct RoomEventCacheInner { /// Sender part for subscribers to this room. sender: Sender, - /// Backend used for storage, shared with the parent [`EventCacheInner`]. - /// - /// See comment there. - store: Arc>>, - /// The Client [`Room`] this event cache pertains to. room: Room, + /// The events of the room. + events: RwLock, + /// A notifier that we received a new pagination token. pagination_token_notifier: Notify, @@ -466,24 +464,20 @@ struct RoomEventCacheInner { impl RoomEventCacheInner { /// Creates a new cache for a room, and subscribes to room updates, so as /// to handle new timeline events. - fn new(room: Room, store: Arc>>) -> Self { + fn new(room: Room) -> Self { let sender = Sender::new(32); + Self { room, - store, + events: RwLock::new(RoomEvents::default()), sender, pagination_lock: Default::default(), pagination_token_notifier: Default::default(), } } - async fn handle_joined_room_update( - &self, - store: &dyn EventCacheStore, - updates: JoinedRoomUpdate, - ) -> Result<()> { + async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { self.handle_timeline( - store, updates.timeline, updates.ephemeral.clone(), updates.account_data, @@ -495,7 +489,6 @@ impl RoomEventCacheInner { async fn handle_timeline( &self, - store: &dyn EventCacheStore, timeline: Timeline, ephemeral: Vec>, account_data: Vec>, @@ -508,7 +501,7 @@ impl RoomEventCacheInner { trace!("limited timeline, clearing all previous events"); // Clear internal state (events, pagination tokens, etc.). - store.clear_room(self.room.room_id()).await?; + self.events.write().await.reset(); // Propagate to observers. let _ = self.sender.send(RoomEventCacheUpdate::Clear); @@ -517,7 +510,6 @@ impl RoomEventCacheInner { // Add all the events to the backend. trace!("adding new events"); self.append_events( - store, timeline.events, timeline.prev_batch, account_data, @@ -529,19 +521,9 @@ impl RoomEventCacheInner { Ok(()) } - async fn handle_left_room_update( - &self, - store: &dyn EventCacheStore, - updates: LeftRoomUpdate, - ) -> Result<()> { - self.handle_timeline( - store, - updates.timeline, - Vec::new(), - Vec::new(), - updates.ambiguity_changes, - ) - .await?; + async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes) + .await?; Ok(()) } @@ -549,7 +531,6 @@ impl RoomEventCacheInner { /// observers. async fn append_events( &self, - store: &dyn EventCacheStore, events: Vec, prev_batch: Option, account_data: Vec>, @@ -565,21 +546,17 @@ impl RoomEventCacheInner { return Ok(()); } - let room_id = self.room.room_id(); - // Add the previous back-pagination token (if present), followed by the timeline // events themselves. - let gap_with_token = prev_batch - .clone() - .map(|val| TimelineEntry::Gap { prev_token: PaginationToken(val) }) - .into_iter(); + { + let mut room_events = self.events.write().await; - store - .append_room_entries( - room_id, - gap_with_token.chain(events.iter().cloned().map(TimelineEntry::Event)).collect(), - ) - .await?; + if let Some(prev_token) = &prev_batch { + room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) }); + } + + room_events.push_events(events.clone().into_iter()); + } if prev_batch.is_some() { self.pagination_token_notifier.notify_one(); @@ -610,13 +587,27 @@ impl RoomEventCacheInner { // Make sure there's at most one back-pagination request. let _guard = self.pagination_lock.lock().await; - if let Some(token) = token.as_ref() { - let store = self.store.lock().await; - if !store.contains_gap(self.room.room_id(), token).await? { + // Make sure the `RoomEvents` isn't updated while we are back-paginating. + let mut room_events = self.events.write().await; + + // Check that the `token` exists if any. + let gap_identifier = if let Some(token) = token.as_ref() { + let gap_identifier = room_events.chunk_identifier(|chunk| { + matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token) + }); + + // The method has been called with `token` but it doesn't exist in `RoomEvents`, + // it's an error. + if gap_identifier.is_none() { return Err(EventCacheError::UnknownBackpaginationToken); } - } + gap_identifier + } else { + None + }; + + // Get messages. let messages = self .room .messages(assign!(MessagesOptions::backward(), { @@ -629,7 +620,8 @@ impl RoomEventCacheInner { // Would we want to backpaginate again, we'd start from the `end` token as the // next `from` token. - let prev_token = messages.end; + let prev_token = + messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) }); // If this token is missing, then we've reached the end of the timeline. let reached_start = prev_token.is_none(); @@ -640,41 +632,64 @@ impl RoomEventCacheInner { // should be prepended first). let events = messages.chunk; - // Prepend the previous token (if any) at the beginning of the timeline, - // followed by the events received in the response (in reverse order). - let new_gap = prev_token - .map(|token| TimelineEntry::Gap { prev_token: PaginationToken(token) }) - .into_iter(); + let sync_events = events + .iter() + // Reverse the order of the events as `/messages` has been called with `dir=b` + // (backward). The `RoomEvents` API expects the first event to be the oldest. + .rev() + .cloned() + .map(|timeline_event| SyncTimelineEvent::from(timeline_event)); - // For storage, reverse events to store them in the normal (non-reversed order). - // - // It's fine to convert from `TimelineEvent` (i.e. that has a room id) to - // `SyncTimelineEvent` (i.e. that doesn't have it), because those events are - // always tied to a room in storage anyways. - let new_events = events.iter().rev().map(|ev| TimelineEntry::Event(ev.clone().into())); + // There is a `token`/gap, let's replace it by new events! + if let Some(gap_identifier) = gap_identifier { + // Replace the gap by new events. + room_events + .replace_gap_at(sync_events, gap_identifier) + // SAFETY: we are sure that `gap_identifier` represents a valid `ChunkIdentifier` + // for a gap. + .unwrap(); - let replaced = self - .store - .lock() - .await - .replace_gap(self.room.room_id(), token.as_ref(), new_gap.chain(new_events).collect()) - .await?; + // And insert a new gap if there is any `prev_token`. + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, gap_identifier.into()) + // SAFETY: we are sure that `gap_identifier` represents a valid + // `ChunkIdentifier` for a gap. + .unwrap(); + } - if !replaced { - // The previous token disappeared! - // This can happen if we got a limited timeline and lost track of our pagination - // token, because the whole timeline has been reset. - // - // TODO: With smarter reconciliation, this might get away. In the meanwhile, - // early return and forget about all the events. - trace!("gap was missing, likely because we observed a gappy sync response"); - Ok(BackPaginationOutcome::UnknownBackpaginationToken) - } else { trace!("replaced gap with new events from backpagination"); // TODO: implement smarter reconciliation later //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); + Ok(BackPaginationOutcome::Success { events, reached_start }) + } + // There is no `token`/gap identifier. Let's assume we must prepend the new events. + else { + let first_item_position = + room_events.events().nth(0).map(|(item_position, _)| item_position); + + match first_item_position { + // Is there a first item? Insert at this position. + Some(item_position) => { + if let Some(prev_token_gap) = prev_token { + room_events.insert_gap_at(prev_token_gap, item_position).unwrap(); + } + + room_events.insert_events_at(sync_events, item_position).unwrap(); + } + + // There is no first item. Let's simply push. + None => { + if let Some(prev_token_gap) = prev_token { + room_events.push_gap(prev_token_gap); + } + + room_events.push_events(sync_events); + } + } + Ok(BackPaginationOutcome::Success { events, reached_start }) } } @@ -689,6 +704,8 @@ impl RoomEventCacheInner { max_wait: Option, ) -> Result> { // Optimistically try to return the backpagination token immediately. + todo!(); + /* if let Some(token) = self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await? { @@ -705,6 +722,7 @@ impl RoomEventCacheInner { let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await; self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await + */ } } @@ -758,7 +776,6 @@ pub enum RoomEventCacheUpdate { #[cfg(test)] mod tests { - use assert_matches2::assert_matches; use matrix_sdk_common::executor::spawn; use matrix_sdk_test::{async_test, sync_timeline_event}; @@ -803,6 +820,7 @@ mod tests { assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken); } + /* // Those tests require time to work, and it does not on wasm32. #[cfg(not(target_arch = "wasm32"))] mod time_tests { @@ -1003,4 +1021,5 @@ mod tests { insert_token_task.await.unwrap(); } } + */ }