feat(search): Add bulk processing.

This commit is contained in:
Shrey Patel
2025-09-11 16:41:00 +01:00
committed by Damir Jelić
parent a8ef44306a
commit 79aa0ab60d
6 changed files with 245 additions and 91 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fmt, fs, path::Path, sync::Arc};
use std::{collections::HashSet, fmt, fs, path::Path, sync::Arc};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
@@ -24,7 +24,7 @@ use tantivy::{
query::QueryParser,
schema::Value,
};
use tracing::{error, warn};
use tracing::{debug, error, warn};
use crate::{
OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
@@ -34,7 +34,7 @@ use crate::{
};
/// An enum representing the operations that can be applied to a [`RoomIndex`]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RoomIndexOperation {
/// Add this event to the index.
Add(OriginalSyncRoomMessageEvent),
@@ -52,11 +52,12 @@ pub enum RoomIndexOperation {
/// A struct that holds all data pertaining to a particular room's
/// message index.
pub struct RoomIndex {
index: Index,
schema: RoomMessageSchema,
writer: SearchIndexWriter,
reader: IndexReader,
query_parser: QueryParser,
room_id: OwnedRoomId,
// Event ids recorded by `add` since the last commit; emptied by `commit`
// and consulted by `contains`.
uncommited_adds: HashSet<OwnedEventId>,
// Event ids recorded by `remove` since the last commit; emptied by `commit`
// and consulted by `contains`.
uncommited_removes: HashSet<OwnedEventId>,
}
impl fmt::Debug for RoomIndex {
@@ -74,16 +75,14 @@ impl RoomIndex {
schema: RoomMessageSchema,
room_id: &RoomId,
) -> Result<RoomIndex, IndexError> {
let writer = index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
let reader = index.reader_builder().try_into()?;
let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
Ok(Self {
writer: SearchIndexWriter::new(writer, schema.clone()),
index,
schema,
reader,
query_parser,
room_id: room_id.to_owned(),
uncommited_adds: HashSet::new(),
uncommited_removes: HashSet::new(),
})
}
@@ -137,9 +136,25 @@ impl RoomIndex {
RoomIndex::new_with(index, schema, room_id)
}
/// Commit added events to [`RoomIndex`]
pub fn commit(&mut self) -> Result<OpStamp, IndexError> {
let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it.
/// Build a fresh [`SearchIndexWriter`] for this index.
///
/// The underlying writer is requested from the index with
/// `TANTIVY_INDEX_MEMORY_BUDGET` and paired with a clone of the room schema.
///
/// # Errors
///
/// Returns an [`IndexError`] if the index cannot hand out a writer.
fn get_writer(&self) -> Result<SearchIndexWriter, IndexError> {
    let schema = self.schema.clone();
    let inner = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
    Ok(SearchIndexWriter::new(inner, schema))
}
/// Build an [`IndexReader`] for this index.
///
/// # Errors
///
/// Returns an [`IndexError`] if the reader cannot be constructed.
fn get_reader(&self) -> Result<IndexReader, IndexError> {
    let reader = self.index.reader_builder().try_into()?;
    Ok(reader)
}
/// Commit the operations staged on `writer` to this [`RoomIndex`].
///
/// The changes are not reflected in search results until the searchers
/// are reloaded; use [`RoomIndex::commit_and_reload`] for that.
///
/// On success the sets of uncommitted adds and removes are emptied and the
/// opstamp of the commit is returned.
fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
    // TODO: This is blocking. Handle it.
    let opstamp = writer.commit()?;

    // Nothing is pending any more once the commit has gone through.
    self.uncommited_adds.clear();
    self.uncommited_removes.clear();

    Ok(opstamp)
}
@@ -153,9 +168,13 @@ impl RoomIndex {
/// This automatic reload can take 10s of milliseconds to kick in however,
/// and in unit tests it can be nice to deterministically force the
/// reload of searchers.
pub fn commit_and_reload(&mut self) -> Result<OpStamp, IndexError> {
let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it.
self.reader.reload()?;
fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
    debug!(
        "RoomIndex: committing and reloading: uncommited: {:?}, {:?}",
        self.uncommited_adds, self.uncommited_removes
    );

    let opstamp = self.commit(writer)?;

    // NOTE(review): the reader reloaded here is created for this call and
    // dropped right after; confirm that reloading a throwaway reader has
    // the intended effect on later searches.
    let reader = self.get_reader()?;
    reader.reload()?;

    Ok(opstamp)
}
@@ -173,7 +192,7 @@ impl RoomIndex {
pagination_offset: Option<usize>,
) -> Result<Vec<OwnedEventId>, IndexError> {
let query = self.query_parser.parse_query(query)?;
let searcher = self.reader.searcher();
let searcher = self.get_reader()?.searcher();
let offset = pagination_offset.unwrap_or(0);
@@ -196,35 +215,161 @@ impl RoomIndex {
Ok(ret)
}
/// Execute [`RoomIndexOperation`]
///
/// This which will add/remove/edit an event in the index based on the
/// operation.
pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
match operation {
/// Look up the events that a removal of `event_id` would affect.
///
/// Runs a phrase query for `event_id` against the schema's deletion-key
/// field and returns the matching event ids, capped at 10000 results.
fn get_events_to_be_removed(&self, event_id: &EventId) -> Result<Vec<OwnedEventId>, IndexError> {
    let deletion_field = self.schema.get_field_name(self.schema.deletion_key());
    let query = format!("{}:\"{event_id}\"", deletion_field);
    self.search(query.as_str(), 10000, None)
}
/// Stage `event` for addition on `writer` and record it as an uncommitted
/// add.
///
/// A document is only written when [`RoomIndex::contains`] reports that the
/// event is not already present. In either case the event id is dropped
/// from the uncommitted removes and inserted into the uncommitted adds.
///
/// # Errors
///
/// Returns an [`IndexError`] if the document cannot be built from the event
/// or the writer rejects it; the tracking sets are left untouched then.
fn add(
    &mut self,
    writer: &mut SearchIndexWriter,
    event: OriginalSyncRoomMessageEvent,
) -> Result<(), IndexError> {
    // Only the id is needed once the document has been built, so keep a
    // copy of the id instead of cloning the whole event.
    let event_id = event.event_id.clone();

    if !self.contains(&event_id) {
        writer.add(self.schema.make_doc(event)?)?;
    }

    self.uncommited_removes.remove(&event_id);
    self.uncommited_adds.insert(event_id);
    Ok(())
}
/// Stage the removal of `event_id` on `writer` and record the affected
/// events as uncommitted removes.
///
/// The affected events are looked up before the delete is issued. Each of
/// them is dropped from the uncommitted adds and inserted into the
/// uncommitted removes.
///
/// # Errors
///
/// Returns an [`IndexError`] if the lookup of affected events fails; in
/// that case nothing is staged on the writer.
fn remove(
    &mut self,
    writer: &mut SearchIndexWriter,
    event_id: OwnedEventId,
) -> Result<(), IndexError> {
    let affected = self.get_events_to_be_removed(&event_id)?;

    writer.remove(&event_id);

    for id in affected {
        self.uncommited_adds.remove(&id);
        self.uncommited_removes.insert(id);
    }

    Ok(())
}
/// Apply a single [`RoomIndexOperation`] through `writer`.
///
/// `Add` and `Remove` forward to `add` / `remove`; `Edit` first removes the
/// given event id and then adds the replacement event; `Noop` does nothing.
/// Nothing is committed here.
fn execute_impl(
&mut self,
writer: &mut SearchIndexWriter,
operation: &RoomIndexOperation,
) -> Result<(), IndexError> {
debug!("INDEX: executing {operation:?}");
match operation.clone() {
RoomIndexOperation::Add(event) => {
if !self.contains(&event.event_id) {
self.writer.add(self.schema.make_doc(event)?)?;
}
self.add(writer, event)?;
}
RoomIndexOperation::Remove(event_id) => {
self.writer.remove(&event_id);
self.remove(writer, event_id)?;
}
RoomIndexOperation::Edit(remove_event_id, event) => {
self.writer.remove(&remove_event_id);
if !self.contains(&event.event_id) {
self.writer.add(self.schema.make_doc(event)?)?;
}
self.remove(writer, remove_event_id)?;
self.add(writer, event)?;
}
RoomIndexOperation::Noop => {}
}
Ok(())
}
/// Execute a [`RoomIndexOperation`], retrying on failures that may be
/// transient.
///
/// Each failure of the operation is classified as one of:
///
/// - **ignore** (`CannotIndexRedactedMessage`, `EmptyMessage`,
///   `MessageTypeNotSupported`): the operation is dropped and `Ok(())` is
///   returned;
/// - **bubble** (`QueryParserError`, and `OpenDirectoryError` other than
///   its `IoError` variant): the error is returned immediately;
/// - **retry** (everything else): the operation is attempted again, up to
///   `retries` additional times, after which the last error is returned.
///
/// The error is classified before the retry budget is consulted, so an
/// ignorable error never turns into a hard failure just because the budget
/// is exhausted (or `retries` is `0`).
fn execute_with_retry(
    &mut self,
    writer: &mut SearchIndexWriter,
    operation: &RoomIndexOperation,
    retries: usize,
) -> Result<(), IndexError> {
    let mut num_tries = 0;

    while let Err(err) = self.execute_impl(writer, operation) {
        match err {
            // Ignore
            IndexError::CannotIndexRedactedMessage
            | IndexError::EmptyMessage
            | IndexError::MessageTypeNotSupported => break,

            // Bubble
            IndexError::QueryParserError(_)
            | IndexError::OpenDirectoryError(
                OpenDirectoryError::DoesNotExist(_)
                | OpenDirectoryError::FailedToCreateTempDir(_)
                | OpenDirectoryError::NotADirectory(_),
            ) => return Err(err),

            // Retry, as long as there is budget left.
            IndexError::TantivyError(_)
            | IndexError::IndexSchemaError(_)
            | IndexError::IndexWriteError(_)
            | IndexError::IO(_)
            | IndexError::OpenDirectoryError(OpenDirectoryError::IoError { .. }) => {
                if num_tries == retries {
                    return Err(err);
                }
                num_tries += 1;
            }
        }

        debug!("Failed to execute operation in room index (try {num_tries}): {err}");
    }

    Ok(())
}
/// Execute a single [`RoomIndexOperation`], then commit and reload.
///
/// The operation adds, removes or edits an event in the index. If it
/// fails, it is retried up to 5 times where possible.
///
/// Prefer [`RoomIndex::bulk_execute`] for multiple operations.
///
/// # Errors
///
/// Returns an [`IndexError`] if a writer cannot be obtained, the operation
/// ultimately fails, or the commit/reload fails.
pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
    let mut writer = self.get_writer()?;
    self.execute_with_retry(&mut writer, &operation, 5)?;
    self.commit_and_reload(&mut writer).map(|_opstamp| ())
}
/// Bulk execute [`RoomIndexOperation`]s
///
/// All operations are staged on one writer, in order, and committed
/// together at the end. If an operation in the batch fails, it is retried
/// up to 5 times where possible; the first operation that ultimately fails
/// aborts the batch before anything is committed.
///
/// This will add/remove/edit events in the index based on the operations.
/// An empty batch is a no-op: no writer is opened and nothing is committed.
///
/// # Errors
///
/// Returns an [`IndexError`] if a writer cannot be obtained, an operation
/// ultimately fails, or the final commit/reload fails.
pub fn bulk_execute(&mut self, operations: Vec<RoomIndexOperation>) -> Result<(), IndexError> {
    // Avoid opening a writer and committing when there is nothing to do.
    if operations.is_empty() {
        return Ok(());
    }

    let mut writer = self.get_writer()?;

    for operation in &operations {
        self.execute_with_retry(&mut writer, operation, 5)?;
    }

    self.commit_and_reload(&mut writer)?;
    Ok(())
}
fn contains(&self, event_id: &EventId) -> bool {
let search_result = self.search(format!("event_id:\"{event_id}\"").as_str(), 1, None);
let search_result = self.search(
format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key()))
.as_str(),
1,
None,
);
match search_result {
Ok(results) => !results.is_empty(),
Ok(results) => {
!self.uncommited_removes.contains(event_id)
&& (!results.is_empty() || self.uncommited_adds.contains(event_id))
}
Err(err) => {
warn!("Failed to check if event has been indexed, assuming it has: {err}");
true
@@ -341,8 +486,6 @@ mod tests {
.into_any_sync_message_like_event(),
)?;
index.commit_and_reload()?;
let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
let result: HashSet<_> = result.iter().collect();
@@ -357,11 +500,9 @@ mod tests {
#[test]
fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
let room_id = room_id!("!room_id:localhost");
let mut index =
let index =
RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}");
index.commit_and_reload()?;
let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
assert!(result.is_empty(), "search result not empty: {result:?}");
@@ -372,13 +513,11 @@ mod tests {
#[test]
fn test_index_contains_false() {
let room_id = room_id!("!room_id:localhost");
let mut index =
let index =
RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}");
let event_id = event_id!("$event_id:localhost");
index.commit_and_reload().unwrap();
assert!(!index.contains(event_id), "Index should not contain event");
}
@@ -397,8 +536,6 @@ mod tests {
index_message(&mut index, event)?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should contain event");
Ok(())
@@ -419,15 +556,11 @@ mod tests {
index_message(&mut index, event.clone())?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should contain event");
// indexing again should do nothing
index_message(&mut index, event)?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should still contain event");
let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
@@ -451,14 +584,10 @@ mod tests {
index_message(&mut index, event)?;
index.commit_and_reload()?;
assert!(index.contains(event_id), "Index should contain event");
index_remove(&mut index, event_id)?;
index.commit_and_reload()?;
assert!(!index.contains(event_id), "Index should not contain event");
Ok(())
@@ -480,8 +609,6 @@ mod tests {
index_message(&mut index, old_event)?;
index.commit_and_reload()?;
assert!(index.contains(old_event_id), "Index should contain event");
let new_event_id = event_id!("$new_event_id:localhost");
@@ -496,8 +623,6 @@ mod tests {
index_edit(&mut index, old_event_id, edit)?;
index.commit_and_reload()?;
assert!(!index.contains(old_event_id), "Index should not contain old event");
assert!(index.contains(new_event_id), "Index should contain edited event");

View File

@@ -25,6 +25,7 @@ pub(crate) trait MatrixSearchIndexSchema {
fn default_search_fields(&self) -> Vec<Field>;
fn primary_key(&self) -> Field;
fn deletion_key(&self) -> Field;
fn get_field_name(&self, field: Field) -> &str;
fn as_tantivy_schema(&self) -> Schema;
fn make_doc(&self, event: OriginalSyncRoomMessageEvent) -> Result<TantivyDocument, IndexError>;
}
@@ -83,6 +84,10 @@ impl MatrixSearchIndexSchema for RoomMessageSchema {
self.original_event_id_field
}
// Resolve the name of `field` by delegating to the wrapped schema
// (`self.inner`).
fn get_field_name(&self, field: Field) -> &str {
self.inner.get_field_name(field)
}
fn as_tantivy_schema(&self) -> Schema {
self.inner.clone()
}

View File

@@ -33,8 +33,7 @@ impl SearchIndexWriter {
}
pub(crate) fn add(&self, document: TantivyDocument) -> Result<OpStamp, IndexError> {
Ok(self.inner.add_document(document)?) // TODO: This is blocking. Handle
// it.
Ok(self.inner.add_document(document)?) // TODO: This is blocking. Handle it.
}
pub(crate) fn remove(&self, event_id: &EventId) {

View File

@@ -741,13 +741,16 @@ impl EventCache {
let mut search_index_guard = client.search_index().lock().await;
for event in timeline_events {
if let Err(err) = search_index_guard
.handle_timeline_event(&event, &room_cache, &room_id, &redaction_rules)
.await
{
warn!("Failed to handle event for indexing: {err}")
}
if let Err(err) = search_index_guard
.bulk_handle_timeline_event(
timeline_events,
&room_cache,
&room_id,
&redaction_rules,
)
.await
{
error!("Failed to handle events for indexing: {err}")
}
}
Err(RecvError::Closed) => {

View File

@@ -4123,8 +4123,7 @@ impl Room {
max_number_of_results: usize,
pagination_offset: Option<usize>,
) -> Option<Vec<OwnedEventId>> {
let mut search_index_guard = self.client.search_index().lock().await;
search_index_guard.commit_and_reload(self.room_id());
let search_index_guard = self.client.search_index().lock().await;
search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
}

View File

@@ -18,6 +18,7 @@
use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc};
use futures_util::future::join_all;
use matrix_sdk_base::deserialized_responses::TimelineEvent;
use matrix_sdk_search::{
error::IndexError,
@@ -95,11 +96,13 @@ impl SearchIndexGuard<'_> {
Ok(index)
}
/// Handle an [`AnySyncMessageLikeEvent`] in the [`RoomIndex`] of a given
/// Handle a [`RoomIndexOperation`] in the [`RoomIndex`] of a given
/// [`RoomId`]
///
/// This will add/remove/edit an event in the index based on the
/// operation.
///
/// Prefer [`SearchIndexGuard::bulk_execute`] for multiple operations.
pub(crate) fn execute(
&mut self,
operation: RoomIndexOperation,
@@ -111,21 +114,28 @@ impl SearchIndexGuard<'_> {
}
let index = self.index_map.get_mut(room_id).expect("index should exist");
let result = index.execute(operation);
match result {
Ok(_) => {}
Err(IndexError::CannotIndexRedactedMessage)
| Err(IndexError::EmptyMessage)
| Err(IndexError::MessageTypeNotSupported) => {
debug!("failed to parse event for indexing: {result:?}")
}
Err(IndexError::TantivyError(err)) => {
error!("failed to handle event in index: {err:?}")
}
Err(_) => error!("unexpected error during indexing: {result:?}"),
index.execute(operation)
}
/// Handle multiple [`RoomIndexOperation`]s in the [`RoomIndex`] of a given
/// [`RoomId`]
///
/// This will add/remove/edit events in the index based on the
/// operations.
pub(crate) fn bulk_execute(
&mut self,
operations: Vec<RoomIndexOperation>,
room_id: &RoomId,
) -> Result<(), IndexError> {
if !self.index_map.contains_key(room_id) {
let index = self.create_index(room_id)?;
self.index_map.insert(room_id.to_owned(), index);
}
Ok(())
let index = self.index_map.get_mut(room_id).expect("index should exist");
index.bulk_execute(operations)
}
/// Search a [`Room`]'s index for the query and return at most
@@ -150,21 +160,15 @@ impl SearchIndexGuard<'_> {
}
}
/// Commit a [`Room`]'s [`RoomIndex`] and reload searchers
pub(crate) fn commit_and_reload(&mut self, room_id: &RoomId) {
if let Some(index) = self.index_map.get_mut(room_id) {
let _ = index.commit_and_reload().inspect_err(|err| {
error!("error occurred while committing: {err:?}");
});
}
}
/// Given a [`TimelineEvent`] this function will derive a
/// [`RoomIndexOperation`], if it should be handled, and execute it;
/// returning the result.
///
/// Prefer [`SearchIndexGuard::bulk_handle_timeline_event`] for multiple
/// events.
pub async fn handle_timeline_event(
&mut self,
event: &TimelineEvent,
event: TimelineEvent,
room_cache: &RoomEventCache,
room_id: &RoomId,
redaction_rules: &RedactionRules,
@@ -177,6 +181,25 @@ impl SearchIndexGuard<'_> {
Ok(())
}
}
/// Handle several [`TimelineEvent`]s for one room in a single batch.
///
/// Each event is passed to `parse_timeline_event`; the resulting futures
/// are awaited together. Events that yield no [`RoomIndexOperation`] are
/// dropped and the remaining operations are handed to
/// [`SearchIndexGuard::bulk_execute`].
pub async fn bulk_handle_timeline_event<T>(
    &mut self,
    events: T,
    room_cache: &RoomEventCache,
    room_id: &RoomId,
    redaction_rules: &RedactionRules,
) -> Result<(), IndexError>
where
    T: Iterator<Item = TimelineEvent>,
{
    let pending = events.map(|event| parse_timeline_event(room_cache, event, redaction_rules));
    let parsed = join_all(pending).await;

    // `None` marks an event that should not be indexed.
    let operations = parsed.into_iter().flatten().collect::<Vec<_>>();

    self.bulk_execute(operations, room_id)
}
}
/// Given an event id this function returns the most recent edit on said event
@@ -261,7 +284,7 @@ async fn handle_room_redaction(
/// indexing.
async fn parse_timeline_event(
cache: &RoomEventCache,
event: &TimelineEvent,
event: TimelineEvent,
redaction_rules: &RedactionRules,
) -> Option<RoomIndexOperation> {
use ruma::events::AnySyncTimelineEvent;