From 36e199c31eb759a89284f4e2fa2583943a541c04 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 12:22:58 +0100 Subject: [PATCH 01/20] feat(sdk): Implement `RoomEvents::reset`, `push_gap`, `replace_gap_at` and `events`. This patch implements the following wrapper methods (over `LinkedChunk`): `push_gap`, `replace_gap_at` and `events`. This patch also implements the `reset` method that clears/drops all chunks in the `LinkedChunk`. --- crates/matrix-sdk/src/event_cache/store.rs | 33 ++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index f5d12ae97..998b2b655 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -230,6 +230,11 @@ impl RoomEvents { Self { chunks: LinkedChunk::new() } } + /// Clear all events. + pub fn reset(&mut self) { + self.chunks = LinkedChunk::new(); + } + /// Return the number of events. pub fn len(&self) -> usize { self.chunks.len() @@ -251,6 +256,11 @@ impl RoomEvents { self.chunks.push_items_back(events) } + /// Push a gap after existing events. + pub fn push_gap(&mut self, gap: Gap) { + self.chunks.push_gap_back(gap) + } + /// Insert events at a specified position. pub fn insert_events_at( &mut self, @@ -273,6 +283,22 @@ impl RoomEvents { self.chunks.insert_gap_at(gap, position) } + /// Replace the gap identified by `gap_identifier`, by events. + /// + /// Because the `gap_identifier` can represent non-gap chunk, this method + /// returns a `Result`. + pub fn replace_gap_at( + &mut self, + items: I, + gap_identifier: ChunkIdentifier, + ) -> StdResult<(), LinkedChunkError> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + self.chunks.replace_gap_at(items, gap_identifier) + } + /// Search for a chunk, and return its identifier. pub fn chunk_identifier<'a, P>(&'a self, predicate: P) -> Option where @@ -328,6 +354,13 @@ impl RoomEvents { self.chunks.ritems() } + /// Iterate over the events, forward. + /// + /// The oldest event comes first. + pub fn events(&self) -> impl Iterator { + self.chunks.items() + } + /// Iterate over the events, starting from `position`, backward. pub fn revents_from( &self, From 54729ce32bcf8fae27795e2a7930cf309ad2cb1d Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 12:57:45 +0100 Subject: [PATCH 02/20] feat(sdk): Start disabling the global store in `EventCache`. --- crates/matrix-sdk/src/event_cache/mod.rs | 229 ++++++++++++----------- 1 file changed, 124 insertions(+), 105 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 1087bdd48..d1162c1e6 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -68,10 +68,11 @@ use tokio::{ }; use tracing::{error, instrument, trace, warn}; -use self::store::{EventCacheStore, MemoryStore, TimelineEntry}; -use crate::{ - client::ClientInner, event_cache::store::PaginationToken, room::MessagesOptions, Client, Room, +use self::{ + linked_chunk::ChunkContent, + store::{EventCacheStore, Gap, MemoryStore, PaginationToken, RoomEvents, TimelineEntry}, }; +use crate::{client::ClientInner, room::MessagesOptions, Client, Room}; mod linked_chunk; mod store; @@ -149,15 +150,14 @@ impl Debug for EventCache { impl EventCache { /// Create a new [`EventCache`] for the given client. pub(crate) fn new(client: &Arc) -> Self { - let store = Arc::new(MemoryStore::new()); - let inner = Arc::new(EventCacheInner { - client: Arc::downgrade(client), - by_room: Default::default(), - store: Arc::new(Mutex::new(store)), - drop_handles: Default::default(), - }); - - Self { inner } + Self { + inner: Arc::new(EventCacheInner { + client: Arc::downgrade(client), + multiple_room_updates_lock: Default::default(), + by_room: Default::default(), + drop_handles: Default::default(), + }), + } } /// Starts subscribing the [`EventCache`] to sync responses, if not done @@ -205,6 +205,8 @@ impl EventCache { // Forget everything we know; we could have missed events, and we have // no way to reconcile at the moment! // TODO: implement Smart Matching™, + todo!(); + /* let store = inner.store.lock().await; let mut by_room = inner.by_room.write().await; for room_id in by_room.keys() { @@ -213,6 +215,7 @@ impl EventCache { } } by_room.clear(); + */ } Err(RecvError::Closed) => { @@ -256,15 +259,17 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. + todo!(); + /* let store = self.inner.store.lock().await; store.clear_room(room_id).await?; + */ let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache .inner .append_events( - &**store, events, prev_batch, Default::default(), @@ -282,20 +287,14 @@ struct EventCacheInner { /// on the owning client. client: Weak, + multiple_room_updates_lock: Mutex<()>, + /// Lazily-filled cache of live [`RoomEventCache`], once per room. by_room: RwLock>, - /// Backend used for storage. - /// - /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to - /// ensure that multiple updates will be applied in the correct order, which - /// is enforced by taking the store lock when handling an update. - /// - /// TODO: replace with a cross-process lock - store: Arc>>, - /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, + // TODO: that's the place to add a cross-process lock! } impl EventCacheInner { @@ -308,7 +307,7 @@ impl EventCacheInner { async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> { // First, take the lock that indicates we're processing updates, to avoid // handling multiple updates concurrently. - let store = self.store.lock().await; + let lock = self.multiple_room_updates_lock.lock().await; // Left rooms. for (room_id, left_room_update) in updates.leave { @@ -317,7 +316,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = room.inner.handle_left_room_update(&**store, left_room_update).await { + if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -330,9 +329,7 @@ impl EventCacheInner { continue; }; - if let Err(err) = - room.inner.handle_joined_room_update(&**store, joined_room_update).await - { + if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling joined room update: {err}"); } @@ -371,7 +368,7 @@ impl EventCacheInner { return Ok(None); }; - let room_event_cache = RoomEventCache::new(room, self.store.clone()); + let room_event_cache = RoomEventCache::new(room); by_room_guard.insert(room_id.to_owned(), room_event_cache.clone()); @@ -397,8 +394,8 @@ impl Debug for RoomEventCache { impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. - fn new(room: Room, store: Arc>>) -> Self { - Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) } + fn new(room: Room) -> Self { + Self { inner: Arc::new(RoomEventCacheInner::new(room)) } } /// Subscribe to room updates for this room, after getting the initial list @@ -408,9 +405,12 @@ impl RoomEventCache { pub async fn subscribe( &self, ) -> Result<(Vec, Receiver)> { + /* let store = self.inner.store.lock().await; Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe())) + */ + todo!() } /// Returns the oldest back-pagination token, that is, the one closest to @@ -447,14 +447,12 @@ struct RoomEventCacheInner { /// Sender part for subscribers to this room. sender: Sender, - /// Backend used for storage, shared with the parent [`EventCacheInner`]. - /// - /// See comment there. - store: Arc>>, - /// The Client [`Room`] this event cache pertains to. room: Room, + /// The events of the room. + events: RwLock, + /// A notifier that we received a new pagination token. pagination_token_notifier: Notify, @@ -466,24 +464,20 @@ struct RoomEventCacheInner { impl RoomEventCacheInner { /// Creates a new cache for a room, and subscribes to room updates, so as /// to handle new timeline events. - fn new(room: Room, store: Arc>>) -> Self { + fn new(room: Room) -> Self { let sender = Sender::new(32); + Self { room, - store, + events: RwLock::new(RoomEvents::default()), sender, pagination_lock: Default::default(), pagination_token_notifier: Default::default(), } } - async fn handle_joined_room_update( - &self, - store: &dyn EventCacheStore, - updates: JoinedRoomUpdate, - ) -> Result<()> { + async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { self.handle_timeline( - store, updates.timeline, updates.ephemeral.clone(), updates.account_data, @@ -495,7 +489,6 @@ impl RoomEventCacheInner { async fn handle_timeline( &self, - store: &dyn EventCacheStore, timeline: Timeline, ephemeral: Vec>, account_data: Vec>, @@ -508,7 +501,7 @@ impl RoomEventCacheInner { trace!("limited timeline, clearing all previous events"); // Clear internal state (events, pagination tokens, etc.). - store.clear_room(self.room.room_id()).await?; + self.events.write().await.reset(); // Propagate to observers. let _ = self.sender.send(RoomEventCacheUpdate::Clear); @@ -517,7 +510,6 @@ impl RoomEventCacheInner { // Add all the events to the backend. trace!("adding new events"); self.append_events( - store, timeline.events, timeline.prev_batch, account_data, @@ -529,19 +521,9 @@ impl RoomEventCacheInner { Ok(()) } - async fn handle_left_room_update( - &self, - store: &dyn EventCacheStore, - updates: LeftRoomUpdate, - ) -> Result<()> { - self.handle_timeline( - store, - updates.timeline, - Vec::new(), - Vec::new(), - updates.ambiguity_changes, - ) - .await?; + async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline, Vec::new(), Vec::new(), updates.ambiguity_changes) + .await?; Ok(()) } @@ -549,7 +531,6 @@ impl RoomEventCacheInner { /// observers. async fn append_events( &self, - store: &dyn EventCacheStore, events: Vec, prev_batch: Option, account_data: Vec>, @@ -565,21 +546,17 @@ impl RoomEventCacheInner { return Ok(()); } - let room_id = self.room.room_id(); - // Add the previous back-pagination token (if present), followed by the timeline // events themselves. - let gap_with_token = prev_batch - .clone() - .map(|val| TimelineEntry::Gap { prev_token: PaginationToken(val) }) - .into_iter(); + { + let mut room_events = self.events.write().await; - store - .append_room_entries( - room_id, - gap_with_token.chain(events.iter().cloned().map(TimelineEntry::Event)).collect(), - ) - .await?; + if let Some(prev_token) = &prev_batch { + room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) }); + } + + room_events.push_events(events.clone().into_iter()); + } if prev_batch.is_some() { self.pagination_token_notifier.notify_one(); @@ -610,13 +587,27 @@ impl RoomEventCacheInner { // Make sure there's at most one back-pagination request. let _guard = self.pagination_lock.lock().await; - if let Some(token) = token.as_ref() { - let store = self.store.lock().await; - if !store.contains_gap(self.room.room_id(), token).await? { + // Make sure the `RoomEvents` isn't updated while we are back-paginating. + let mut room_events = self.events.write().await; + + // Check that the `token` exists if any. + let gap_identifier = if let Some(token) = token.as_ref() { + let gap_identifier = room_events.chunk_identifier(|chunk| { + matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if prev_token == token) + }); + + // The method has been called with `token` but it doesn't exist in `RoomEvents`, + // it's an error. + if gap_identifier.is_none() { return Err(EventCacheError::UnknownBackpaginationToken); } - } + gap_identifier + } else { + None + }; + + // Get messages. let messages = self .room .messages(assign!(MessagesOptions::backward(), { @@ -629,7 +620,8 @@ impl RoomEventCacheInner { // Would we want to backpaginate again, we'd start from the `end` token as the // next `from` token. - let prev_token = messages.end; + let prev_token = + messages.end.map(|prev_token| Gap { prev_token: PaginationToken(prev_token) }); // If this token is missing, then we've reached the end of the timeline. let reached_start = prev_token.is_none(); @@ -640,41 +632,64 @@ impl RoomEventCacheInner { // should be prepended first). let events = messages.chunk; - // Prepend the previous token (if any) at the beginning of the timeline, - // followed by the events received in the response (in reverse order). - let new_gap = prev_token - .map(|token| TimelineEntry::Gap { prev_token: PaginationToken(token) }) - .into_iter(); + let sync_events = events + .iter() + // Reverse the order of the events as `/messages` has been called with `dir=b` + // (backward). The `RoomEvents` API expects the first event to be the oldest. + .rev() + .cloned() + .map(|timeline_event| SyncTimelineEvent::from(timeline_event)); - // For storage, reverse events to store them in the normal (non-reversed order). - // - // It's fine to convert from `TimelineEvent` (i.e. that has a room id) to - // `SyncTimelineEvent` (i.e. that doesn't have it), because those events are - // always tied to a room in storage anyways. - let new_events = events.iter().rev().map(|ev| TimelineEntry::Event(ev.clone().into())); + // There is a `token`/gap, let's replace it by new events! + if let Some(gap_identifier) = gap_identifier { + // Replace the gap by new events. + room_events + .replace_gap_at(sync_events, gap_identifier) + // SAFETY: we are sure that `gap_identifier` represents a valid `ChunkIdentifier` + // for a gap. + .unwrap(); - let replaced = self - .store - .lock() - .await - .replace_gap(self.room.room_id(), token.as_ref(), new_gap.chain(new_events).collect()) - .await?; + // And insert a new gap if there is any `prev_token`. + if let Some(prev_token_gap) = prev_token { + room_events + .insert_gap_at(prev_token_gap, gap_identifier.into()) + // SAFETY: we are sure that `gap_identifier` represents a valid + // `ChunkIdentifier` for a gap. + .unwrap(); + } - if !replaced { - // The previous token disappeared! - // This can happen if we got a limited timeline and lost track of our pagination - // token, because the whole timeline has been reset. - // - // TODO: With smarter reconciliation, this might get away. In the meanwhile, - // early return and forget about all the events. - trace!("gap was missing, likely because we observed a gappy sync response"); - Ok(BackPaginationOutcome::UnknownBackpaginationToken) - } else { trace!("replaced gap with new events from backpagination"); // TODO: implement smarter reconciliation later //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); + Ok(BackPaginationOutcome::Success { events, reached_start }) + } + // There is no `token`/gap identifier. Let's assume we must prepend the new events. + else { + let first_item_position = + room_events.events().nth(0).map(|(item_position, _)| item_position); + + match first_item_position { + // Is there a first item? Insert at this position. + Some(item_position) => { + if let Some(prev_token_gap) = prev_token { + room_events.insert_gap_at(prev_token_gap, item_position).unwrap(); + } + + room_events.insert_events_at(sync_events, item_position).unwrap(); + } + + // There is no first item. Let's simply push. + None => { + if let Some(prev_token_gap) = prev_token { + room_events.push_gap(prev_token_gap); + } + + room_events.push_events(sync_events); + } + } + Ok(BackPaginationOutcome::Success { events, reached_start }) } } @@ -689,6 +704,8 @@ impl RoomEventCacheInner { max_wait: Option, ) -> Result> { // Optimistically try to return the backpagination token immediately. + todo!(); + /* if let Some(token) = self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await? { @@ -705,6 +722,7 @@ impl RoomEventCacheInner { let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await; self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await + */ } } @@ -758,7 +776,6 @@ pub enum RoomEventCacheUpdate { #[cfg(test)] mod tests { - use assert_matches2::assert_matches; use matrix_sdk_common::executor::spawn; use matrix_sdk_test::{async_test, sync_timeline_event}; @@ -803,6 +820,7 @@ mod tests { assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken); } + /* // Those tests require time to work, and it does not on wasm32. #[cfg(not(target_arch = "wasm32"))] mod time_tests { @@ -1003,4 +1021,5 @@ mod tests { insert_token_task.await.unwrap(); } } + */ } From 9102a9c84124d0b6bd2fbd4910df38cb33ff6dc3 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 14:30:58 +0100 Subject: [PATCH 03/20] feat(sdk): `RoomEventCachecacherInner::oldest_backpagination_token` uses `RoomEvents`. --- crates/matrix-sdk/src/event_cache/mod.rs | 22 ++++++++++++---------- crates/matrix-sdk/src/event_cache/store.rs | 7 +++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index d1162c1e6..23e44baaf 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -62,7 +62,7 @@ use ruma::{ use tokio::{ sync::{ broadcast::{error::RecvError, Receiver, Sender}, - Mutex, Notify, RwLock, + Mutex, Notify, RwLock, RwLockReadGuard, }, time::timeout, }; @@ -307,7 +307,7 @@ impl EventCacheInner { async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> { // First, take the lock that indicates we're processing updates, to avoid // handling multiple updates concurrently. - let lock = self.multiple_room_updates_lock.lock().await; + let _lock = self.multiple_room_updates_lock.lock().await; // Left rooms. for (room_id, left_room_update) in updates.leave { @@ -704,16 +704,19 @@ impl RoomEventCacheInner { max_wait: Option, ) -> Result> { // Optimistically try to return the backpagination token immediately. - todo!(); - /* - if let Some(token) = - self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await? - { + fn get_oldest(room_events: RwLockReadGuard) -> Option { + room_events.chunks().find_map(|chunk| match chunk.content() { + ChunkContent::Gap(gap) => Some(gap.prev_token.clone()), + ChunkContent::Items(..) => None, + }) + } + + if let Some(token) = get_oldest(self.events.read().await) { return Ok(Some(token)); } let Some(max_wait) = max_wait else { - // We had no token and no time to wait, so... no tokens. + // We had no token and no time to wait, so… no tokens. return Ok(None); }; @@ -721,8 +724,7 @@ impl RoomEventCacheInner { // Timeouts are fine, per this function's contract. let _ = timeout(max_wait, self.pagination_token_notifier.notified()).await; - self.store.lock().await.oldest_backpagination_token(self.room.room_id()).await - */ + Ok(get_oldest(self.events.read().await)) } } diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index 998b2b655..c0fe3b08e 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -324,6 +324,13 @@ impl RoomEvents { self.chunks.rchunks() } + /// Iterate over the chunks, forward. + /// + /// The oldest chunk comes first. + pub fn chunks(&self) -> LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY> { + self.chunks.chunks() + } + /// Iterate over the chunks, starting from `identifier`, backward. pub fn rchunks_from( &self, From 29caa02ef0098ccb0ba0bd8b1a215fddd484fa61 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 14:44:08 +0100 Subject: [PATCH 04/20] feat(sdk): `EventCache::add_initial_events` uses `RoomEvents`. --- crates/matrix-sdk/src/event_cache/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 23e44baaf..c35b51d85 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -259,12 +259,8 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. - todo!(); - /* - let store = self.inner.store.lock().await; + room_cache.inner.events.write().await.reset(); - store.clear_room(room_id).await?; - */ let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache From 022b8a0f38e137cd30691a604ec9d0b8c28c6af7 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 14:49:52 +0100 Subject: [PATCH 05/20] feat(sdk): `EventCache::listen_task` uses `RoomEvents`. --- crates/matrix-sdk/src/event_cache/mod.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c35b51d85..debf0bae2 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -205,17 +205,7 @@ impl EventCache { // Forget everything we know; we could have missed events, and we have // no way to reconcile at the moment! // TODO: implement Smart Matching™, - todo!(); - /* - let store = inner.store.lock().await; - let mut by_room = inner.by_room.write().await; - for room_id in by_room.keys() { - if let Err(err) = store.clear_room(room_id).await { - error!("unable to clear room after room updates lag: {err}"); - } - } - by_room.clear(); - */ + inner.by_room.write().await.clear(); } Err(RecvError::Closed) => { From 667ada88e6d1da89e5de39cf72f7fbe834887e6b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 14:52:31 +0100 Subject: [PATCH 06/20] feat(sdk): `RoomEventCache::subscribe` uses `RoomEvents`. --- crates/matrix-sdk/src/event_cache/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index debf0bae2..815d3fa9d 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -391,12 +391,10 @@ impl RoomEventCache { pub async fn subscribe( &self, ) -> Result<(Vec, Receiver)> { - /* - let store = self.inner.store.lock().await; + let events = + self.inner.events.read().await.events().map(|(_position, item)| item.clone()).collect(); - Ok((store.room_events(self.inner.room.room_id()).await?, self.inner.sender.subscribe())) - */ - todo!() + Ok((events, self.inner.sender.subscribe())) } /// Returns the oldest back-pagination token, that is, the one closest to From 5bb2511914cc75ce331a810acb4bcb4cdb6ffa4e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 15:12:18 +0100 Subject: [PATCH 07/20] test(sdk): Tests of `EventCache` uses `RoomEvents`. --- crates/matrix-sdk/src/event_cache/mod.rs | 114 +++++++++-------------- 1 file changed, 44 insertions(+), 70 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 815d3fa9d..f43ce3c6b 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -806,50 +806,41 @@ mod tests { assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken); } - /* // Those tests require time to work, and it does not on wasm32. #[cfg(not(target_arch = "wasm32"))] mod time_tests { use std::time::{Duration, Instant}; + use matrix_sdk_base::RoomState; use tokio::time::sleep; - use super::*; + use super::{super::store::Gap, *}; #[async_test] async fn test_wait_no_pagination_token() { let client = logged_in_client(None).await; let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + client.base_client().get_or_create_room(room_id, RoomState::Joined); - client.event_cache().subscribe().unwrap(); + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); // When I only have events in a room, - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![TimelineEntry::Event( - sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into(), - )], - ) - .await - .unwrap(); - - let (room_event_cache, _drop_handlers) = - client.event_cache().for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } // If I don't wait for the backpagination token, let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); @@ -888,39 +879,28 @@ mod tests { let room_id = room_id!("!galette:saucisse.bzh"); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - client.event_cache().subscribe().unwrap(); + let event_cache = client.event_cache(); - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap(); let room_event_cache = room_event_cache.unwrap(); let expected_token = PaginationToken("old".to_owned()); // When I have events and multiple gaps, in a room, - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![ - TimelineEntry::Gap { prev_token: expected_token.clone() }, - TimelineEntry::Event( - sync_timeline_event!({ - "sender": "b@z.h", - "type": "m.room.message", - "event_id": "$ida", - "origin_server_ts": 12344446, - "content": { "body":"yolo", "msgtype": "m.text" }, - }) - .into(), - ), - ], - ) - .await - .unwrap(); + { + let mut room_events = room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: expected_token.clone() }); + room_events.push_events([sync_timeline_event!({ + "sender": "b@z.h", + "type": "m.room.message", + "event_id": "$ida", + "origin_server_ts": 12344446, + "content": { "body":"yolo", "msgtype": "m.text" }, + }) + .into()]); + } // If I don't wait for a back-pagination token, let found = room_event_cache.oldest_backpagination_token(None).await.unwrap(); @@ -958,7 +938,9 @@ mod tests { let room_id = room_id!("!galette:saucisse.bzh"); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - client.event_cache().subscribe().unwrap(); + let event_cache = client.event_cache(); + + event_cache.subscribe().unwrap(); let (room_event_cache, _drop_handles) = client.event_cache().for_room(room_id).await.unwrap(); @@ -968,22 +950,15 @@ mod tests { let before = Instant::now(); let cloned_expected_token = expected_token.clone(); + let cloned_room_event_cache = room_event_cache.clone(); let insert_token_task = spawn(async move { // If a backpagination token is inserted after 400 milliseconds, sleep(Duration::from_millis(400)).await; - client - .event_cache() - .inner - .store - .lock() - .await - .append_room_entries( - room_id, - vec![TimelineEntry::Gap { prev_token: cloned_expected_token }], - ) - .await - .unwrap(); + { + let mut room_events = cloned_room_event_cache.inner.events.write().await; + room_events.push_gap(Gap { prev_token: cloned_expected_token }); + } }); // Then first I don't get it (if I'm not waiting,) @@ -1007,5 +982,4 @@ mod tests { insert_token_task.await.unwrap(); } } - */ } From 85538dc3ed6a2d64fca6c838797513e245980b5e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 18 Mar 2024 15:17:25 +0100 Subject: [PATCH 08/20] feat(sdk): Remove `EventCacheStore`, `TimelineEntry`, `RoomInfo` and `MemoryStore`. --- crates/matrix-sdk/src/event_cache/mod.rs | 4 +- crates/matrix-sdk/src/event_cache/store.rs | 214 ++------------------- 2 files changed, 16 insertions(+), 202 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index f43ce3c6b..435b79636 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -70,7 +70,7 @@ use tracing::{error, instrument, trace, warn}; use self::{ linked_chunk::ChunkContent, - store::{EventCacheStore, Gap, MemoryStore, PaginationToken, RoomEvents, TimelineEntry}, + store::{Gap, PaginationToken, RoomEvents}, }; use crate::{client::ClientInner, room::MessagesOptions, Client, Room}; @@ -767,7 +767,7 @@ mod tests { use matrix_sdk_test::{async_test, sync_timeline_event}; use ruma::room_id; - use super::{store::TimelineEntry, EventCacheError}; + use super::EventCacheError; use crate::{event_cache::store::PaginationToken, test_utils::logged_in_client}; #[async_test] diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index c0fe3b08e..a29d036ab 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -12,199 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::BTreeMap, fmt, iter::once, result::Result as StdResult}; +use std::{fmt, iter::once, result::Result}; -use async_trait::async_trait; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; -use ruma::{OwnedRoomId, RoomId}; -use tokio::sync::RwLock; -use super::{ - linked_chunk::{ - Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, - LinkedChunkIterBackward, Position, - }, - Result, +use super::linked_chunk::{ + Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, + LinkedChunkIterBackward, Position, }; -/// A store that can be remember information about the event cache. -/// -/// It really acts as a cache, in the sense that clearing the backing data -/// should not have any irremediable effect, other than providing a lesser user -/// experience. -#[async_trait] -pub trait EventCacheStore: Send + Sync { - /// Returns all the known events for the given room. - async fn room_events(&self, room: &RoomId) -> Result>; - - /// Adds all the entries to the given room's timeline. - async fn append_room_entries(&self, room: &RoomId, entries: Vec) -> Result<()>; - - /// Returns whether the store knows about the given pagination token. - async fn contains_gap(&self, room: &RoomId, pagination_token: &PaginationToken) - -> Result; - - /// Replaces a given gap (identified by its pagination token) with the given - /// entries. - /// - /// Note: if the gap hasn't been found, then nothing happens, and the events - /// are lost. - /// - /// Returns whether the gap was found. - async fn replace_gap( - &self, - room: &RoomId, - gap_id: Option<&PaginationToken>, - entries: Vec, - ) -> Result; - - /// Retrieve the oldest backpagination token for the given room. - async fn oldest_backpagination_token(&self, room: &RoomId) -> Result>; - - /// Clear all the information tied to a given room. - /// - /// This forgets the following: - /// - events in the room - /// - pagination tokens - async fn clear_room(&self, room: &RoomId) -> Result<()>; -} - /// A newtype wrapper for a pagination token returned by a /messages response. #[derive(Clone, Debug, PartialEq)] pub struct PaginationToken(pub String); -#[derive(Clone)] -pub enum TimelineEntry { - Event(SyncTimelineEvent), - - Gap { - /// The token to use in the query, extracted from a previous "from" / - /// "end" field of a `/messages` response. - prev_token: PaginationToken, - }, -} - -/// All the information related to a room and stored in the event cache. -#[derive(Default)] -struct RoomInfo { - /// All the timeline entries per room, in sync order. - entries: Vec, -} - -impl RoomInfo { - fn clear(&mut self) { - self.entries.clear(); - } -} - -/// An [`EventCacheStore`] implementation that keeps all the information in -/// memory. -#[derive(Default)] -pub(crate) struct MemoryStore { - by_room: RwLock>, -} - -impl MemoryStore { - /// Create a new empty [`MemoryStore`]. - pub fn new() -> Self { - Default::default() - } -} - -#[async_trait] -impl EventCacheStore for MemoryStore { - async fn room_events(&self, room: &RoomId) -> Result> { - Ok(self - .by_room - .read() - .await - .get(room) - .map(|room_info| { - room_info - .entries - .iter() - .filter_map( - |entry| if let TimelineEntry::Event(ev) = entry { Some(ev) } else { None }, - ) - .cloned() - .collect() - }) - .unwrap_or_default()) - } - - async fn append_room_entries(&self, room: &RoomId, entries: Vec) -> Result<()> { - self.by_room.write().await.entry(room.to_owned()).or_default().entries.extend(entries); - Ok(()) - } - - async fn clear_room(&self, room: &RoomId) -> Result<()> { - // Clear the room, so as to avoid reallocations if the room is being reused. - // XXX: do we also want an actual way to *remove* a room? (for left rooms) - if let Some(room) = self.by_room.write().await.get_mut(room) { - room.clear(); - } - - Ok(()) - } - - async fn oldest_backpagination_token(&self, room: &RoomId) -> Result> { - Ok(self.by_room.read().await.get(room).and_then(|room| { - room.entries.iter().find_map(|entry| { - if let TimelineEntry::Gap { prev_token: backpagination_token } = entry { - Some(backpagination_token.clone()) - } else { - None - } - }) - })) - } - - async fn contains_gap(&self, room: &RoomId, needle: &PaginationToken) -> Result { - let mut by_room_guard = self.by_room.write().await; - let room = by_room_guard.entry(room.to_owned()).or_default(); - - Ok(room.entries.iter().any(|entry| { - if let TimelineEntry::Gap { prev_token: existing } = entry { - existing == needle - } else { - false - } - })) - } - - async fn replace_gap( - &self, - room: &RoomId, - token: Option<&PaginationToken>, - entries: Vec, - ) -> Result { - let mut by_room_guard = self.by_room.write().await; - let room = by_room_guard.entry(room.to_owned()).or_default(); - - if let Some(token) = token { - let gap_pos = room.entries.iter().enumerate().find_map(|(i, t)| { - if let TimelineEntry::Gap { prev_token: existing } = t { - if existing == token { - return Some(i); - } - } - None - }); - - if let Some(pos) = gap_pos { - room.entries.splice(pos..pos + 1, entries); - Ok(true) - } else { - Ok(false) - } - } else { - // We had no previous token: assume we can prepend the events. - room.entries.splice(0..0, entries); - Ok(true) - } - } -} - #[derive(Debug)] pub struct Gap { /// The token to use in the query, extracted from a previous "from" / @@ -266,7 +86,7 @@ impl RoomEvents { &mut self, events: I, position: Position, - ) -> StdResult<(), LinkedChunkError> + ) -> Result<(), LinkedChunkError> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -275,11 +95,7 @@ impl RoomEvents { } /// Insert a gap at a specified position. - pub fn insert_gap_at( - &mut self, - gap: Gap, - position: Position, - ) -> StdResult<(), LinkedChunkError> { + pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> { self.chunks.insert_gap_at(gap, position) } @@ -291,7 +107,7 @@ impl RoomEvents { &mut self, items: I, gap_identifier: ChunkIdentifier, - ) -> StdResult<(), LinkedChunkError> + ) -> Result<(), LinkedChunkError> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -335,7 +151,7 @@ impl RoomEvents { pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> StdResult< + ) -> Result< LinkedChunkIterBackward<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, LinkedChunkError, > { @@ -347,10 +163,8 @@ impl RoomEvents { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> StdResult< - LinkedChunkIter<'_, SyncTimelineEvent, Gap, DEFAULT_CHUNK_CAPACITY>, - LinkedChunkError, - > { + ) -> Result, LinkedChunkError> + { self.chunks.chunks_from(identifier) } @@ -364,7 +178,7 @@ impl RoomEvents { /// Iterate over the events, forward. /// /// The oldest event comes first. - pub fn events(&self) -> impl Iterator { + pub fn events(&self) -> impl Iterator { self.chunks.items() } @@ -372,7 +186,7 @@ impl RoomEvents { pub fn revents_from( &self, position: Position, - ) -> StdResult, LinkedChunkError> { + ) -> Result, LinkedChunkError> { self.chunks.ritems_from(position) } @@ -381,13 +195,13 @@ impl RoomEvents { pub fn events_from( &self, position: Position, - ) -> StdResult, LinkedChunkError> { + ) -> Result, LinkedChunkError> { self.chunks.items_from(position) } } impl fmt::Debug for RoomEvents { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { formatter.debug_struct("RoomEvents").field("chunk", &self.chunks).finish() } } From 25fb9ee47dd9820670c0c14141450c2de3285e47 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 20 Mar 2024 10:01:12 +0100 Subject: [PATCH 09/20] feat(sdk): Update `RoomEvents::replace_gap_at` to return a `&Chunk`. --- crates/matrix-sdk/src/event_cache/mod.rs | 20 +++++++++++++------- crates/matrix-sdk/src/event_cache/store.rs | 11 +++++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 435b79636..e15e42f03 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -542,6 +542,8 @@ impl RoomEventCacheInner { room_events.push_events(events.clone().into_iter()); } + // Now that all events have been added, we can trigger the + // `pagination_token_notifier`. if prev_batch.is_some() { self.pagination_token_notifier.notify_one(); } @@ -626,17 +628,21 @@ impl RoomEventCacheInner { // There is a `token`/gap, let's replace it by new events! if let Some(gap_identifier) = gap_identifier { - // Replace the gap by new events. - room_events - .replace_gap_at(sync_events, gap_identifier) - // SAFETY: we are sure that `gap_identifier` represents a valid `ChunkIdentifier` - // for a gap. - .unwrap(); + let new_position = { + // Replace the gap by new events. + let new_chunk = room_events + .replace_gap_at(sync_events, gap_identifier) + // SAFETY: we are sure that `gap_identifier` represents a valid + // `ChunkIdentifier` for a gap. + .unwrap(); + + new_chunk.first_position() + }; // And insert a new gap if there is any `prev_token`. if let Some(prev_token_gap) = prev_token { room_events - .insert_gap_at(prev_token_gap, gap_identifier.into()) + .insert_gap_at(prev_token_gap, new_position) // SAFETY: we are sure that `gap_identifier` represents a valid // `ChunkIdentifier` for a gap. .unwrap(); diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index a29d036ab..e5b53612d 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, iter::once, result::Result}; +use std::{fmt, iter::once}; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; @@ -103,16 +103,19 @@ impl RoomEvents { /// /// Because the `gap_identifier` can represent non-gap chunk, this method /// returns a `Result`. + /// + /// The returned `Chunk` represents the newly created `Chunk` that contains + /// the first events. pub fn replace_gap_at( &mut self, - items: I, + events: I, gap_identifier: ChunkIdentifier, - ) -> Result<(), LinkedChunkError> + ) -> Result<&Chunk, LinkedChunkError> where I: IntoIterator, I::IntoIter: ExactSizeIterator, { - self.chunks.replace_gap_at(items, gap_identifier) + self.chunks.replace_gap_at(events, gap_identifier) } /// Search for a chunk, and return its identifier. From 9319f4fcffea5cea189fda0aa2ea8eefe4008e4a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 20 Mar 2024 10:01:41 +0100 Subject: [PATCH 10/20] test(sdk): Fix `test_reset_while_backpaginating`. The test `test_reset_while_backpaginating` was expecting a race-condition, which no longer exists. It first initially tried to assert a workaround about this race-condition. It doesn't hold anymore. Rewrite the test to assert the (correct) new behaviour. --- .../tests/integration/event_cache.rs | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index dae21ab0d..e4ac43808 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -428,11 +428,12 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event( + .add_timeline_event(event_builder.make_sync_message_event_with_id( user_id!("@a:b.c"), + event_id!("$from_first_sync"), RoomMessageEventContent::text_plain("heyo"), )) - .set_timeline_prev_batch("first_backpagination".to_owned()), + .set_timeline_prev_batch("first_prev_batch_token".to_owned()), ); let response_body = sync_builder.build_json_sync_response(); @@ -471,25 +472,27 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event( + .add_timeline_event(event_builder.make_sync_message_event_with_id( user_id!("@a:b.c"), + event_id!("$from_second_sync"), RoomMessageEventContent::text_plain("heyo"), )) - .set_timeline_prev_batch("second_backpagination".to_owned()) + .set_timeline_prev_batch("second_prev_batch_token_from_sync".to_owned()) .set_timeline_limited(), ); let sync_response_body = sync_builder.build_json_sync_response(); // First back-pagination request: - let chunk = non_sync_events!(event_builder, [ (room_id, "$2": "lalala") ]); + let chunk = non_sync_events!(event_builder, [ (room_id, "$from_backpagination": "lalala") ]); let response_json = json!({ "chunk": chunk, "start": "t392-516_47314_0_7_1_1_1_11444_1", + "end": "second_prev_batch_token_from_backpagination" }); Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) .and(header("authorization", "Bearer 1234")) - .and(query_param("from", "first_backpagination")) + .and(query_param("from", "first_prev_batch_token")) .respond_with( ResponseTemplate::new(200) .set_body_json(response_json.clone()) @@ -506,7 +509,11 @@ async fn test_reset_while_backpaginating() { let rec = room_event_cache.clone(); let first_token_clone = first_token.clone(); - let backpagination = spawn(async move { rec.backpaginate(20, first_token_clone).await }); + let backpagination = spawn(async move { + let ret = rec.backpaginate(20, first_token_clone).await; + + ret + }); // Receive the sync response (which clears the timeline). mock_sync(&server, sync_response_body, None).await; @@ -514,14 +521,21 @@ async fn test_reset_while_backpaginating() { let outcome = backpagination.await.expect("join failed").unwrap(); - // Backpagination should be confused, and the operation should result in an - // unknown token. - assert_matches!(outcome, BackPaginationOutcome::UnknownBackpaginationToken); + // Backpagination should have been executed before the sync, despite the + // concurrency here. The backpagination should have acquired a write lock before + // the sync. + { + assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); + assert!(!reached_start); + assert_event_matches_msg(&events[0].clone().into(), "lalala"); + assert_eq!(events.len(), 1); + } // Now if we retrieve the earliest token, it's not the one we had before. + // Even better, it's the one from the sync, NOT from the backpagination! let second_token = room_event_cache.oldest_backpagination_token(None).await.unwrap().unwrap(); assert!(first_token.unwrap() != second_token); - assert_eq!(second_token.0, "second_backpagination"); + assert_eq!(second_token.0, "second_prev_batch_token_from_sync"); } #[async_test] From fa5bbadf57c50b7d1f32a1c022048025dfaabac7 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 20 Mar 2024 15:42:15 +0100 Subject: [PATCH 11/20] doc(sdk): Just highlight how important this lock is. --- crates/matrix-sdk/src/event_cache/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index e15e42f03..0c3df8400 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -573,7 +573,9 @@ impl RoomEventCacheInner { // Make sure there's at most one back-pagination request. let _guard = self.pagination_lock.lock().await; - // Make sure the `RoomEvents` isn't updated while we are back-paginating. + // Make sure the `RoomEvents` isn't updated while we are back-paginating. This + // is really important, for example if a sync is happening while we are + // back-paginating. let mut room_events = self.events.write().await; // Check that the `token` exists if any. From f61de718b831ea2545dbda0cba8d69be6050eb95 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 20 Mar 2024 21:54:34 +0100 Subject: [PATCH 12/20] fix(sdk): Fix a race-condition in `EventCache`. This patch ensures that operations on `RoomEvents` happen in one block, by sharing the same lock. 2 new methods are created: `replace_all_events_by` and `append_new_events`. --- crates/matrix-sdk/src/event_cache/mod.rs | 105 ++++++++++++++++++----- 1 file changed, 82 insertions(+), 23 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 0c3df8400..5d4c22480 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -62,7 +62,7 @@ use ruma::{ use tokio::{ sync::{ broadcast::{error::RecvError, Receiver, Sender}, - Mutex, Notify, RwLock, RwLockReadGuard, + Mutex, Notify, RwLock, RwLockReadGuard, RwLockWriteGuard, }, time::timeout, }; @@ -249,13 +249,14 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. - room_cache.inner.events.write().await.reset(); + // let mut room_events = room_cache.inner.events.write().await; + // room_events.reset(); - let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); + // let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache .inner - .append_events( + .replace_all_events_by( events, prev_batch, Default::default(), @@ -482,26 +483,30 @@ impl RoomEventCacheInner { // Ideally we'd try to reconcile existing events against those received in the // timeline, but we're not there yet. In the meanwhile, clear the // items from the room. TODO: implement Smart Matching™. - trace!("limited timeline, clearing all previous events"); + trace!("limited timeline, clearing all previous events and pushing new events"); - // Clear internal state (events, pagination tokens, etc.). - self.events.write().await.reset(); + self.replace_all_events_by( + timeline.events, + timeline.prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await?; + } else { + // Add all the events to the backend. + trace!("adding new events"); - // Propagate to observers. - let _ = self.sender.send(RoomEventCacheUpdate::Clear); + self.append_new_events( + timeline.events, + timeline.prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await?; } - // Add all the events to the backend. - trace!("adding new events"); - self.append_events( - timeline.events, - timeline.prev_batch, - account_data, - ephemeral, - ambiguity_changes, - ) - .await?; - Ok(()) } @@ -511,15 +516,71 @@ impl RoomEventCacheInner { Ok(()) } + // Remove existing events, and append a set of events to the room cache and + // storage, notifying observers. + async fn replace_all_events_by( + &self, + events: Vec, + prev_batch: Option, + account_data: Vec>, + ephemeral: Vec>, + ambiguity_changes: BTreeMap, + ) -> Result<()> { + // Acquire the lock. + let mut room_events = self.events.write().await; + + // Reset the events. + room_events.reset(); + + // Propagate to observers. + let _ = self.sender.send(RoomEventCacheUpdate::Clear); + + // Push the new events. + self.append_events_locked_impl( + room_events, + events, + prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await + } + /// Append a set of events to the room cache and storage, notifying /// observers. - async fn append_events( + async fn append_new_events( &self, events: Vec, prev_batch: Option, account_data: Vec>, ephemeral: Vec>, ambiguity_changes: BTreeMap, + ) -> Result<()> { + self.append_events_locked_impl( + self.events.write().await, + events, + prev_batch, + account_data, + ephemeral, + ambiguity_changes, + ) + .await + } + + /// Append a set of events, with an attached lock. + /// + /// If the lock `room_events` is `None`, one will be created. + /// + /// This is a private implementation. It must not be exposed publicly. + async fn append_events_locked_impl( + &self, + mut room_events: RwLockWriteGuard<'_, RoomEvents>, + events: Vec, + prev_batch: Option, + account_data: Vec>, + ephemeral: Vec>, + ambiguity_changes: BTreeMap, ) -> Result<()> { if events.is_empty() && prev_batch.is_none() @@ -533,8 +594,6 @@ impl RoomEventCacheInner { // Add the previous back-pagination token (if present), followed by the timeline // events themselves. { - let mut room_events = self.events.write().await; - if let Some(prev_token) = &prev_batch { room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) }); } From a6232152575ba8fcdbdc72f36893b4e19fa2a884 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 20 Mar 2024 22:13:48 +0100 Subject: [PATCH 13/20] chore(sdk): Make Clippy happy. --- crates/matrix-sdk/src/event_cache/mod.rs | 4 ++-- crates/matrix-sdk/tests/integration/event_cache.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 5d4c22480..ffca4908c 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -685,7 +685,7 @@ impl RoomEventCacheInner { // (backward). The `RoomEvents` API expects the first event to be the oldest. .rev() .cloned() - .map(|timeline_event| SyncTimelineEvent::from(timeline_event)); + .map(SyncTimelineEvent::from); // There is a `token`/gap, let's replace it by new events! if let Some(gap_identifier) = gap_identifier { @@ -755,7 +755,7 @@ impl RoomEventCacheInner { max_wait: Option, ) -> Result> { // Optimistically try to return the backpagination token immediately. - fn get_oldest(room_events: RwLockReadGuard) -> Option { + fn get_oldest(room_events: RwLockReadGuard<'_, RoomEvents>) -> Option { room_events.chunks().find_map(|chunk| match chunk.content() { ChunkContent::Gap(gap) => Some(gap.prev_token.clone()), ChunkContent::Items(..) => None, diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index e4ac43808..54b4b3854 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -509,11 +509,7 @@ async fn test_reset_while_backpaginating() { let rec = room_event_cache.clone(); let first_token_clone = first_token.clone(); - let backpagination = spawn(async move { - let ret = rec.backpaginate(20, first_token_clone).await; - - ret - }); + let backpagination = spawn(async move { rec.backpaginate(20, first_token_clone).await }); // Receive the sync response (which clears the timeline). mock_sync(&server, sync_response_body, None).await; From 8c1d3f4f6043f034f662ef75f2c80b20bd7fd4c6 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 21 Mar 2024 12:29:14 +0100 Subject: [PATCH 14/20] feat(sdk): `RoomEventCacheInner::backpaginate` always return `Ok` for unknown token. Prior to this patch, in `RoomEventCacheInner::backpaginate`, when the `token` validity was checked, and it was invalid: * before calling `/messages`, `Err(EventCacheError::UnknownBackpaginationToken)` was returned, * after calling `/messages`, `Ok(BackPaginationOutput::UnknownBackpaginationToken)` was returned. This patch tries to uniformize this by only returning `Ok(BackPaginationOutput::UnknownBackpaginationToken)`. That's a tradeoff. It will probably be refactor later. The idea is also to call `/messages` **before** taking the write-lock of `RoomEvents`, otherwise it can keep the lock for up to 30secs in this case. Also, checking the validity of the `token` **before** and **after** `/messages` is not necessary: it can be done only after. --- crates/matrix-sdk/src/event_cache/mod.rs | 91 ++++++++++++------- .../tests/integration/event_cache.rs | 14 +-- 2 files changed, 60 insertions(+), 45 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index ffca4908c..663b373cf 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -632,9 +632,18 @@ impl RoomEventCacheInner { // Make sure there's at most one back-pagination request. let _guard = self.pagination_lock.lock().await; - // Make sure the `RoomEvents` isn't updated while we are back-paginating. This - // is really important, for example if a sync is happening while we are - // back-paginating. + // Get messages. + let messages = self + .room + .messages(assign!(MessagesOptions::backward(), { + from: token.as_ref().map(|token| token.0.clone()), + limit: batch_size.into() + })) + .await + .map_err(EventCacheError::SdkError)?; + + // Make sure the `RoomEvents` isn't updated while we are saving events from + // backpagination. let mut room_events = self.events.write().await; // Check that the `token` exists if any. @@ -646,7 +655,7 @@ impl RoomEventCacheInner { // The method has been called with `token` but it doesn't exist in `RoomEvents`, // it's an error. if gap_identifier.is_none() { - return Err(EventCacheError::UnknownBackpaginationToken); + return Ok(BackPaginationOutcome::UnknownBackpaginationToken); } gap_identifier @@ -654,16 +663,6 @@ impl RoomEventCacheInner { None }; - // Get messages. - let messages = self - .room - .messages(assign!(MessagesOptions::backward(), { - from: token.as_ref().map(|token| token.0.clone()), - limit: batch_size.into() - })) - .await - .map_err(EventCacheError::SdkError)?; - // Would we want to backpaginate again, we'd start from the `end` token as the // next `from` token. @@ -834,7 +833,7 @@ mod tests { use matrix_sdk_test::{async_test, sync_timeline_event}; use ruma::room_id; - use super::EventCacheError; + use super::{BackPaginationOutcome, EventCacheError}; use crate::{event_cache::store::PaginationToken, test_utils::logged_in_client}; #[async_test] @@ -853,35 +852,57 @@ mod tests { assert_matches!(result, Err(EventCacheError::NotSubscribedYet)); } - #[async_test] - async fn test_unknown_pagination_token() { - let client = logged_in_client(None).await; - let room_id = room_id!("!galette:saucisse.bzh"); - client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - - client.event_cache().subscribe().unwrap(); - - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); - let room_event_cache = room_event_cache.unwrap(); - - // If I try to back-paginate with an unknown back-pagination token, - let token = PaginationToken("old".to_owned()); - - // Then I run into an error. - let res = room_event_cache.backpaginate(20, Some(token)).await; - assert_matches!(res.unwrap_err(), EventCacheError::UnknownBackpaginationToken); - } - // Those tests require time to work, and it does not on wasm32. #[cfg(not(target_arch = "wasm32"))] mod time_tests { use std::time::{Duration, Instant}; use matrix_sdk_base::RoomState; + use serde_json::json; use tokio::time::sleep; + use wiremock::{ + matchers::{header, method, path_regex, query_param}, + Mock, ResponseTemplate, + }; use super::{super::store::Gap, *}; + use crate::test_utils::logged_in_client_with_server; + + #[async_test] + async fn test_unknown_pagination_token() { + let (client, server) = logged_in_client_with_server().await; + + let room_id = room_id!("!galette:saucisse.bzh"); + client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + + client.event_cache().subscribe().unwrap(); + + let (room_event_cache, _drop_handles) = + client.event_cache().for_room(room_id).await.unwrap(); + let room_event_cache = room_event_cache.unwrap(); + + // If I try to back-paginate with an unknown back-pagination token, + let token_name = "unknown"; + let token = PaginationToken(token_name.to_owned()); + + // Then I run into an error. + Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) + .and(header("authorization", "Bearer 1234")) + .and(query_param("from", token_name)) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "start": token_name, + "chunk": [], + }))) + .expect(1) + .mount(&server) + .await; + + let res = room_event_cache.backpaginate(20, Some(token)).await; + assert_matches!(res, Ok(BackPaginationOutcome::UnknownBackpaginationToken)); + + server.verify().await + } #[async_test] async fn test_wait_no_pagination_token() { diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 54b4b3854..0e2f7dce1 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -515,17 +515,11 @@ async fn test_reset_while_backpaginating() { mock_sync(&server, sync_response_body, None).await; client.sync_once(Default::default()).await.unwrap(); - let outcome = backpagination.await.expect("join failed").unwrap(); + let outcome = backpagination.await.expect("join failed"); - // Backpagination should have been executed before the sync, despite the - // concurrency here. The backpagination should have acquired a write lock before - // the sync. - { - assert_let!(BackPaginationOutcome::Success { events, reached_start } = outcome); - assert!(!reached_start); - assert_event_matches_msg(&events[0].clone().into(), "lalala"); - assert_eq!(events.len(), 1); - } + // Backpagination should be confused, and the operation should result in an + // unknown token. + assert_matches!(outcome, Ok(BackPaginationOutcome::UnknownBackpaginationToken)); // Now if we retrieve the earliest token, it's not the one we had before. // Even better, it's the one from the sync, NOT from the backpagination! From 8fe27ab582411f6c97aff8fdce42ca8f707fe984 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 25 Mar 2024 09:41:37 +0100 Subject: [PATCH 15/20] doc(sdk): Improve documentation of `RoomEvents::replace_gap_at`. --- crates/matrix-sdk/src/event_cache/store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index e5b53612d..0b22b3f09 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -104,8 +104,8 @@ impl RoomEvents { /// Because the `gap_identifier` can represent non-gap chunk, this method /// returns a `Result`. /// - /// The returned `Chunk` represents the newly created `Chunk` that contains - /// the first events. + /// This method returns a reference to the (first if many) newly created + /// `Chunk` that contains the `items`. pub fn replace_gap_at( &mut self, events: I, From dabc7c512c293f97c37c2994b3658042c6a6d42c Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 25 Mar 2024 09:47:05 +0100 Subject: [PATCH 16/20] !revert test changes --- .../tests/integration/event_cache.rs | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 0e2f7dce1..dae21ab0d 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -428,12 +428,11 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event_with_id( + .add_timeline_event(event_builder.make_sync_message_event( user_id!("@a:b.c"), - event_id!("$from_first_sync"), RoomMessageEventContent::text_plain("heyo"), )) - .set_timeline_prev_batch("first_prev_batch_token".to_owned()), + .set_timeline_prev_batch("first_backpagination".to_owned()), ); let response_body = sync_builder.build_json_sync_response(); @@ -472,27 +471,25 @@ async fn test_reset_while_backpaginating() { JoinedRoomBuilder::new(room_id) // Note to self: a timeline must have at least single event to be properly // serialized. - .add_timeline_event(event_builder.make_sync_message_event_with_id( + .add_timeline_event(event_builder.make_sync_message_event( user_id!("@a:b.c"), - event_id!("$from_second_sync"), RoomMessageEventContent::text_plain("heyo"), )) - .set_timeline_prev_batch("second_prev_batch_token_from_sync".to_owned()) + .set_timeline_prev_batch("second_backpagination".to_owned()) .set_timeline_limited(), ); let sync_response_body = sync_builder.build_json_sync_response(); // First back-pagination request: - let chunk = non_sync_events!(event_builder, [ (room_id, "$from_backpagination": "lalala") ]); + let chunk = non_sync_events!(event_builder, [ (room_id, "$2": "lalala") ]); let response_json = json!({ "chunk": chunk, "start": "t392-516_47314_0_7_1_1_1_11444_1", - "end": "second_prev_batch_token_from_backpagination" }); Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) .and(header("authorization", "Bearer 1234")) - .and(query_param("from", "first_prev_batch_token")) + .and(query_param("from", "first_backpagination")) .respond_with( ResponseTemplate::new(200) .set_body_json(response_json.clone()) @@ -515,17 +512,16 @@ async fn test_reset_while_backpaginating() { mock_sync(&server, sync_response_body, None).await; client.sync_once(Default::default()).await.unwrap(); - let outcome = backpagination.await.expect("join failed"); + let outcome = backpagination.await.expect("join failed").unwrap(); // Backpagination should be confused, and the operation should result in an // unknown token. - assert_matches!(outcome, Ok(BackPaginationOutcome::UnknownBackpaginationToken)); + assert_matches!(outcome, BackPaginationOutcome::UnknownBackpaginationToken); // Now if we retrieve the earliest token, it's not the one we had before. - // Even better, it's the one from the sync, NOT from the backpagination! let second_token = room_event_cache.oldest_backpagination_token(None).await.unwrap().unwrap(); assert!(first_token.unwrap() != second_token); - assert_eq!(second_token.0, "second_prev_batch_token_from_sync"); + assert_eq!(second_token.0, "second_backpagination"); } #[async_test] From 1fa4bb4cfaeaaf84ccf6de2d49e4bedefaff683c Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 25 Mar 2024 09:49:52 +0100 Subject: [PATCH 17/20] chore(sdk): Remove commented code. --- crates/matrix-sdk/src/event_cache/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 663b373cf..a3c66c0eb 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -249,10 +249,6 @@ impl EventCache { // We could have received events during a previous sync; remove them all, since // we can't know where to insert the "initial events" with respect to // them. - // let mut room_events = room_cache.inner.events.write().await; - // room_events.reset(); - - // let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear); room_cache .inner From 7b092fd1743f21c4f42a18aee9e35a35533ec1ed Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 25 Mar 2024 09:52:53 +0100 Subject: [PATCH 18/20] doc(sdk): Improve documentation of `EventCacheInner::multiple_room_updates_lock`. --- crates/matrix-sdk/src/event_cache/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index a3c66c0eb..e47162578 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -270,6 +270,12 @@ struct EventCacheInner { /// on the owning client. client: Weak, + /// A lock used when many rooms must be updated at once. + /// + /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to + /// ensure that multiple updates will be applied in the correct order, which + /// is enforced by taking this lock when handling an update. + // TODO: that's the place to add a cross-process lock! multiple_room_updates_lock: Mutex<()>, /// Lazily-filled cache of live [`RoomEventCache`], once per room. @@ -277,7 +283,6 @@ struct EventCacheInner { /// Handles to keep alive the task listening to updates. drop_handles: OnceLock>, - // TODO: that's the place to add a cross-process lock! } impl EventCacheInner { From f11cf87326ab7ebd81459414f685bb30d1d9fccd Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 25 Mar 2024 12:18:06 +0100 Subject: [PATCH 19/20] fix(sdk): Replace `unwrap` by `expect` and add `SAFETY` docs. --- crates/matrix-sdk/src/event_cache/mod.rs | 37 +++++++++++++++--------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index e47162578..c18abbbc4 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -694,8 +694,8 @@ impl RoomEventCacheInner { let new_chunk = room_events .replace_gap_at(sync_events, gap_identifier) // SAFETY: we are sure that `gap_identifier` represents a valid - // `ChunkIdentifier` for a gap. - .unwrap(); + // `ChunkIdentifier` for a `Gap` chunk. + .expect("The `gap_identifier` must represent a `Gap`"); new_chunk.first_position() }; @@ -704,9 +704,9 @@ impl RoomEventCacheInner { if let Some(prev_token_gap) = prev_token { room_events .insert_gap_at(prev_token_gap, new_position) - // SAFETY: we are sure that `gap_identifier` represents a valid - // `ChunkIdentifier` for a gap. - .unwrap(); + // SAFETY: we are sure that `new_position` represents a valid + // `ChunkIdentifier` for an `Item` chunk. + .expect("The `new_position` must represent an `Item`"); } trace!("replaced gap with new events from backpagination"); @@ -715,20 +715,30 @@ impl RoomEventCacheInner { //let _ = self.sender.send(RoomEventCacheUpdate::Prepend { events }); Ok(BackPaginationOutcome::Success { events, reached_start }) - } - // There is no `token`/gap identifier. Let's assume we must prepend the new events. - else { + } else { + // There is no `token`/gap identifier. Let's assume we must prepend the new + // events. let first_item_position = - room_events.events().nth(0).map(|(item_position, _)| item_position); + room_events.events().next().map(|(item_position, _)| item_position); match first_item_position { // Is there a first item? Insert at this position. - Some(item_position) => { + Some(first_item_position) => { if let Some(prev_token_gap) = prev_token { - room_events.insert_gap_at(prev_token_gap, item_position).unwrap(); + room_events + .insert_gap_at(prev_token_gap, first_item_position) + // SAFETY: The `first_item_position` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. Also, it can only represent a valid + // `ChunkIdentifier` as the data structure isn't modified yet. + .expect("The `first_item_position` must represent a valid `Item`"); } - room_events.insert_events_at(sync_events, item_position).unwrap(); + room_events + .insert_events_at(sync_events, first_item_position) + // SAFETY: The `first_item_position` can only be an `Item` chunk, it's + // an invariant of `LinkedChunk`. The chunk it points to has not been + // removed. + .expect("The `first_item_position` must represent an `Item`"); } // There is no first item. Let's simply push. @@ -1031,8 +1041,7 @@ mod tests { event_cache.subscribe().unwrap(); - let (room_event_cache, _drop_handles) = - client.event_cache().for_room(room_id).await.unwrap(); + let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); let room_event_cache = room_event_cache.unwrap(); let expected_token = PaginationToken("old".to_owned()); From 11c3799fa2abcb01e3d28636127ddde5d67bed76 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 27 Mar 2024 09:09:40 +0100 Subject: [PATCH 20/20] doc(sdk): Improve doc of `EventCache`. --- crates/matrix-sdk/src/event_cache/mod.rs | 4 ++-- crates/matrix-sdk/src/event_cache/store.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c18abbbc4..c72a39f1b 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -517,8 +517,8 @@ impl RoomEventCacheInner { Ok(()) } - // Remove existing events, and append a set of events to the room cache and - // storage, notifying observers. + /// Remove existing events, and append a set of events to the room cache and + /// storage, notifying observers. async fn replace_all_events_by( &self, events: Vec, diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index 0b22b3f09..80b87fa50 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -65,7 +65,7 @@ impl RoomEvents { self.push_events(once(event)) } - /// Push events after existing events. + /// Push events after all events or gaps. /// /// The last event in `events` is the most recent one. pub fn push_events(&mut self, events: I) @@ -76,7 +76,7 @@ impl RoomEvents { self.chunks.push_items_back(events) } - /// Push a gap after existing events. + /// Push a gap after all events or gaps. pub fn push_gap(&mut self, gap: Gap) { self.chunks.push_gap_back(gap) }