diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 0d5cf15c7..a11c61f2c 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -862,14 +862,12 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .unwrap(); // Now let's find the event. - let (position, event) = self + let event = self .find_event(room_id, event_comte.event_id().unwrap().as_ref()) .await .expect("failed to query for finding an event") .expect("failed to find an event"); - assert_eq!(position.chunk_identifier(), 0); - assert_eq!(position.index(), 0); assert_eq!(event.event_id(), event_comte.event_id()); // Now let's try to find an event that exists, but not in the expected room. diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index 666d97a5a..dc07dc4fe 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -206,16 +206,14 @@ impl EventCacheStore for MemoryStore { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let inner = self.inner.read().unwrap(); - let pos_and_event = - inner.events.items_with_positions().find_map(|(position, event, this_room_id)| { - (room_id == this_room_id && event.event_id()? == event_id) - .then_some((position, event.clone())) - }); + let event = inner.events.items().find_map(|(event, this_room_id)| { + (room_id == this_room_id && event.event_id()? == event_id).then_some(event.clone()) + }); - Ok(pos_and_event) + Ok(event) } async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> { diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index d65a337b3..db4b0f351 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -114,7 +114,7 @@ pub trait EventCacheStore: AsyncTraitDeps { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; /// Save an event, that might or might not be part of an existing linked /// chunk. @@ -319,7 +319,7 @@ impl EventCacheStore for EraseEventCacheStoreError { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { self.0.find_event(room_id, event_id).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-common/src/linked_chunk/relational.rs b/crates/matrix-sdk-common/src/linked_chunk/relational.rs index b1074b62d..a94c1751f 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/relational.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/relational.rs @@ -363,23 +363,6 @@ where .flat_map(|(room_id, items)| items.values().map(|item| (item, room_id.as_ref()))) } - /// Return an iterator over all items of all rooms, with their actual - /// positions, in no particular order. - /// - /// This will *not* include out of band items. - // TODO(bnjbvr): see if I can delete this one — reviewier, if this comment is - // still here during review, please let me know. - pub fn items_with_positions(&self) -> impl Iterator { - self.items_chunks.iter().filter_map(|item_row| { - if let Either::Item(item_id) = &item_row.item { - let item = self.items.get(&item_row.room_id)?.get(item_id)?; - Some((item_row.position, item, item_row.room_id.as_ref())) - } else { - None - } - }) - } - /// Save a single item "out-of-band" in the relational linked chunk. pub fn save_item(&mut self, room_id: OwnedRoomId, item: Item) { let id = item.id(); diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/007_events_chunks.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/007_events_chunks.sql index 0a830c704..794c1995a 100644 --- a/crates/matrix-sdk-sqlite/migrations/event_cache_store/007_events_chunks.sql +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/007_events_chunks.sql @@ -11,6 +11,9 @@ DROP TABLE "events"; -- Events and their content. CREATE TABLE "events" ( + -- The room in which the event is located. + "room_id" BLOB NOT NULL, + -- `OwnedEventId` for events. "event_id" BLOB NOT NULL, diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index b7cc21476..288c1c7ed 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -336,7 +336,7 @@ impl TransactionExtForLinkedChunks for Transaction<'_> { r#" SELECT content FROM events_chunks ec - INNER JOIN events USING (event_id) + INNER JOIN events USING (event_id, room_id) WHERE ec.chunk_id = ? AND ec.room_id = ? ORDER BY ec.position ASC "#, @@ -584,7 +584,7 @@ impl EventCacheStore for SqliteEventCacheStore { // already inserted in the database. This is the case when an event is // deduplicated and moved to another position. let mut content_statement = txn.prepare( - "INSERT OR REPLACE INTO events(event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?)" + "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" )?; let invalid_event = |event: TimelineEvent| { @@ -603,7 +603,7 @@ impl EventCacheStore for SqliteEventCacheStore { // Now, insert the event content into the database. let encoded_event = this.encode_event(&event)?; - content_statement.execute((event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; } } @@ -625,8 +625,8 @@ impl EventCacheStore for SqliteEventCacheStore { // of the new event. let encoded_event = this.encode_event(&event)?; txn.execute( - "INSERT OR REPLACE INTO events(event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?)" - , (&event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" + , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; // Replace the event id in the linked chunk, in case it changed. txn.execute( @@ -966,7 +966,7 @@ impl EventCacheStore for SqliteEventCacheStore { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); let event_id = event_id.to_owned(); let this = self.clone(); @@ -974,22 +974,9 @@ impl EventCacheStore for SqliteEventCacheStore { self.acquire() .await? .with_transaction(move |txn| -> Result<_> { - let Some((chunk_identifier, index, event)) = txn - .prepare( - r#" - SELECT chunk_id, position, content - FROM events_chunks ec - INNER JOIN events USING (event_id) - WHERE ec.room_id = ? AND ec.event_id = ? - "#, - )? - .query_row((hashed_room_id, event_id.as_str()), |row| { - Ok(( - row.get::<_, u64>(0)?, - row.get::<_, usize>(1)?, - row.get::<_, Vec>(2)?, - )) - }) + let Some(event) = txn + .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")? + .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec>(0)) .optional()? else { // Event is not found. @@ -998,7 +985,7 @@ impl EventCacheStore for SqliteEventCacheStore { let event = serde_json::from_slice(&this.decode_value(&event)?)?; - Ok(Some((Position::new(ChunkIdentifier::new(chunk_identifier), index), event))) + Ok(Some(event)) }) .await } @@ -1009,6 +996,7 @@ impl EventCacheStore for SqliteEventCacheStore { return Ok(()); }; + let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); let event_id = event_id.to_string(); let encoded_event = self.encode_event(&event)?; @@ -1016,8 +1004,8 @@ impl EventCacheStore for SqliteEventCacheStore { .await? .with_transaction(move |txn| -> Result<_> { txn.execute( - "INSERT OR REPLACE INTO events(event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?)" - , (&event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" + , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; Ok(()) }) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 319238333..9315556ac 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -29,6 +29,7 @@ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, TimelineEvent}, + linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; use ruma::{ @@ -40,7 +41,7 @@ use tokio::sync::{ broadcast::{Receiver, Sender}, mpsc, Notify, RwLock, }; -use tracing::{error, instrument, trace, warn}; +use tracing::{instrument, trace, warn}; use super::{ deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin, @@ -188,17 +189,14 @@ impl RoomEventCache { /// Try to find an event by id in this room. pub async fn event(&self, event_id: &EventId) -> Option { // Search in all loaded or stored events. - let Ok(maybe_position_and_event) = self.inner.state.read().await.find_event(event_id).await - else { - error!("Failed to find the event"); - - return None; - }; + if let Ok(Some((_location, event))) = + self.inner.state.read().await.find_event(event_id).await + { + return Some(event); + } // Search in `AllEventsCache` for known events that are not stored. - if let Some(event) = maybe_position_and_event.map(|(_location, _position, event)| event) { - Some(event) - } else if let Some((room_id, event)) = + if let Some((room_id, event)) = self.inner.all_events.read().await.events.get(event_id).cloned() { (room_id == self.inner.room_id).then_some(event) @@ -1241,14 +1239,12 @@ mod private { pub async fn find_event( &self, event_id: &EventId, - ) -> Result, EventCacheError> { - let room_id = self.room.as_ref(); - + ) -> Result, EventCacheError> { // There are supposedly fewer events loaded in memory than in the store. Let's // start by looking up in the `RoomEvents`. for (position, event) in self.events().revents() { if event.event_id().as_deref() == Some(event_id) { - return Ok(Some((EventLocation::Memory, position, event.clone()))); + return Ok(Some((EventLocation::Memory(position), event.clone()))); } } @@ -1260,9 +1256,9 @@ mod private { let store = store.lock().await?; Ok(store - .find_event(room_id, event_id) + .find_event(&self.room, event_id) .await? - .map(|(position, event)| (EventLocation::Store, position, event))) + .map(|event| (EventLocation::Store, event))) } /// Gives a temporary mutable handle to the underlying in-memory events, @@ -1339,7 +1335,7 @@ mod private { }; // Replace the redacted event by a redacted form, if we knew about it. - if let Some((location, position, target_event)) = self.find_event(event_id).await? { + if let Some((location, target_event)) = self.find_event(event_id).await? { // Don't redact already redacted events. if let Ok(deserialized) = target_event.raw().deserialize() { match deserialized { @@ -1370,17 +1366,24 @@ mod private { copy.replace_raw(redacted_event.cast()); match location { - EventLocation::Memory => { + EventLocation::Memory(position) => { self.events .replace_event_at(position, copy) .expect("should have been a valid position of an item"); } EventLocation::Store => { - self.send_updates_to_store(vec![Update::ReplaceItem { - at: position, - item: copy, - }]) - .await?; + if let Some(store) = self.store.get() { + let store = store.clone(); + let room_id = self.room.clone(); + // Spawn a task so the save is uninterrupted by task cancellation. + spawn(async move { + let store = store.lock().await?; + store.save_event(&room_id, copy).await?; + super::Result::Ok(()) + }) + .await + .expect("joining failed")?; + } } } } @@ -1398,7 +1401,7 @@ mod private { /// An enum representing where an event has been found. pub(super) enum EventLocation { /// Event lives in memory (and likely in the store!). - Memory, + Memory(Position), /// Event lives in the store only, it has not been loaded in memory yet. Store, @@ -1943,9 +1946,11 @@ mod tests { assert_eq!(diffs.len(), 1); assert_let!(VectorDiff::Clear = &diffs[0]); - // The room event cache has forgotten about the events. - assert!(room_event_cache.event(event_id1).await.is_none()); + // Events individually are not forgotten by the event cache, after clearing a + // room. + assert!(room_event_cache.event(event_id1).await.is_some()); + // But their presence in a linked chunk is forgotten. let (items, _) = room_event_cache.subscribe().await; assert!(items.is_empty());