From 3c8e2dacfd8261d6eed50aeeb631a9f1c7c29ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 4 Aug 2025 10:53:31 +0200 Subject: [PATCH 1/3] Improve indexing performance using batches --- services/search/pkg/engine/bleve.go | 84 +++++++++++++++- services/search/pkg/engine/bleve_test.go | 66 ++++++++++++- services/search/pkg/engine/engine.go | 3 + services/search/pkg/engine/mocks/engine.go | 95 +++++++++++++++++++ services/search/pkg/search/service.go | 7 ++ .../search/pkg/service/grpc/v0/service.go | 2 +- 6 files changed, 250 insertions(+), 7 deletions(-) diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index 37e9eff426..8976cbdf79 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -8,6 +8,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/blevesearch/bleve/v2" @@ -20,10 +21,11 @@ import ( "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search/query" storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + libregraph "github.com/opencloud-eu/libre-graph-api-go" + "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/reva/v2/pkg/errtypes" "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" - libregraph "github.com/opencloud-eu/libre-graph-api-go" "google.golang.org/protobuf/types/known/timestamppb" searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" @@ -32,10 +34,16 @@ import ( searchQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query" ) +const _batchSize = 500 + // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { index bleve.Index queryCreator searchQuery.Creator[query.Query] + batch *bleve.Batch + batchSize int + m sync.Mutex // batch operations in bleve are not thread-safe + log log.Logger } // NewBleveIndex returns a new bleve index @@ -60,10 +68,11 @@ func NewBleveIndex(root string) (bleve.Index, error) { } // NewBleveEngine creates a new Bleve instance -func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve { +func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query], log log.Logger) *Bleve { return &Bleve{ index: index, queryCreator: queryCreator, + log: log, } } @@ -233,8 +242,60 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques }, nil } +func (b *Bleve) StartBatch(batchSize int) error { + b.m.Lock() + defer b.m.Unlock() + + if batchSize <= 0 { + return errors.New("batch size must be greater than 0") + } + + if b.batch != nil { + b.log.Debug().Msg("reusing another batch that has already been started") + return nil + } + + b.log.Debug().Msg("Starting new batch") + b.batch = b.index.NewBatch() + b.batchSize = batchSize + return nil +} + +func (b *Bleve) EndBatch() error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch == nil { + return errors.New("no batch started") + } + + b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch") + if err := b.index.Batch(b.batch); err != nil { + return err + } + + b.batch = nil + return nil +} + // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch != nil { + if err := b.batch.Index(id, r); err != nil { + return err + } + if b.batch.Size() >= b.batchSize { + b.log.Debug().Int("size", b.batch.Size()).Msg("Committing batch") + if err := b.index.Batch(b.batch); err != nil { + return err + } + b.batch = b.index.NewBatch() + } + return nil + } return b.index.Index(id, r) } @@ -298,6 +359,19 @@ func (b *Bleve) Restore(id string) error { // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch != nil { + b.batch.Delete(id) + if b.batch.Size() >= b.batchSize { + if err := b.index.Batch(b.batch); err != nil { + return err + } + b.batch = b.index.NewBatch() + } + return nil + } return b.index.Delete(id) } @@ -452,7 +526,7 @@ func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource mutateFunc(it) - return it, b.index.Index(it.ID, it) + return it, b.Upsert(id, *it) } func (b *Bleve) setDeleted(id string, deleted bool) error { @@ -468,6 +542,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { bleve.NewQueryStringQuery("RootID:"+it.RootID), bleve.NewQueryStringQuery("Path:"+escapeQuery(it.Path+"/*")), ) + bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} @@ -476,6 +551,8 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { return err } + b.StartBatch(_batchSize) + defer b.EndBatch() for _, h := range res.Hits { _, err := b.updateEntity(h.ID, func(r *Resource) { r.Deleted = deleted @@ -484,6 +561,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { return err } } + b.EndBatch() } return nil diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 78039a68f7..05974d4ef7 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -8,9 +8,10 @@ import ( sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/opencloud-eu/reva/v2/pkg/storagespace" libregraph "github.com/opencloud-eu/libre-graph-api-go" + "github.com/opencloud-eu/reva/v2/pkg/storagespace" + "github.com/opencloud-eu/opencloud/pkg/log" searchmsg "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" searchsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0" "github.com/opencloud-eu/opencloud/services/search/pkg/content" @@ -53,6 +54,7 @@ var _ = Describe("Bleve", func() { rootResource engine.Resource parentResource engine.Resource childResource engine.Resource + childResource2 engine.Resource ) BeforeEach(func() { @@ -62,7 +64,7 @@ var _ = Describe("Bleve", func() { idx, err = bleveSearch.NewMemOnly(mapping) Expect(err).ToNot(HaveOccurred()) - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{}) Expect(err).ToNot(HaveOccurred()) rootResource = engine.Resource{ @@ -89,11 +91,20 @@ var _ = Describe("Bleve", func() { Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), Document: content.Document{Name: "child.pdf"}, } + + childResource2 = engine.Resource{ + ID: "1$2!5", + ParentID: parentResource.ID, + RootID: rootResource.ID, + Path: "./parent d!r/child2.pdf", + Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), + Document: content.Document{Name: "child2.pdf"}, + } }) Describe("New", func() { It("returns a new index instance", func() { - b := engine.NewBleveEngine(idx, bleve.DefaultCreator) + b := engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{}) Expect(b).ToNot(BeNil()) }) }) @@ -486,6 +497,55 @@ var _ = Describe("Bleve", func() { }) }) + Describe("StartBatch", func() { + It("starts a new batch", func() { + err := eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource.ID, childResource) + Expect(err).ToNot(HaveOccurred()) + + count, err := idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(0))) + + err = eng.EndBatch() + Expect(err).ToNot(HaveOccurred()) + + count, err = idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(1))) + + query := bleveSearch.NewMatchQuery("child.pdf") + res, err := idx.Search(bleveSearch.NewSearchRequest(query)) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Hits.Len()).To(Equal(1)) + }) + + It("doesn't overwrite batches that are already in progress", func() { + err := eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource.ID, childResource) + Expect(err).ToNot(HaveOccurred()) + + count, err := idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(0))) + + err = eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource2.ID, childResource2) + Expect(err).ToNot(HaveOccurred()) + + Expect(eng.EndBatch()).To(Succeed()) + count, err = idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(2))) + }) + }) + Describe("File type specific metadata", func() { Context("with audio metadata", func() { diff --git a/services/search/pkg/engine/engine.go b/services/search/pkg/engine/engine.go index f61f060cfe..1553662f34 100644 --- a/services/search/pkg/engine/engine.go +++ b/services/search/pkg/engine/engine.go @@ -23,6 +23,9 @@ type Engine interface { Restore(id string) error Purge(id string) error DocCount() (uint64, error) + + StartBatch(batchSize int) error + EndBatch() error } // Resource is the entity that is stored in the index. diff --git a/services/search/pkg/engine/mocks/engine.go b/services/search/pkg/engine/mocks/engine.go index 5159c0e0d6..533f3bccf2 100644 --- a/services/search/pkg/engine/mocks/engine.go +++ b/services/search/pkg/engine/mocks/engine.go @@ -143,6 +143,50 @@ func (_c *Engine_DocCount_Call) RunAndReturn(run func() (uint64, error)) *Engine return _c } +// EndBatch provides a mock function for the type Engine +func (_mock *Engine) EndBatch() error { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for EndBatch") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { + r0 = returnFunc() + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Engine_EndBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EndBatch' +type Engine_EndBatch_Call struct { + *mock.Call +} + +// EndBatch is a helper method to define mock.On call +func (_e *Engine_Expecter) EndBatch() *Engine_EndBatch_Call { + return &Engine_EndBatch_Call{Call: _e.mock.On("EndBatch")} +} + +func (_c *Engine_EndBatch_Call) Run(run func()) *Engine_EndBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Engine_EndBatch_Call) Return(err error) *Engine_EndBatch_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Engine_EndBatch_Call) RunAndReturn(run func() error) *Engine_EndBatch_Call { + _c.Call.Return(run) + return _c +} + // Move provides a mock function for the type Engine func (_mock *Engine) Move(id string, parentid string, target string) error { ret := _mock.Called(id, parentid, target) @@ -376,6 +420,57 @@ func (_c *Engine_Search_Call) RunAndReturn(run func(ctx context.Context, req *v0 return _c } +// StartBatch provides a mock function for the type Engine +func (_mock *Engine) StartBatch(batchSize int) error { + ret := _mock.Called(batchSize) + + if len(ret) == 0 { + panic("no return value specified for StartBatch") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int) error); ok { + r0 = returnFunc(batchSize) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Engine_StartBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartBatch' +type Engine_StartBatch_Call struct { + *mock.Call +} + +// StartBatch is a helper method to define mock.On call +// - batchSize int +func (_e *Engine_Expecter) StartBatch(batchSize interface{}) *Engine_StartBatch_Call { + return &Engine_StartBatch_Call{Call: _e.mock.On("StartBatch", batchSize)} +} + +func (_c *Engine_StartBatch_Call) Run(run func(batchSize int)) *Engine_StartBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *Engine_StartBatch_Call) Return(err error) *Engine_StartBatch_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Engine_StartBatch_Call) RunAndReturn(run func(batchSize int) error) *Engine_StartBatch_Call { + _c.Call.Return(run) + return _c +} + // Upsert provides a mock function for the type Engine func (_mock *Engine) Upsert(id string, r engine.Resource) error { ret := _mock.Called(id, r) diff --git a/services/search/pkg/search/service.go b/services/search/pkg/search/service.go index e4a2769b32..25182d2ae5 100644 --- a/services/search/pkg/search/service.go +++ b/services/search/pkg/search/service.go @@ -41,6 +41,7 @@ const ( _spaceTypeProject = "project" _spaceTypeGrant = "grant" _slowQueryDuration = 500 * time.Millisecond + _batchSize = 500 ) // Searcher is the interface to the SearchService @@ -459,6 +460,12 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { }() w := walker.NewWalker(s.gatewaySelector) + s.engine.StartBatch(_batchSize) + defer func() { + if err := s.engine.EndBatch(); err != nil { + s.logger.Error().Err(err).Msg("failed to end batch") + } + }() err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error { if err != nil { s.logger.Error().Err(err).Msg("error walking the tree") diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index a89efd4dac..b12ca383fd 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -53,7 +53,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) _ = idx.Close() } - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, logger) default: return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) } From 7a7d148dcf569f9b74aa9e8714be4afc81081ef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 4 Aug 2025 15:39:27 +0200 Subject: [PATCH 2/3] Fix tests --- services/search/pkg/search/service_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/search/pkg/search/service_test.go b/services/search/pkg/search/service_test.go index c6f8eb0ef9..e324c6c8a7 100644 --- a/services/search/pkg/search/service_test.go +++ b/services/search/pkg/search/service_test.go @@ -122,6 +122,8 @@ var _ = Describe("Searchprovider", func() { User: user, }, nil) extractor.On("Extract", mock.Anything, mock.Anything, mock.Anything).Return(content.Document{}, nil) + indexClient.On("StartBatch", mock.Anything, mock.Anything).Return(nil) + indexClient.On("EndBatch", mock.Anything, mock.Anything).Return(nil) indexClient.On("Upsert", mock.Anything, mock.Anything).Return(nil) indexClient.On("Search", mock.Anything, mock.Anything).Return(&searchsvc.SearchIndexResponse{}, nil) gatewayClient.On("Stat", mock.Anything, mock.Anything).Return(&sprovider.StatResponse{ From 7c59e57d436a3ae0fa6305503595c705379915e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 4 Aug 2025 16:14:56 +0200 Subject: [PATCH 3/3] Make batch size configurable --- services/search/pkg/config/config.go | 1 + services/search/pkg/config/defaults/defaultconfig.go | 1 + services/search/pkg/search/service.go | 7 +++++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/services/search/pkg/config/config.go b/services/search/pkg/config/config.go index 3d27ee7e4c..c00196b5f0 100644 --- a/services/search/pkg/config/config.go +++ b/services/search/pkg/config/config.go @@ -28,6 +28,7 @@ type Config struct { Engine Engine `yaml:"engine"` Extractor Extractor `yaml:"extractor"` ContentExtractionSizeLimit uint64 `yaml:"content_extraction_size_limit" env:"SEARCH_CONTENT_EXTRACTION_SIZE_LIMIT" desc:"Maximum file size in bytes that is allowed for content extraction." introductionVersion:"1.0.0"` + BatchSize int `yaml:"batch_size" env:"SEARCH_BATCH_SIZE" desc:"The number of documents to process in a single batch. Defaults to 500." introductionVersion:"1.0.0"` ServiceAccount ServiceAccount `yaml:"service_account"` diff --git a/services/search/pkg/config/defaults/defaultconfig.go b/services/search/pkg/config/defaults/defaultconfig.go index ba9aefaf1d..ff3bc07113 100644 --- a/services/search/pkg/config/defaults/defaultconfig.go +++ b/services/search/pkg/config/defaults/defaultconfig.go @@ -58,6 +58,7 @@ func DefaultConfig() *config.Config { AckWait: 1 * time.Minute, }, ContentExtractionSizeLimit: 20 * 1024 * 1024, // Limit content extraction to <20MB files by default + BatchSize: 500, } } diff --git a/services/search/pkg/search/service.go b/services/search/pkg/search/service.go index 25182d2ae5..673d94ec77 100644 --- a/services/search/pkg/search/service.go +++ b/services/search/pkg/search/service.go @@ -41,7 +41,6 @@ const ( _spaceTypeProject = "project" _spaceTypeGrant = "grant" _slowQueryDuration = 500 * time.Millisecond - _batchSize = 500 ) // Searcher is the interface to the SearchService @@ -65,6 +64,8 @@ type Service struct { serviceAccountID string serviceAccountSecret string + + batchSize int } var errSkipSpace error @@ -80,6 +81,8 @@ func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng e serviceAccountID: cfg.ServiceAccount.ServiceAccountID, serviceAccountSecret: cfg.ServiceAccount.ServiceAccountSecret, + + batchSize: cfg.BatchSize, } return s @@ -460,7 +463,7 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { }() w := walker.NewWalker(s.gatewaySelector) - s.engine.StartBatch(_batchSize) + s.engine.StartBatch(s.batchSize) defer func() { if err := s.engine.EndBatch(); err != nil { s.logger.Error().Err(err).Msg("failed to end batch")