refactor(search): Move event processing out.

This commit is contained in:
Shrey Patel
2025-08-27 16:54:49 +01:00
committed by Damir Jelić
parent bcabf1bda4
commit 412d4b80ee
7 changed files with 220 additions and 172 deletions

View File

@@ -15,8 +15,7 @@
use std::{fmt, fs, path::Path, sync::Arc};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, events::AnySyncMessageLikeEvent,
room_version_rules::RedactionRules,
EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
};
use tantivy::{
Index, IndexReader, TantivyDocument,
@@ -35,16 +34,17 @@ use crate::{
};
/// A struct to represent the operations on a [`RoomIndex`]
pub(crate) enum RoomIndexOperation {
/// Add this document to the index.
Add(TantivyDocument),
#[derive(Debug)]
pub enum RoomIndexOperation {
/// Add this event to the index.
Add(OriginalSyncRoomMessageEvent),
/// Remove all documents in the index where
/// [`MatrixSearchIndexSchema::deletion_key()`] matches this event id.
/// `MatrixSearchIndexSchema::deletion_key()` matches this event id.
Remove(OwnedEventId),
/// Replace all documents in the index where
/// [`MatrixSearchIndexSchema::deletion_key()`] matches this event id with
/// the new document.
Edit(OwnedEventId, TantivyDocument),
/// `MatrixSearchIndexSchema::deletion_key()` matches this event id with
/// the new event.
Edit(OwnedEventId, OriginalSyncRoomMessageEvent),
/// Do nothing.
Noop,
}
@@ -137,37 +137,6 @@ impl RoomIndex {
RoomIndex::new_with(index, schema, room_id)
}
/// Handle an [`AnySyncMessageLikeEvent`].
///
/// The schema turns the event into a [`RoomIndexOperation`], which is then
/// applied to the index: the event is added, removed or edited depending on
/// the operation.
///
/// A document is only added when the id of the incoming event is not
/// already found in the index.
///
/// # Errors
///
/// Returns an [`IndexError`] if the schema cannot handle the event or if
/// the writer fails to add the document.
pub fn handle_event(
    &mut self,
    event: AnySyncMessageLikeEvent,
    redaction_rules: &RedactionRules,
) -> Result<(), IndexError> {
    // Keep the id around, `event` is consumed by the schema below.
    let event_id = event.event_id().to_owned();

    // Operations that do not add anything return early; the others yield
    // the document that may have to be written.
    let document = match self.schema.handle_event(event, redaction_rules)? {
        RoomIndexOperation::Noop => return Ok(()),
        RoomIndexOperation::Remove(removed_event_id) => {
            self.writer.remove(&removed_event_id);
            return Ok(());
        }
        RoomIndexOperation::Add(document) => document,
        RoomIndexOperation::Edit(replaced_event_id, document) => {
            self.writer.remove(&replaced_event_id);
            document
        }
    };

    if !self.contains(&event_id) {
        self.writer.add(document)?;
    }

    Ok(())
}
/// Commit added events to [`RoomIndex`]
pub fn commit(&mut self) -> Result<OpStamp, IndexError> {
let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it.
@@ -218,6 +187,31 @@ impl RoomIndex {
Ok(ret)
}
/// Execute a [`RoomIndexOperation`] on this index.
///
/// Depending on the operation an event is added to, removed from or edited
/// in the index. [`RoomIndexOperation::Noop`] leaves the index untouched.
///
/// An event is only added when its id is not already found in the index.
///
/// # Errors
///
/// Returns an [`IndexError`] if the document cannot be built from the event
/// or if the writer fails to add it.
pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
    // Operations that do not add anything return early; the others yield
    // the event that may have to be written.
    let event = match operation {
        RoomIndexOperation::Noop => return Ok(()),
        RoomIndexOperation::Remove(event_id) => {
            self.writer.remove(&event_id);
            return Ok(());
        }
        RoomIndexOperation::Add(event) => event,
        RoomIndexOperation::Edit(replaced_event_id, event) => {
            self.writer.remove(&replaced_event_id);
            event
        }
    };

    if self.contains(&event.event_id) {
        return Ok(());
    }

    let document = self.schema.make_doc(event)?;
    self.writer.add(document)?;

    Ok(())
}
fn contains(&self, event_id: &EventId) -> bool {
let search_result = self.search(format!("event_id:\"{event_id}\"").as_str(), 1);
match search_result {
@@ -236,11 +230,52 @@ mod tests {
use matrix_sdk_test::event_factory::EventFactory;
use ruma::{
event_id, events::room::message::RoomMessageEventContentWithoutRelation, room_id,
room_version_rules::RedactionRules, user_id,
EventId, event_id,
events::{
AnySyncMessageLikeEvent,
room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation},
},
room_id, user_id,
};
use crate::index::RoomIndex;
use crate::{
error::IndexError,
index::{RoomIndex, RoomIndexOperation},
};
/// Helper function to add a regular message to the index.
///
/// # Panics
///
/// Panics when the event is not an [`OriginalSyncRoomMessageEvent`] without
/// relations.
fn index_message(
    index: &mut RoomIndex,
    event: AnySyncMessageLikeEvent,
) -> Result<(), IndexError> {
    // Anything that is not an unredacted room message ends up as `None`.
    let original = match &event {
        AnySyncMessageLikeEvent::RoomMessage(message) => message.as_original(),
        _ => None,
    };

    match original {
        Some(message) if message.content.relates_to.is_none() => {
            index.execute(RoomIndexOperation::Add(message.clone()))
        }
        _ => panic!("Event was not a relationless OriginalSyncRoomMessageEvent."),
    }
}
/// Helper function to remove the event with the given id from the index.
fn index_remove(index: &mut RoomIndex, event_id: &EventId) -> Result<(), IndexError> {
    let operation = RoomIndexOperation::Remove(event_id.to_owned());
    index.execute(operation)
}
/// Helper function to edit an event in the index.
///
/// Executes a [`RoomIndexOperation::Edit`] that replaces the event with id
/// `event_id` by the `new` [`OriginalSyncRoomMessageEvent`].
fn index_edit(
    index: &mut RoomIndex,
    event_id: &EventId,
    new: OriginalSyncRoomMessageEvent,
) -> Result<(), IndexError> {
    let replaced_event_id = event_id.to_owned();
    let operation = RoomIndexOperation::Edit(replaced_event_id, new);
    index.execute(operation)
}
#[test]
fn test_make_index_in_memory() {
@@ -251,7 +286,7 @@ mod tests {
}
#[test]
fn test_handle_event() {
fn test_add_event() {
let room_id = room_id!("!room_id:localhost");
let mut index =
RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}");
@@ -263,7 +298,7 @@ mod tests {
.sender(user_id!("@user_id:localhost"))
.into_any_sync_message_like_event();
index.handle_event(event, &RedactionRules::V11).expect("failed to add event: {res:?}");
index_message(&mut index, event).expect("failed to add event: {res:?}");
}
#[test]
@@ -278,23 +313,23 @@ mod tests {
let user_id = user_id!("@user_id:localhost");
let f = EventFactory::new().room(room_id).sender(user_id);
index.handle_event(
index_message(
&mut index,
f.text_msg("This is a sentence")
.event_id(event_id_1)
.into_any_sync_message_like_event(),
&RedactionRules::V11,
)?;
index.handle_event(
index_message(
&mut index,
f.text_msg("All new words").event_id(event_id_2).into_any_sync_message_like_event(),
&RedactionRules::V11,
)?;
index.handle_event(
index_message(
&mut index,
f.text_msg("A similar sentence")
.event_id(event_id_3)
.into_any_sync_message_like_event(),
&RedactionRules::V11,
)?;
index.commit_and_reload()?;
@@ -351,7 +386,7 @@ mod tests {
.sender(user_id!("@user_id:localhost"))
.into_any_sync_message_like_event();
index.handle_event(event, &RedactionRules::V11)?;
index_message(&mut index, event)?;
index.commit_and_reload()?;
@@ -361,7 +396,7 @@ mod tests {
}
#[test]
fn test_indexing_idempotency() -> Result<(), Box<dyn Error>> {
fn test_index_add_idempotency() -> Result<(), Box<dyn Error>> {
let room_id = room_id!("!room_id:localhost");
let mut index = RoomIndex::new_in_memory(room_id)?;
@@ -373,14 +408,14 @@ mod tests {
.sender(user_id!("@user_id:localhost"))
.into_any_sync_message_like_event();
index.handle_event(event.clone(), &RedactionRules::V11)?;
index_message(&mut index, event.clone())?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should contain event");
// indexing again should do nothing
index.handle_event(event, &RedactionRules::V11)?;
index_message(&mut index, event)?;
index.commit_and_reload()?;
@@ -394,7 +429,7 @@ mod tests {
}
#[test]
fn test_redaction_removes_event() -> Result<(), Box<dyn Error>> {
fn test_remove_event() -> Result<(), Box<dyn Error>> {
let room_id = room_id!("!room_id:localhost");
let mut index = RoomIndex::new_in_memory(room_id)?;
@@ -405,17 +440,13 @@ mod tests {
let event =
f.text_msg("This is a sentence").event_id(event_id).into_any_sync_message_like_event();
index.handle_event(event, &RedactionRules::V11)?;
index_message(&mut index, event)?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should contain event");
let redaction_event_id = event_id!("$redaction_event_id:localhost");
let redaction =
f.redaction(event_id).event_id(redaction_event_id).into_any_sync_message_like_event();
index.handle_event(redaction, &RedactionRules::V11)?;
index_remove(&mut index, event_id)?;
index.commit_and_reload()?;
@@ -438,7 +469,7 @@ mod tests {
.event_id(old_event_id)
.into_any_sync_message_like_event();
index.handle_event(old_event, &RedactionRules::V11)?;
index_message(&mut index, old_event)?;
index.commit_and_reload()?;
@@ -452,9 +483,9 @@ mod tests {
RoomMessageEventContentWithoutRelation::text_plain("This is a brand new sentence!"),
)
.event_id(new_event_id)
.into_any_sync_message_like_event();
.into_original_sync_room_message_event();
index.handle_event(edit, &RedactionRules::V11)?;
index_edit(&mut index, old_event_id, edit)?;
index.commit_and_reload()?;

View File

@@ -12,23 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ruma::{
events::{
AnySyncMessageLikeEvent, SyncMessageLikeEvent,
room::message::{MessageType, Relation, RoomMessageEventContent},
},
room_version_rules::RedactionRules,
};
use ruma::events::room::message::{MessageType, OriginalSyncRoomMessageEvent, Relation};
use tantivy::{
DateTime, doc,
DateTime, TantivyDocument, doc,
schema::{DateOptions, DateTimePrecision, Field, INDEXED, STORED, STRING, Schema, TEXT},
};
use tracing::trace;
use crate::{
error::{IndexError, IndexSchemaError},
index::RoomIndexOperation,
};
use crate::error::{IndexError, IndexSchemaError};
pub(crate) trait MatrixSearchIndexSchema {
fn new() -> Self;
@@ -36,11 +26,7 @@ pub(crate) trait MatrixSearchIndexSchema {
fn primary_key(&self) -> Field;
fn deletion_key(&self) -> Field;
fn as_tantivy_schema(&self) -> Schema;
fn handle_event(
&self,
event: AnySyncMessageLikeEvent,
redaction_rules: &RedactionRules,
) -> Result<RoomIndexOperation, IndexError>;
fn make_doc(&self, event: OriginalSyncRoomMessageEvent) -> Result<TantivyDocument, IndexError>;
}
#[derive(Debug, Clone)]
@@ -57,39 +43,6 @@ pub(crate) struct RoomMessageSchema {
default_search_fields: Vec<Field>,
}
impl RoomMessageSchema {
/// Given an [`SyncMessageLikeEvent<RoomMessageEventContent>`] return a
/// [`RoomIndexOperation`].
fn handle_message(
&self,
event: SyncMessageLikeEvent<RoomMessageEventContent>,
) -> Result<RoomIndexOperation, IndexError> {
let unredacted = event.as_original().ok_or(IndexError::CannotIndexRedactedMessage)?;
let body = match &unredacted.content.msgtype {
MessageType::Text(content) => Ok(content.body.clone()),
_ => Err(IndexError::MessageTypeNotSupported),
}?;
let mut document = doc!(
self.event_id_field => unredacted.event_id.to_string(),
self.body_field => body,
self.date_field =>
DateTime::from_timestamp_millis(
unredacted.origin_server_ts.get().into()),
self.sender_field => unredacted.sender.to_string(),
);
if let Some(Relation::Replacement(replacement_data)) = &unredacted.content.relates_to {
document.add_text(self.original_event_id_field, replacement_data.event_id.clone());
Ok(RoomIndexOperation::Edit(replacement_data.event_id.clone(), document))
} else {
document.add_text(self.original_event_id_field, unredacted.event_id.clone());
Ok(RoomIndexOperation::Add(document))
}
}
}
impl MatrixSearchIndexSchema for RoomMessageSchema {
fn new() -> Self {
let mut schema = Schema::builder();
@@ -134,26 +87,30 @@ impl MatrixSearchIndexSchema for RoomMessageSchema {
self.inner.clone()
}
fn handle_event(
&self,
event: AnySyncMessageLikeEvent,
redaction_rules: &RedactionRules,
) -> Result<RoomIndexOperation, IndexError> {
match event {
AnySyncMessageLikeEvent::RoomMessage(event) => self.handle_message(event),
AnySyncMessageLikeEvent::RoomRedaction(redaction_event) => {
if let Some(redacted_event_id) = redaction_event.redacts(redaction_rules) {
Ok(RoomIndexOperation::Remove(redacted_event_id.to_owned()))
} else {
// If not acting on anything, we can just ignore it.
trace!("Room redaction in indexing redacts nothing, ignoring.");
Ok(RoomIndexOperation::Noop)
}
}
/// Given an [`OriginalSyncRoomMessageEvent`] return a
/// [`TantivyDocument`].
fn make_doc(&self, event: OriginalSyncRoomMessageEvent) -> Result<TantivyDocument, IndexError> {
let body = match &event.content.msgtype {
MessageType::Text(content) => Ok(content.body.clone()),
_ => Err(IndexError::MessageTypeNotSupported),
}?;
let mut document = doc!(
self.event_id_field => event.event_id.to_string(),
self.body_field => body,
self.date_field =>
DateTime::from_timestamp_millis(
event.origin_server_ts.get().into()),
self.sender_field => event.sender.to_string(),
);
if let Some(Relation::Replacement(replacement_data)) = &event.content.relates_to {
document.add_text(self.original_event_id_field, replacement_data.event_id.clone());
} else {
document.add_text(self.original_event_id_field, event.event_id);
}
Ok(document)
}
}

View File

@@ -16,7 +16,7 @@ features = ["docsrs"]
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]
[features]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls"]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls", "experimental-search"]
testing = [
"matrix-sdk-sqlite?/testing",
"matrix-sdk-indexeddb?/testing",

View File

@@ -14,11 +14,11 @@
use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc};
use matrix_sdk_search::{error::IndexError, index::RoomIndex};
use ruma::{
events::AnySyncMessageLikeEvent, room_version_rules::RedactionRules, OwnedEventId, OwnedRoomId,
RoomId,
use matrix_sdk_search::{
error::IndexError,
index::{RoomIndex, RoomIndexOperation},
};
use ruma::{OwnedEventId, OwnedRoomId, RoomId};
use tokio::sync::{Mutex, MutexGuard};
use tracing::{debug, error};
@@ -80,11 +80,10 @@ impl SearchIndexGuard<'_> {
///
/// This will add/remove/edit an event in the index based on the
/// operation.
pub(crate) fn handle_event(
pub(crate) fn execute(
&mut self,
event: AnySyncMessageLikeEvent,
operation: RoomIndexOperation,
room_id: &RoomId,
redaction_rules: &RedactionRules,
) -> Result<(), IndexError> {
if !self.index_map.contains_key(room_id) {
let index = self.create_index(room_id)?;
@@ -92,7 +91,7 @@ impl SearchIndexGuard<'_> {
}
let index = self.index_map.get_mut(room_id).expect("index should exist");
let result = index.handle_event(event, redaction_rules);
let result = index.execute(operation);
match result {
Ok(_) => {}

View File

@@ -742,14 +742,12 @@ impl EventCache {
let mut search_index_guard = client.search_index().lock().await;
for event in timeline_events {
if let Some(message_event) =
search::parse_timeline_event(&room_cache, &event).await
if let Some(index_operation) =
search::parse_timeline_event(&room_cache, &event, &redaction_rules)
.await
{
if let Err(err) = search_index_guard.handle_event(
message_event,
&room_id,
&redaction_rules,
) {
if let Err(err) = search_index_guard.execute(index_operation, &room_id)
{
warn!("Failed to handle event for indexing: {err}")
}
}

View File

@@ -13,8 +13,16 @@
// limitations under the License.
use matrix_sdk_base::deserialized_responses::TimelineEvent;
use matrix_sdk_search::index::RoomIndexOperation;
use ruma::{
events::{room::message::Relation, AnySyncMessageLikeEvent},
events::{
room::{
message::{OriginalSyncRoomMessageEvent, Relation, SyncRoomMessageEvent},
redaction::SyncRoomRedactionEvent,
},
AnySyncMessageLikeEvent, AnySyncTimelineEvent,
},
room_version_rules::RedactionRules,
EventId,
};
use tracing::warn;
@@ -26,7 +34,7 @@ use crate::event_cache::RoomEventCache;
async fn get_most_recent_edit(
cache: &RoomEventCache,
original: &EventId,
) -> Option<AnySyncMessageLikeEvent> {
) -> Option<OriginalSyncRoomMessageEvent> {
use ruma::events::{relation::RelationType, AnySyncTimelineEvent};
let Some((original_ev, related)) =
@@ -37,36 +45,78 @@ async fn get_most_recent_edit(
};
match related.last().unwrap_or(&original_ev).raw().deserialize() {
Ok(AnySyncTimelineEvent::MessageLike(latest)) => Some(latest),
Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(latest))) => {
latest.as_original().cloned()
}
_ => None,
}
}
/// Given an event, if it is a message then we return its latest edit else we
/// return the event.
async fn handle_possible_edits(
/// If the given [`OriginalSyncRoomMessageEvent`] is an edit we make an
/// [`RoomIndexOperation::Edit`] with the new most recent version of the
/// original.
async fn handle_possible_edit(
event: &OriginalSyncRoomMessageEvent,
cache: &RoomEventCache,
event: &AnySyncMessageLikeEvent,
) -> Option<AnySyncMessageLikeEvent> {
match event {
AnySyncMessageLikeEvent::RoomMessage(ev) => {
let original_ev_id = ev.as_original().map(|ev| match &ev.content.relates_to {
Some(Relation::Replacement(replaces)) => &replaces.event_id,
_ => event.event_id(),
})?;
get_most_recent_edit(cache, original_ev_id).await
) -> Option<RoomIndexOperation> {
if let Some(Relation::Replacement(replacement_data)) = &event.content.relates_to {
if let Some(recent) = get_most_recent_edit(cache, &replacement_data.event_id).await {
return Some(RoomIndexOperation::Edit(replacement_data.event_id.clone(), recent));
} else {
return Some(RoomIndexOperation::Noop);
}
_ => Some(event.clone()),
}
None
}
/// Prepare a [`TimelineEvent`] into a [`AnySyncMessageLikeEvent`] for search
/// Return the [`RoomIndexOperation`] to execute for a room message.
///
/// If `handle_possible_edit` yields an operation for the message, that
/// operation is returned as is. Otherwise the most recent version of the
/// message is looked up in the cache and wrapped in a
/// [`RoomIndexOperation::Add`].
///
/// Returns `None` if the event is not an original (unredacted) message, or
/// if no most recent version of the message could be found.
async fn handle_room_message(
    event: SyncRoomMessageEvent,
    cache: &RoomEventCache,
) -> Option<RoomIndexOperation> {
    let original = event.as_original()?;

    if let Some(operation) = handle_possible_edit(original, cache).await {
        return Some(operation);
    }

    // Only query the cache again when the message is not an edit. Passing
    // this lookup as the argument of `Option::or` would evaluate it (and
    // await it) eagerly, even when its result is thrown away.
    get_most_recent_edit(cache, &original.event_id).await.map(RoomIndexOperation::Add)
}
/// Return the [`RoomIndexOperation`] to execute for a room redaction.
///
/// The redacted event is looked up in the cache. If `handle_possible_edit`
/// yields an operation for it, that operation is returned; otherwise a
/// [`RoomIndexOperation::Remove`] for the redacted event is returned.
///
/// Returns `None` if the redaction does not redact anything, if the
/// redacted event is not found in the cache, or if the cached event does
/// not deserialize to an original (unredacted) room message.
async fn handle_room_redaction(
    event: SyncRoomRedactionEvent,
    cache: &RoomEventCache,
    rules: &RedactionRules,
) -> Option<RoomIndexOperation> {
    let redacted_event_id = event.redacts(rules)?;
    let cached_event = cache.find_event(redacted_event_id).await?;

    let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(message))) =
        cached_event.raw().deserialize()
    else {
        return None;
    };

    let original = message.as_original()?;

    match handle_possible_edit(original, cache).await {
        Some(operation) => Some(operation),
        None => Some(RoomIndexOperation::Remove(original.event_id.clone())),
    }
}
/// Prepare a [`TimelineEvent`] into a [`RoomIndexOperation`] for search
/// indexing.
pub(crate) async fn parse_timeline_event(
cache: &RoomEventCache,
event: &TimelineEvent,
) -> Option<AnySyncMessageLikeEvent> {
redaction_rules: &RedactionRules,
) -> Option<RoomIndexOperation> {
use ruma::events::AnySyncTimelineEvent;
if event.kind.is_utd() {
@@ -75,7 +125,15 @@ pub(crate) async fn parse_timeline_event(
match event.raw().deserialize() {
Ok(event) => match event {
AnySyncTimelineEvent::MessageLike(event) => handle_possible_edits(cache, &event).await,
AnySyncTimelineEvent::MessageLike(event) => match event {
AnySyncMessageLikeEvent::RoomMessage(event) => {
handle_room_message(event, cache).await
}
AnySyncMessageLikeEvent::RoomRedaction(event) => {
handle_room_redaction(event, cache, redaction_rules).await
}
_ => None,
},
AnySyncTimelineEvent::State(_) => None,
},

View File

@@ -62,8 +62,9 @@ use ruma::{
member::{MembershipState, RoomMemberEventContent},
message::{
FormattedBody, GalleryItemType, GalleryMessageEventContent,
ImageMessageEventContent, MessageType, Relation, RelationWithoutReplacement,
RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
ImageMessageEventContent, MessageType, OriginalSyncRoomMessageEvent, Relation,
RelationWithoutReplacement, RoomMessageEventContent,
RoomMessageEventContentWithoutRelation,
},
name::RoomNameEventContent,
power_levels::RoomPowerLevelsEventContent,
@@ -329,6 +330,10 @@ where
self.into_raw().deserialize().expect("expected message like event")
}
pub fn into_original_sync_room_message_event(self) -> OriginalSyncRoomMessageEvent {
self.into_raw().deserialize().expect("expected original sync room message event")
}
/// Convert the builder into a [`Raw<AnySyncTimelineEvent>`].
///
/// The JSON is built with `construct_json(false)` and the resulting raw
/// value is cast to the target type without checking it.
///
/// # Panics
///
/// Panics if the constructed JSON cannot be wrapped in a [`Raw`].
pub fn into_raw_sync(self) -> Raw<AnySyncTimelineEvent> {
    let json = self.construct_json(false);
    let raw = Raw::new(&json).unwrap();
    raw.cast_unchecked()
}