From 79aa0ab60d13ead4d66367fe369e93c1bdfb705e Mon Sep 17 00:00:00 2001 From: Shrey Patel Date: Thu, 11 Sep 2025 16:41:00 +0100 Subject: [PATCH] feat(search): Add bulk processing. --- crates/matrix-sdk-search/src/index.rs | 235 +++++++++++++++++----- crates/matrix-sdk-search/src/schema.rs | 5 + crates/matrix-sdk-search/src/writer.rs | 3 +- crates/matrix-sdk/src/event_cache/mod.rs | 17 +- crates/matrix-sdk/src/room/mod.rs | 3 +- crates/matrix-sdk/src/search_index/mod.rs | 73 ++++--- 6 files changed, 245 insertions(+), 91 deletions(-) diff --git a/crates/matrix-sdk-search/src/index.rs b/crates/matrix-sdk-search/src/index.rs index 1f34d4f63..58987cefb 100644 --- a/crates/matrix-sdk-search/src/index.rs +++ b/crates/matrix-sdk-search/src/index.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, fs, path::Path, sync::Arc}; +use std::{collections::HashSet, fmt, fs, path::Path, sync::Arc}; use ruma::{ EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent, @@ -24,7 +24,7 @@ use tantivy::{ query::QueryParser, schema::Value, }; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; use crate::{ OpStamp, TANTIVY_INDEX_MEMORY_BUDGET, @@ -34,7 +34,7 @@ use crate::{ }; /// A struct to represent the operations on a [`RoomIndex`] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RoomIndexOperation { /// Add this event to the index. Add(OriginalSyncRoomMessageEvent), @@ -52,11 +52,12 @@ pub enum RoomIndexOperation { /// A struct that holds all data pertaining to a particular room's /// message index. pub struct RoomIndex { + index: Index, schema: RoomMessageSchema, - writer: SearchIndexWriter, - reader: IndexReader, query_parser: QueryParser, room_id: OwnedRoomId, + uncommited_adds: HashSet, + uncommited_removes: HashSet, } impl fmt::Debug for RoomIndex { @@ -74,16 +75,14 @@ impl RoomIndex { schema: RoomMessageSchema, room_id: &RoomId, ) -> Result { - let writer = index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?; - let reader = index.reader_builder().try_into()?; - let query_parser = QueryParser::for_index(&index, schema.default_search_fields()); Ok(Self { - writer: SearchIndexWriter::new(writer, schema.clone()), + index, schema, - reader, query_parser, room_id: room_id.to_owned(), + uncommited_adds: HashSet::new(), + uncommited_removes: HashSet::new(), }) } @@ -137,9 +136,25 @@ impl RoomIndex { RoomIndex::new_with(index, schema, room_id) } - /// Commit added events to [`RoomIndex`] - pub fn commit(&mut self) -> Result { - let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it. + /// Get a [`SearchIndexWriter`] for this index. + fn get_writer(&self) -> Result { + let writer = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?; + Ok(SearchIndexWriter::new(writer, self.schema.clone())) + } + + /// Get a [`IndexReader`] for this index. + fn get_reader(&self) -> Result { + Ok(self.index.reader_builder().try_into()?) + } + + /// Commit added events to [`RoomIndex`]. The changes are not reflected in + /// the search results until the serchers are reloaded. + /// + /// Use [`RoomIndex::commit_and_reload`] for this purpose. + fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result { + let last_commit_opstamp = writer.commit()?; // TODO: This is blocking. Handle it. + self.uncommited_adds.clear(); + self.uncommited_removes.clear(); Ok(last_commit_opstamp) } @@ -153,9 +168,13 @@ impl RoomIndex { /// This automatic reload can take 10s of milliseconds to kick in however, /// and in unit tests it can be nice to deterministically force the /// reload of searchers. - pub fn commit_and_reload(&mut self) -> Result { - let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it. - self.reader.reload()?; + fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result { + debug!( + "RoomIndex: committing and reloading: uncommited: {:?}, {:?}", + self.uncommited_adds, self.uncommited_removes + ); + let last_commit_opstamp = self.commit(writer)?; + self.get_reader()?.reload()?; Ok(last_commit_opstamp) } @@ -173,7 +192,7 @@ impl RoomIndex { pagination_offset: Option, ) -> Result, IndexError> { let query = self.query_parser.parse_query(query)?; - let searcher = self.reader.searcher(); + let searcher = self.get_reader()?.searcher(); let offset = pagination_offset.unwrap_or(0); @@ -196,35 +215,161 @@ impl RoomIndex { Ok(ret) } - /// Execute [`RoomIndexOperation`] - /// - /// This which will add/remove/edit an event in the index based on the - /// operation. - pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> { - match operation { + fn get_events_to_be_removed( + &self, + event_id: &EventId, + ) -> Result, IndexError> { + self.search( + format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.deletion_key())) + .as_str(), + 10000, + None, + ) + } + + fn add( + &mut self, + writer: &mut SearchIndexWriter, + event: OriginalSyncRoomMessageEvent, + ) -> Result<(), IndexError> { + if !self.contains(&event.event_id) { + writer.add(self.schema.make_doc(event.clone())?)?; + } + self.uncommited_removes.remove(&event.event_id); + self.uncommited_adds.insert(event.event_id); + Ok(()) + } + + fn remove( + &mut self, + writer: &mut SearchIndexWriter, + event_id: OwnedEventId, + ) -> Result<(), IndexError> { + let events = self.get_events_to_be_removed(&event_id)?; + + writer.remove(&event_id); + + for event in events.into_iter() { + self.uncommited_adds.remove(&event); + self.uncommited_removes.insert(event); + } + + Ok(()) + } + + fn execute_impl( + &mut self, + writer: &mut SearchIndexWriter, + operation: &RoomIndexOperation, + ) -> Result<(), IndexError> { + debug!("INDEX: executing {operation:?}"); + match operation.clone() { RoomIndexOperation::Add(event) => { - if !self.contains(&event.event_id) { - self.writer.add(self.schema.make_doc(event)?)?; - } + self.add(writer, event)?; } RoomIndexOperation::Remove(event_id) => { - self.writer.remove(&event_id); + self.remove(writer, event_id)?; } RoomIndexOperation::Edit(remove_event_id, event) => { - self.writer.remove(&remove_event_id); - if !self.contains(&event.event_id) { - self.writer.add(self.schema.make_doc(event)?)?; - } + self.remove(writer, remove_event_id)?; + self.add(writer, event)?; } RoomIndexOperation::Noop => {} } Ok(()) } + /// Execute [`RoomIndexOperation`] with retry + fn execute_with_retry( + &mut self, + writer: &mut SearchIndexWriter, + operation: &RoomIndexOperation, + retries: usize, + ) -> Result<(), IndexError> { + let mut num_tries = 0; + + while let Err(err) = self.execute_impl(writer, operation) { + if num_tries == retries { + return Err(err); + } + match err { + // Retry + IndexError::TantivyError(_) + | IndexError::IndexSchemaError(_) + | IndexError::IndexWriteError(_) + | IndexError::IO(_) => { + num_tries += 1; + } + IndexError::OpenDirectoryError(ref e) => match e { + // Retry + OpenDirectoryError::IoError { io_error: _, directory_path: _ } => { + num_tries += 1; + } + // Bubble + OpenDirectoryError::DoesNotExist(_) + | OpenDirectoryError::FailedToCreateTempDir(_) + | OpenDirectoryError::NotADirectory(_) => return Err(err), + }, + // Bubble + IndexError::QueryParserError(_) => return Err(err), + // Ignore + IndexError::CannotIndexRedactedMessage + | IndexError::EmptyMessage + | IndexError::MessageTypeNotSupported => break, + } + debug!("Failed to execute operation in room index (try {num_tries}): {err}"); + } + Ok(()) + } + + /// Execute [`RoomIndexOperation`] + /// + /// If an error occurs, retry 5 times if possible. + /// + /// This which will add/remove/edit an event in the index based on the + /// operation. + /// + /// Prefer [`RoomIndex::bulk_execute`] for multiple operations. + pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> { + let mut writer = self.get_writer()?; + self.execute_with_retry(&mut writer, &operation, 5)?; + self.commit_and_reload(&mut writer)?; + Ok(()) + } + + /// Bulk execute [`RoomIndexOperation`]s + /// + /// If an error occurs in the batch it retries 5 times if possible. + /// + /// This which will add/remove/edit an events in the index based on the + /// operations. + pub fn bulk_execute(&mut self, operations: Vec) -> Result<(), IndexError> { + let mut writer = self.get_writer()?; + let mut operations = operations.into_iter(); + let mut next_operation = operations.next(); + + while let Some(ref operation) = next_operation { + self.execute_with_retry(&mut writer, operation, 5)?; + next_operation = operations.next(); + } + + self.commit_and_reload(&mut writer)?; + + Ok(()) + } + fn contains(&self, event_id: &EventId) -> bool { - let search_result = self.search(format!("event_id:\"{event_id}\"").as_str(), 1, None); + let search_result = self.search( + format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key())) + .as_str(), + 1, + None, + ); match search_result { - Ok(results) => !results.is_empty(), + Ok(results) => { + !self.uncommited_removes.contains(event_id) + && (!results.is_empty() || self.uncommited_adds.contains(event_id)) + } Err(err) => { warn!("Failed to check if event has been indexed, assuming it has: {err}"); true @@ -341,8 +486,6 @@ mod tests { .into_any_sync_message_like_event(), )?; - index.commit_and_reload()?; - let result = index.search("sentence", 10, None).expect("search failed with: {result:?}"); let result: HashSet<_> = result.iter().collect(); @@ -357,11 +500,9 @@ mod tests { #[test] fn test_search_empty_index() -> Result<(), Box> { let room_id = room_id!("!room_id:localhost"); - let mut index = + let index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}"); - index.commit_and_reload()?; - let result = index.search("sentence", 10, None).expect("search failed with: {result:?}"); assert!(result.is_empty(), "search result not empty: {result:?}"); @@ -372,13 +513,11 @@ mod tests { #[test] fn test_index_contains_false() { let room_id = room_id!("!room_id:localhost"); - let mut index = + let index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}"); let event_id = event_id!("$event_id:localhost"); - index.commit_and_reload().unwrap(); - assert!(!index.contains(event_id), "Index should not contain event"); } @@ -397,8 +536,6 @@ mod tests { index_message(&mut index, event)?; - index.commit_and_reload()?; - assert!(index.contains(event_id), "Index should contain event"); Ok(()) @@ -419,15 +556,11 @@ mod tests { index_message(&mut index, event.clone())?; - index.commit_and_reload()?; - assert!(index.contains(event_id), "Index should contain event"); // indexing again should do nothing index_message(&mut index, event)?; - index.commit_and_reload()?; - assert!(index.contains(event_id), "Index should still contain event"); let result = index.search("sentence", 10, None).expect("search failed with: {result:?}"); @@ -451,14 +584,10 @@ mod tests { index_message(&mut index, event)?; - index.commit_and_reload()?; - assert!(index.contains(event_id), "Index should contain event"); index_remove(&mut index, event_id)?; - index.commit_and_reload()?; - assert!(!index.contains(event_id), "Index should not contain event"); Ok(()) @@ -480,8 +609,6 @@ mod tests { index_message(&mut index, old_event)?; - index.commit_and_reload()?; - assert!(index.contains(old_event_id), "Index should contain event"); let new_event_id = event_id!("$new_event_id:localhost"); @@ -496,8 +623,6 @@ mod tests { index_edit(&mut index, old_event_id, edit)?; - index.commit_and_reload()?; - assert!(!index.contains(old_event_id), "Index should not contain old event"); assert!(index.contains(new_event_id), "Index should contain edited event"); diff --git a/crates/matrix-sdk-search/src/schema.rs b/crates/matrix-sdk-search/src/schema.rs index af2ba0551..d107f52d5 100644 --- a/crates/matrix-sdk-search/src/schema.rs +++ b/crates/matrix-sdk-search/src/schema.rs @@ -25,6 +25,7 @@ pub(crate) trait MatrixSearchIndexSchema { fn default_search_fields(&self) -> Vec; fn primary_key(&self) -> Field; fn deletion_key(&self) -> Field; + fn get_field_name(&self, field: Field) -> &str; fn as_tantivy_schema(&self) -> Schema; fn make_doc(&self, event: OriginalSyncRoomMessageEvent) -> Result; } @@ -83,6 +84,10 @@ impl MatrixSearchIndexSchema for RoomMessageSchema { self.original_event_id_field } + fn get_field_name(&self, field: Field) -> &str { + self.inner.get_field_name(field) + } + fn as_tantivy_schema(&self) -> Schema { self.inner.clone() } diff --git a/crates/matrix-sdk-search/src/writer.rs b/crates/matrix-sdk-search/src/writer.rs index 1bee90448..68c8e958c 100644 --- a/crates/matrix-sdk-search/src/writer.rs +++ b/crates/matrix-sdk-search/src/writer.rs @@ -33,8 +33,7 @@ impl SearchIndexWriter { } pub(crate) fn add(&self, document: TantivyDocument) -> Result { - Ok(self.inner.add_document(document)?) // TODO: This is blocking. Handle - // it. + Ok(self.inner.add_document(document)?) // TODO: This is blocking. Handle it. } pub(crate) fn remove(&self, event_id: &EventId) { diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 2ab3cf08c..b37303e5d 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -741,13 +741,16 @@ impl EventCache { let mut search_index_guard = client.search_index().lock().await; - for event in timeline_events { - if let Err(err) = search_index_guard - .handle_timeline_event(&event, &room_cache, &room_id, &redaction_rules) - .await - { - warn!("Failed to handle event for indexing: {err}") - } + if let Err(err) = search_index_guard + .bulk_handle_timeline_event( + timeline_events, + &room_cache, + &room_id, + &redaction_rules, + ) + .await + { + error!("Failed to handle events for indexing: {err}") } } Err(RecvError::Closed) => { diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 1763b4e17..59aa97827 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -4123,8 +4123,7 @@ impl Room { max_number_of_results: usize, pagination_offset: Option, ) -> Option> { - let mut search_index_guard = self.client.search_index().lock().await; - search_index_guard.commit_and_reload(self.room_id()); + let search_index_guard = self.client.search_index().lock().await; search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id()) } diff --git a/crates/matrix-sdk/src/search_index/mod.rs b/crates/matrix-sdk/src/search_index/mod.rs index 07f116e34..705709f2f 100644 --- a/crates/matrix-sdk/src/search_index/mod.rs +++ b/crates/matrix-sdk/src/search_index/mod.rs @@ -18,6 +18,7 @@ use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc}; +use futures_util::future::join_all; use matrix_sdk_base::deserialized_responses::TimelineEvent; use matrix_sdk_search::{ error::IndexError, @@ -95,11 +96,13 @@ impl SearchIndexGuard<'_> { Ok(index) } - /// Handle an [`AnySyncMessageLikeEvent`] in the [`RoomIndex`] of a given + /// Handle a [`RoomIndexOperation`] in the [`RoomIndex`] of a given /// [`RoomId`] /// /// This which will add/remove/edit an event in the index based on the /// event type. + /// + /// Prefer [`SearchIndexGuard::bulk_execute`] for multiple operations. pub(crate) fn execute( &mut self, operation: RoomIndexOperation, @@ -111,21 +114,28 @@ impl SearchIndexGuard<'_> { } let index = self.index_map.get_mut(room_id).expect("index should exist"); - let result = index.execute(operation); - match result { - Ok(_) => {} - Err(IndexError::CannotIndexRedactedMessage) - | Err(IndexError::EmptyMessage) - | Err(IndexError::MessageTypeNotSupported) => { - debug!("failed to parse event for indexing: {result:?}") - } - Err(IndexError::TantivyError(err)) => { - error!("failed to handle event in index: {err:?}") - } - Err(_) => error!("unexpected error during indexing: {result:?}"), + index.execute(operation) + } + + /// Handle a [`RoomIndexOperation`] in the [`RoomIndex`] of a given + /// [`RoomId`] + /// + /// This which will add/remove/edit an event in the index based on the + /// event type. + pub(crate) fn bulk_execute( + &mut self, + operations: Vec, + room_id: &RoomId, + ) -> Result<(), IndexError> { + if !self.index_map.contains_key(room_id) { + let index = self.create_index(room_id)?; + self.index_map.insert(room_id.to_owned(), index); } - Ok(()) + + let index = self.index_map.get_mut(room_id).expect("index should exist"); + + index.bulk_execute(operations) } /// Search a [`Room`]'s index for the query and return at most @@ -150,21 +160,15 @@ impl SearchIndexGuard<'_> { } } - /// Commit a [`Room`]'s [`RoomIndex`] and reload searchers - pub(crate) fn commit_and_reload(&mut self, room_id: &RoomId) { - if let Some(index) = self.index_map.get_mut(room_id) { - let _ = index.commit_and_reload().inspect_err(|err| { - error!("error occurred while committing: {err:?}"); - }); - } - } - /// Given a [`TimelineEvent`] this function will derive a /// [`RoomIndexOperation`], if it should be handled, and execute it; /// returning the result. + /// + /// Prefer [`SearchIndexGuard::bulk_handle_timeline_event`] for multiple + /// events. pub async fn handle_timeline_event( &mut self, - event: &TimelineEvent, + event: TimelineEvent, room_cache: &RoomEventCache, room_id: &RoomId, redaction_rules: &RedactionRules, @@ -177,6 +181,25 @@ impl SearchIndexGuard<'_> { Ok(()) } } + + /// Run [`SearchIndexGuard::handle_timeline_event`] for multiple + /// [`TimelineEvent`]. + pub async fn bulk_handle_timeline_event( + &mut self, + events: T, + room_cache: &RoomEventCache, + room_id: &RoomId, + redaction_rules: &RedactionRules, + ) -> Result<(), IndexError> + where + T: Iterator, + { + let futures = events.map(|ev| parse_timeline_event(room_cache, ev, redaction_rules)); + + let operations: Vec<_> = join_all(futures).await.into_iter().flatten().collect(); + + self.bulk_execute(operations, room_id) + } } /// Given an event id this function returns the most recent edit on said event @@ -261,7 +284,7 @@ async fn handle_room_redaction( /// indexing. async fn parse_timeline_event( cache: &RoomEventCache, - event: &TimelineEvent, + event: TimelineEvent, redaction_rules: &RedactionRules, ) -> Option { use ruma::events::AnySyncTimelineEvent;