From 31df84f5a11039ff97d2ee2e41d7da5888ac4925 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 19 Jun 2025 14:33:10 +0200 Subject: [PATCH] feat(event cache): return the event's positions in `find_event_relations` --- .../event_cache/store/integration_tests.rs | 46 ++++++++++++++++--- .../src/event_cache/store/memory_store.rs | 8 ++-- .../src/event_cache/store/traits.rs | 10 ++-- .../src/event_cache_store.rs | 34 ++++++++++---- crates/matrix-sdk/src/event_cache/room/mod.rs | 8 +++- 5 files changed, 83 insertions(+), 23 deletions(-) diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index d44f4f8eb..43d1a921c 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -1020,7 +1020,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { // Save All The Things! self.save_event(room_id, e1).await.unwrap(); self.save_event(room_id, edit_e1).await.unwrap(); - self.save_event(room_id, reaction_e1).await.unwrap(); + self.save_event(room_id, reaction_e1.clone()).await.unwrap(); self.save_event(room_id, e2).await.unwrap(); self.save_event(another_room_id, e3).await.unwrap(); self.save_event(another_room_id, reaction_e3).await.unwrap(); @@ -1028,8 +1028,13 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { // Finding relations without a filter returns all of them. let relations = self.find_event_relations(room_id, eid1, None).await.unwrap(); assert_eq!(relations.len(), 2); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1))); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1))); + // The position is `None` for items outside the linked chunk. + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none())); + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(reaction_eid1) && pos.is_none())); // Finding relations with a filter only returns a subset. let relations = self @@ -1037,7 +1042,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert_eq!(relations.len(), 1); - assert_eq!(relations[0].event_id().as_deref(), Some(edit_eid1)); + assert_eq!(relations[0].0.event_id().as_deref(), Some(edit_eid1)); let relations = self .find_event_relations( @@ -1048,8 +1053,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert_eq!(relations.len(), 2); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1))); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1))); + assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(edit_eid1))); + assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(reaction_eid1))); // We can't find relations using the wrong room. let relations = self @@ -1057,6 +1062,35 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert!(relations.is_empty()); + + // But if an event exists in the linked chunk, we may have its position when + // it's found as a relationship. + + // Add reaction_e1 to the room's linked chunk. + self.handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { at: Position::new(CId::new(0), 0), items: vec![reaction_e1] }, + ], + ) + .await + .unwrap(); + + // When looking for aggregations to e1, we should have the position for + // reaction_e1. + let relations = self.find_event_relations(room_id, eid1, None).await.unwrap(); + + // The position is set for `reaction_eid1` now. + assert!(relations.iter().any(|(ev, pos)| { + ev.event_id().as_deref() == Some(reaction_eid1) + && *pos == Some(Position::new(CId::new(0), 0)) + })); + + // But it's still not set for the other related events. + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none())); } async fn test_save_event(&self) { diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index cb1bf5fef..eb7d70381 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -238,7 +238,7 @@ impl EventCacheStore for MemoryStore { room_id: &RoomId, event_id: &EventId, filters: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { let inner = self.inner.read().unwrap(); let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned()); @@ -248,7 +248,7 @@ impl EventCacheStore for MemoryStore { let related_events = inner .events .items(&target_linked_chunk_id) - .filter_map(|(event, _pos)| { + .filter_map(|(event, pos)| { // Must have a relation. let (related_to, rel_type) = extract_event_relation(event.raw())?; @@ -259,9 +259,9 @@ impl EventCacheStore for MemoryStore { // Must not be filtered out. if let Some(filters) = &filters { - filters.contains(&rel_type).then_some(event.clone()) + filters.contains(&rel_type).then_some((event.clone(), pos)) } else { - Some(event.clone()) + Some((event.clone(), pos)) } }) .collect(); diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index 1a5f9f321..e48a46c92 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -134,7 +134,11 @@ pub trait EventCacheStore: AsyncTraitDeps { event_id: &EventId, ) -> Result, Self::Error>; - /// Find all the events that relate to a given event. + /// Find all the events (alongside their position in the room's linked + /// chunk, if available) that relate to a given event. + /// + /// The only events which don't have a position are those which have been + /// saved out-of-band using [`Self::save_event`]. /// /// Note: it doesn't process relations recursively: for instance, if /// requesting only thread events, it will NOT return the aggregated @@ -148,7 +152,7 @@ pub trait EventCacheStore: AsyncTraitDeps { room_id: &RoomId, event_id: &EventId, filter: Option<&[RelationType]>, - ) -> Result, Self::Error>; + ) -> Result)>, Self::Error>; /// Save an event, that might or might not be part of an existing linked /// chunk. @@ -373,7 +377,7 @@ impl EventCacheStore for EraseEventCacheStoreError { room_id: &RoomId, event_id: &EventId, filter: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index f3f0f6298..fc7c5fe15 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -1165,9 +1165,12 @@ impl EventCacheStore for SqliteEventCacheStore { room_id: &RoomId, event_id: &EventId, filters: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); + let hashed_linked_chunk_id = + self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key()); + let event_id = event_id.to_owned(); let filters = filters.map(ToOwned::to_owned); let this = self.clone(); @@ -1190,19 +1193,34 @@ impl EventCacheStore for SqliteEventCacheStore { }; let query = format!( - "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {filter_query}" + "SELECT events.content, event_chunks.chunk_id, event_chunks.position + FROM events + LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ? + WHERE relates_to = ? AND room_id = ? {filter_query}" ); // Collect related events. let mut related = Vec::new(); - for ev in - txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| { - row.get::<_, Vec>(0) + for result in + txn.prepare(&query)?.query_map((hashed_linked_chunk_id, event_id.as_str(), hashed_room_id), |row| { + Ok(( + row.get::<_, Vec>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + )) })? { - let ev = ev?; - let ev = serde_json::from_slice(&this.decode_value(&ev)?)?; - related.push(ev); + let (event_blob, chunk_id, index) = result?; + + let event: Event = serde_json::from_slice(&this.decode_value(&event_blob)?)?; + + // Only build the position if both the chunk_id and position were present; in + // theory, they should either be present at the same time, or not at all. + let pos = chunk_id.zip(index).map(|(chunk_id, index)| { + Position::new(ChunkIdentifier::new(chunk_id), index) + }); + + related.push((event, pos)); } Ok(related) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 04edee51f..1ee6e5bdf 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1176,7 +1176,8 @@ mod private { // transitive closure of all the related events. let mut related = store.find_event_relations(&self.room, event_id, filters.as_deref()).await?; - let mut stack = related.iter().filter_map(|event| event.event_id()).collect::>(); + let mut stack = + related.iter().filter_map(|(event, _pos)| event.event_id()).collect::>(); // Also keep track of already seen events, in case there's a loop in the // relation graph. @@ -1195,7 +1196,7 @@ mod private { let other_related = store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?; - stack.extend(other_related.iter().filter_map(|event| event.event_id())); + stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id())); related.extend(other_related); num_iters += 1; @@ -1203,6 +1204,9 @@ mod private { trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events"); + // Keep only the events, not their positions. + let related = related.into_iter().map(|(event, _pos)| event).collect(); + Ok(Some((target, related))) }