From 3c8e2dacfd8261d6eed50aeeb631a9f1c7c29ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 4 Aug 2025 10:53:31 +0200 Subject: [PATCH] Improve indexing performance using batches --- services/search/pkg/engine/bleve.go | 84 +++++++++++++++- services/search/pkg/engine/bleve_test.go | 66 ++++++++++++- services/search/pkg/engine/engine.go | 3 + services/search/pkg/engine/mocks/engine.go | 95 +++++++++++++++++++ services/search/pkg/search/service.go | 7 ++ .../search/pkg/service/grpc/v0/service.go | 2 +- 6 files changed, 250 insertions(+), 7 deletions(-) diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index 37e9eff426..8976cbdf79 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -8,6 +8,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/blevesearch/bleve/v2" @@ -20,10 +21,11 @@ import ( "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search/query" storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + libregraph "github.com/opencloud-eu/libre-graph-api-go" + "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/reva/v2/pkg/errtypes" "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" - libregraph "github.com/opencloud-eu/libre-graph-api-go" "google.golang.org/protobuf/types/known/timestamppb" searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" @@ -32,10 +34,16 @@ import ( searchQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query" ) +const _batchSize = 500 + // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { index bleve.Index queryCreator searchQuery.Creator[query.Query] + batch *bleve.Batch + batchSize int + m sync.Mutex // batch operations in bleve are not thread-safe + log log.Logger } // NewBleveIndex returns a new bleve index @@ -60,10 +68,11 @@ func NewBleveIndex(root string) (bleve.Index, error) { } // NewBleveEngine creates a new Bleve instance -func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve { +func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query], log log.Logger) *Bleve { return &Bleve{ index: index, queryCreator: queryCreator, + log: log, } } @@ -233,8 +242,60 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques }, nil } +func (b *Bleve) StartBatch(batchSize int) error { + b.m.Lock() + defer b.m.Unlock() + + if batchSize <= 0 { + return errors.New("batch size must be greater than 0") + } + + if b.batch != nil { + b.log.Debug().Msg("reusing another batch that has already been started") + return nil + } + + b.log.Debug().Msg("Starting new batch") + b.batch = b.index.NewBatch() + b.batchSize = batchSize + return nil +} + +func (b *Bleve) EndBatch() error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch == nil { + return errors.New("no batch started") + } + + b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch") + if err := b.index.Batch(b.batch); err != nil { + return err + } + + b.batch = nil + return nil +} + // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch != nil { + if err := b.batch.Index(id, r); err != nil { + return err + } + if b.batch.Size() >= b.batchSize { + b.log.Debug().Int("size", b.batch.Size()).Msg("Committing batch") + if err := b.index.Batch(b.batch); err != nil { + return err + } + b.batch = b.index.NewBatch() + } + return nil + } return b.index.Index(id, r) } @@ -298,6 +359,19 @@ func (b *Bleve) Restore(id string) error { // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { + b.m.Lock() + defer b.m.Unlock() + + if b.batch != nil { + b.batch.Delete(id) + if b.batch.Size() >= b.batchSize { + if err := b.index.Batch(b.batch); err != nil { + return err + } + b.batch = b.index.NewBatch() + } + return nil + } return b.index.Delete(id) } @@ -452,7 +526,7 @@ func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource mutateFunc(it) - return it, b.index.Index(it.ID, it) + return it, b.Upsert(id, *it) } func (b *Bleve) setDeleted(id string, deleted bool) error { @@ -468,6 +542,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { bleve.NewQueryStringQuery("RootID:"+it.RootID), bleve.NewQueryStringQuery("Path:"+escapeQuery(it.Path+"/*")), ) + bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} @@ -476,6 +551,8 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { return err } + b.StartBatch(_batchSize) + defer b.EndBatch() for _, h := range res.Hits { _, err := b.updateEntity(h.ID, func(r *Resource) { r.Deleted = deleted @@ -484,6 +561,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { return err } } + b.EndBatch() } return nil diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 78039a68f7..05974d4ef7 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -8,9 +8,10 @@ import ( sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/opencloud-eu/reva/v2/pkg/storagespace" libregraph "github.com/opencloud-eu/libre-graph-api-go" + "github.com/opencloud-eu/reva/v2/pkg/storagespace" + "github.com/opencloud-eu/opencloud/pkg/log" searchmsg "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" searchsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0" "github.com/opencloud-eu/opencloud/services/search/pkg/content" @@ -53,6 +54,7 @@ var _ = Describe("Bleve", func() { rootResource engine.Resource parentResource engine.Resource childResource engine.Resource + childResource2 engine.Resource ) BeforeEach(func() { @@ -62,7 +64,7 @@ var _ = Describe("Bleve", func() { idx, err = bleveSearch.NewMemOnly(mapping) Expect(err).ToNot(HaveOccurred()) - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{}) Expect(err).ToNot(HaveOccurred()) rootResource = engine.Resource{ @@ -89,11 +91,20 @@ var _ = Describe("Bleve", func() { Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), Document: content.Document{Name: "child.pdf"}, } + + childResource2 = engine.Resource{ + ID: "1$2!5", + ParentID: parentResource.ID, + RootID: rootResource.ID, + Path: "./parent d!r/child2.pdf", + Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), + Document: content.Document{Name: "child2.pdf"}, + } }) Describe("New", func() { It("returns a new index instance", func() { - b := engine.NewBleveEngine(idx, bleve.DefaultCreator) + b := engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{}) Expect(b).ToNot(BeNil()) }) }) @@ -486,6 +497,55 @@ var _ = Describe("Bleve", func() { }) }) + Describe("StartBatch", func() { + It("starts a new batch", func() { + err := eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource.ID, childResource) + Expect(err).ToNot(HaveOccurred()) + + count, err := idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(0))) + + err = eng.EndBatch() + Expect(err).ToNot(HaveOccurred()) + + count, err = idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(1))) + + query := bleveSearch.NewMatchQuery("child.pdf") + res, err := idx.Search(bleveSearch.NewSearchRequest(query)) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Hits.Len()).To(Equal(1)) + }) + + It("doesn't overwrite batches that are already in progress", func() { + err := eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource.ID, childResource) + Expect(err).ToNot(HaveOccurred()) + + count, err := idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(0))) + + err = eng.StartBatch(100) + Expect(err).ToNot(HaveOccurred()) + + err = eng.Upsert(childResource2.ID, childResource2) + Expect(err).ToNot(HaveOccurred()) + + Expect(eng.EndBatch()).To(Succeed()) + count, err = idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(2))) + }) + }) + Describe("File type specific metadata", func() { Context("with audio metadata", func() { diff --git a/services/search/pkg/engine/engine.go b/services/search/pkg/engine/engine.go index f61f060cfe..1553662f34 100644 --- a/services/search/pkg/engine/engine.go +++ b/services/search/pkg/engine/engine.go @@ -23,6 +23,9 @@ type Engine interface { Restore(id string) error Purge(id string) error DocCount() (uint64, error) + + StartBatch(batchSize int) error + EndBatch() error } // Resource is the entity that is stored in the index. diff --git a/services/search/pkg/engine/mocks/engine.go b/services/search/pkg/engine/mocks/engine.go index 5159c0e0d6..533f3bccf2 100644 --- a/services/search/pkg/engine/mocks/engine.go +++ b/services/search/pkg/engine/mocks/engine.go @@ -143,6 +143,50 @@ func (_c *Engine_DocCount_Call) RunAndReturn(run func() (uint64, error)) *Engine return _c } +// EndBatch provides a mock function for the type Engine +func (_mock *Engine) EndBatch() error { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for EndBatch") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { + r0 = returnFunc() + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Engine_EndBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EndBatch' +type Engine_EndBatch_Call struct { + *mock.Call +} + +// EndBatch is a helper method to define mock.On call +func (_e *Engine_Expecter) EndBatch() *Engine_EndBatch_Call { + return &Engine_EndBatch_Call{Call: _e.mock.On("EndBatch")} +} + +func (_c *Engine_EndBatch_Call) Run(run func()) *Engine_EndBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Engine_EndBatch_Call) Return(err error) *Engine_EndBatch_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Engine_EndBatch_Call) RunAndReturn(run func() error) *Engine_EndBatch_Call { + _c.Call.Return(run) + return _c +} + // Move provides a mock function for the type Engine func (_mock *Engine) Move(id string, parentid string, target string) error { ret := _mock.Called(id, parentid, target) @@ -376,6 +420,57 @@ func (_c *Engine_Search_Call) RunAndReturn(run func(ctx context.Context, req *v0 return _c } +// StartBatch provides a mock function for the type Engine +func (_mock *Engine) StartBatch(batchSize int) error { + ret := _mock.Called(batchSize) + + if len(ret) == 0 { + panic("no return value specified for StartBatch") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int) error); ok { + r0 = returnFunc(batchSize) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Engine_StartBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartBatch' +type Engine_StartBatch_Call struct { + *mock.Call +} + +// StartBatch is a helper method to define mock.On call +// - batchSize int +func (_e *Engine_Expecter) StartBatch(batchSize interface{}) *Engine_StartBatch_Call { + return &Engine_StartBatch_Call{Call: _e.mock.On("StartBatch", batchSize)} +} + +func (_c *Engine_StartBatch_Call) Run(run func(batchSize int)) *Engine_StartBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *Engine_StartBatch_Call) Return(err error) *Engine_StartBatch_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Engine_StartBatch_Call) RunAndReturn(run func(batchSize int) error) *Engine_StartBatch_Call { + _c.Call.Return(run) + return _c +} + // Upsert provides a mock function for the type Engine func (_mock *Engine) Upsert(id string, r engine.Resource) error { ret := _mock.Called(id, r) diff --git a/services/search/pkg/search/service.go b/services/search/pkg/search/service.go index e4a2769b32..25182d2ae5 100644 --- a/services/search/pkg/search/service.go +++ b/services/search/pkg/search/service.go @@ -41,6 +41,7 @@ const ( _spaceTypeProject = "project" _spaceTypeGrant = "grant" _slowQueryDuration = 500 * time.Millisecond + _batchSize = 500 ) // Searcher is the interface to the SearchService @@ -459,6 +460,12 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { }() w := walker.NewWalker(s.gatewaySelector) + s.engine.StartBatch(_batchSize) + defer func() { + if err := s.engine.EndBatch(); err != nil { + s.logger.Error().Err(err).Msg("failed to end batch") + } + }() err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error { if err != nil { s.logger.Error().Err(err).Msg("error walking the tree") diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index a89efd4dac..b12ca383fd 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -53,7 +53,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) _ = idx.Close() } - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, logger) default: return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) }