feat(event cache): return the event's positions in find_event_relations

This commit is contained in:
Benjamin Bouvier
2025-06-19 14:33:10 +02:00
parent e68bdf8460
commit 31df84f5a1
5 changed files with 83 additions and 23 deletions

View File

@@ -1020,7 +1020,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Save All The Things!
self.save_event(room_id, e1).await.unwrap();
self.save_event(room_id, edit_e1).await.unwrap();
self.save_event(room_id, reaction_e1).await.unwrap();
self.save_event(room_id, reaction_e1.clone()).await.unwrap();
self.save_event(room_id, e2).await.unwrap();
self.save_event(another_room_id, e3).await.unwrap();
self.save_event(another_room_id, reaction_e3).await.unwrap();
@@ -1028,8 +1028,13 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Finding relations without a filter returns all of them.
let relations = self.find_event_relations(room_id, eid1, None).await.unwrap();
assert_eq!(relations.len(), 2);
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1)));
// The position is `None` for items outside the linked chunk.
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none()));
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(reaction_eid1) && pos.is_none()));
// Finding relations with a filter only returns a subset.
let relations = self
@@ -1037,7 +1042,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.await
.unwrap();
assert_eq!(relations.len(), 1);
assert_eq!(relations[0].event_id().as_deref(), Some(edit_eid1));
assert_eq!(relations[0].0.event_id().as_deref(), Some(edit_eid1));
let relations = self
.find_event_relations(
@@ -1048,8 +1053,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.await
.unwrap();
assert_eq!(relations.len(), 2);
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1)));
assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(reaction_eid1)));
// We can't find relations using the wrong room.
let relations = self
@@ -1057,6 +1062,35 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.await
.unwrap();
assert!(relations.is_empty());
// But if an event exists in the linked chunk, we may have its position when
// it's found as a relationship.
// Add reaction_e1 to the room's linked chunk.
self.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems { at: Position::new(CId::new(0), 0), items: vec![reaction_e1] },
],
)
.await
.unwrap();
// When looking for aggregations to e1, we should have the position for
// reaction_e1.
let relations = self.find_event_relations(room_id, eid1, None).await.unwrap();
// The position is set for `reaction_eid1` now.
assert!(relations.iter().any(|(ev, pos)| {
ev.event_id().as_deref() == Some(reaction_eid1)
&& *pos == Some(Position::new(CId::new(0), 0))
}));
// But it's still not set for the other related events.
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none()));
}
async fn test_save_event(&self) {

View File

@@ -238,7 +238,7 @@ impl EventCacheStore for MemoryStore {
room_id: &RoomId,
event_id: &EventId,
filters: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error> {
) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
let inner = self.inner.read().unwrap();
let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());
@@ -248,7 +248,7 @@ impl EventCacheStore for MemoryStore {
let related_events = inner
.events
.items(&target_linked_chunk_id)
.filter_map(|(event, _pos)| {
.filter_map(|(event, pos)| {
// Must have a relation.
let (related_to, rel_type) = extract_event_relation(event.raw())?;
@@ -259,9 +259,9 @@ impl EventCacheStore for MemoryStore {
// Must not be filtered out.
if let Some(filters) = &filters {
filters.contains(&rel_type).then_some(event.clone())
filters.contains(&rel_type).then_some((event.clone(), pos))
} else {
Some(event.clone())
Some((event.clone(), pos))
}
})
.collect();

View File

@@ -134,7 +134,11 @@ pub trait EventCacheStore: AsyncTraitDeps {
event_id: &EventId,
) -> Result<Option<Event>, Self::Error>;
/// Find all the events that relate to a given event.
/// Find all the events (alongside their position in the room's linked
/// chunk, if available) that relate to a given event.
///
/// The only events which don't have a position are those which have been
/// saved out-of-band using [`Self::save_event`].
///
/// Note: it doesn't process relations recursively: for instance, if
/// requesting only thread events, it will NOT return the aggregated
@@ -148,7 +152,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
room_id: &RoomId,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error>;
) -> Result<Vec<(Event, Option<Position>)>, Self::Error>;
/// Save an event, that might or might not be part of an existing linked
/// chunk.
@@ -373,7 +377,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
room_id: &RoomId,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error> {
) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
}

View File

@@ -1165,9 +1165,12 @@ impl EventCacheStore for SqliteEventCacheStore {
room_id: &RoomId,
event_id: &EventId,
filters: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error> {
) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
let hashed_linked_chunk_id =
self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
let event_id = event_id.to_owned();
let filters = filters.map(ToOwned::to_owned);
let this = self.clone();
@@ -1190,19 +1193,34 @@ impl EventCacheStore for SqliteEventCacheStore {
};
let query = format!(
"SELECT content FROM events WHERE relates_to = ? AND room_id = ? {filter_query}"
"SELECT events.content, event_chunks.chunk_id, event_chunks.position
FROM events
LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
WHERE relates_to = ? AND room_id = ? {filter_query}"
);
// Collect related events.
let mut related = Vec::new();
for ev in
txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| {
row.get::<_, Vec<u8>>(0)
for result in
txn.prepare(&query)?.query_map((hashed_linked_chunk_id, event_id.as_str(), hashed_room_id), |row| {
Ok((
row.get::<_, Vec<u8>>(0)?,
row.get::<_, Option<u64>>(1)?,
row.get::<_, Option<usize>>(2)?,
))
})?
{
let ev = ev?;
let ev = serde_json::from_slice(&this.decode_value(&ev)?)?;
related.push(ev);
let (event_blob, chunk_id, index) = result?;
let event: Event = serde_json::from_slice(&this.decode_value(&event_blob)?)?;
// Only build the position if both the chunk_id and position were present; in
// theory, they should either be present at the same time, or not at all.
let pos = chunk_id.zip(index).map(|(chunk_id, index)| {
Position::new(ChunkIdentifier::new(chunk_id), index)
});
related.push((event, pos));
}
Ok(related)

View File

@@ -1176,7 +1176,8 @@ mod private {
// transitive closure of all the related events.
let mut related =
store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
let mut stack = related.iter().filter_map(|event| event.event_id()).collect::<Vec<_>>();
let mut stack =
related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
// Also keep track of already seen events, in case there's a loop in the
// relation graph.
@@ -1195,7 +1196,7 @@ mod private {
let other_related =
store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
stack.extend(other_related.iter().filter_map(|event| event.event_id()));
stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
related.extend(other_related);
num_iters += 1;
@@ -1203,6 +1204,9 @@ mod private {
trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
// Keep only the events, not their positions.
let related = related.into_iter().map(|(event, _pos)| event).collect();
Ok(Some((target, related)))
}