From d540c4405d5ae96ebc06fa03b848169df308ad19 Mon Sep 17 00:00:00 2001 From: Andre Duffeck Date: Wed, 4 May 2022 13:15:28 +0200 Subject: [PATCH] [full-ci] Fix searching shares (#3668) * Be more robust when limiting the search to a directory in the space * Add more debug logs * Fix searching in shares * Delay indexing by a second so that everything has time to settle Indexing immediately when the event arrives sometimes causes issues trying to stat the changed resource. * Pick up the machine auth secret from the default config * Adapt to the resourceid refactoring in reva * Fix unit tests --- .../pkg/config/defaults/defaultconfig.go | 2 +- extensions/search/pkg/search/index/index.go | 2 +- .../search/pkg/search/provider/events.go | 128 ++++++++++++++++ .../pkg/search/provider/searchprovider.go | 144 +++--------------- .../search/provider/searchprovider_test.go | 37 ++--- 5 files changed, 166 insertions(+), 147 deletions(-) create mode 100644 extensions/search/pkg/search/provider/events.go diff --git a/extensions/search/pkg/config/defaults/defaultconfig.go b/extensions/search/pkg/config/defaults/defaultconfig.go index c3c40a7fea..62aadc787f 100644 --- a/extensions/search/pkg/config/defaults/defaultconfig.go +++ b/extensions/search/pkg/config/defaults/defaultconfig.go @@ -37,7 +37,7 @@ func DefaultConfig() *config.Config { Cluster: "ocis-cluster", ConsumerGroup: "search", }, - MachineAuthAPIKey: "change-me-please", + MachineAuthAPIKey: "", } } diff --git a/extensions/search/pkg/search/index/index.go b/extensions/search/pkg/search/index/index.go index 8cba7f7a6c..6d82884427 100644 --- a/extensions/search/pkg/search/index/index.go +++ b/extensions/search/pkg/search/index/index.go @@ -207,7 +207,7 @@ func (i *Index) Search(ctx context.Context, req *searchsvc.SearchIndexRequest) ( bleve.NewQueryStringQuery("Name:"+req.Query), deletedQuery, // Skip documents that have been marked as deleted bleve.NewQueryStringQuery("RootID:"+req.Ref.ResourceId.StorageId+"!"+req.Ref.ResourceId.OpaqueId), // Limit search to the space - bleve.NewQueryStringQuery("Path:"+req.Ref.Path+"*"), // Limit search to this directory in the space + bleve.NewQueryStringQuery("Path:"+utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*"), // Limit search to this directory in the space ) bleveReq := bleve.NewSearchRequest(query) bleveReq.Size = 200 diff --git a/extensions/search/pkg/search/provider/events.go b/extensions/search/pkg/search/provider/events.go new file mode 100644 index 0000000000..9060676070 --- /dev/null +++ b/extensions/search/pkg/search/provider/events.go @@ -0,0 +1,128 @@ +package provider + +import ( + "context" + + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" + "google.golang.org/grpc/metadata" +) + +func (p *Provider) handleEvent(ev interface{}) { + var ref *provider.Reference + var owner *user.User + switch e := ev.(type) { + case events.ItemTrashed: + p.logger.Debug().Interface("event", ev).Msg("marking document as deleted") + err := p.indexClient.Delete(e.ID) + if err != nil { + p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index") + } + return + case events.ItemRestored: + p.logger.Debug().Interface("event", ev).Msg("marking document as restored") + ref = e.Ref + owner = &user.User{ + Id: e.Executant, + } + + statRes, err := p.statResource(ref, owner) + if err != nil { + p.logger.Error().Err(err).Msg("failed to stat the changed resource") + return + } + + switch statRes.Status.Code { + case rpc.Code_CODE_OK: + err = p.indexClient.Restore(statRes.Info.Id) + if err != nil { + p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") + } + default: + p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") + } + + return + case events.ItemMoved: + p.logger.Debug().Interface("event", ev).Msg("resource has been moved, updating the document") + ref = e.Ref + owner = &user.User{ + Id: e.Executant, + } + + statRes, err := p.statResource(ref, owner) + if err != nil { + p.logger.Error().Err(err).Msg("failed to stat the changed resource") + return + } + + switch statRes.Status.Code { + case rpc.Code_CODE_OK: + err = p.indexClient.Move(statRes.Info) + if err != nil { + p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") + } + default: + p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") + } + + return + case events.ContainerCreated: + ref = e.Ref + owner = &user.User{ + Id: e.Executant, + } + case events.FileUploaded: + ref = e.Ref + owner = &user.User{ + Id: e.Executant, + } + case events.FileVersionRestored: + ref = e.Ref + owner = &user.User{ + Id: e.Executant, + } + default: + // Not sure what to do here. Skip. + return + } + p.logger.Debug().Interface("event", ev).Msg("resource has been changed, updating the document") + + statRes, err := p.statResource(ref, owner) + if err != nil { + p.logger.Error().Err(err).Msg("failed to stat the changed resource") + return + } + if statRes.Status.Code != rpc.Code_CODE_OK { + p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") + return + } + + err = p.indexClient.Add(ref, statRes.Info) + if err != nil { + p.logger.Error().Err(err).Msg("error adding updating the resource in the index") + } else { + p.logDocCount() + } +} + +func (p *Provider) statResource(ref *provider.Reference, owner *user.User) (*provider.StatResponse, error) { + // Get auth + ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner) + authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ + Type: "machine", + ClientId: "userid:" + owner.Id.OpaqueId, + ClientSecret: p.machineAuthAPIKey, + }) + if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth") + } + ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token) + + // Stat changed resource resource + return p.gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref}) +} diff --git a/extensions/search/pkg/search/provider/searchprovider.go b/extensions/search/pkg/search/provider/searchprovider.go index 3aa2f6cda1..58865f4c0a 100644 --- a/extensions/search/pkg/search/provider/searchprovider.go +++ b/extensions/search/pkg/search/provider/searchprovider.go @@ -5,17 +5,17 @@ import ( "fmt" "path/filepath" "strings" + "time" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" - "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage/utils/walker" + "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/extensions/search/pkg/search" "github.com/owncloud/ocis/ocis-pkg/log" @@ -43,138 +43,28 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach go func() { for { ev := <-eventsChan - var ref *provider.Reference - var owner *user.User - switch e := ev.(type) { - case events.ItemTrashed: - err := p.indexClient.Delete(e.ID) - if err != nil { - p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index") - } - continue - case events.ItemRestored: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } - - statRes, err := p.statResource(ref, owner) - if err != nil { - p.logger.Error().Err(err).Msg("failed to stat the changed resource") - } - - switch statRes.Status.Code { - case rpc.Code_CODE_OK: - err = p.indexClient.Restore(statRes.Info.Id) - if err != nil { - p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") - } - default: - p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") - } - - continue - case events.ItemMoved: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } - - statRes, err := p.statResource(ref, owner) - if err != nil { - p.logger.Error().Err(err).Msg("failed to stat the changed resource") - } - - switch statRes.Status.Code { - case rpc.Code_CODE_OK: - err = p.indexClient.Move(statRes.Info) - if err != nil { - p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") - } - default: - p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") - } - - continue - case events.ContainerCreated: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } - case events.FileUploaded: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } - case events.FileVersionRestored: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } - default: - // Not sure what to do here. Skip. - continue - } - - statRes, err := p.statResource(ref, owner) - if err != nil { - p.logger.Error().Err(err).Msg("failed to stat the changed resource") - } - - switch statRes.Status.Code { - case rpc.Code_CODE_OK: - err = p.indexClient.Add(ref, statRes.Info) - if err != nil { - p.logger.Error().Err(err).Msg("error adding updating the resource in the index") - } else { - p.logDocCount() - } - default: - p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource") - } + go func() { + time.Sleep(1 * time.Second) // Give some time to let everything settle down before trying to access it when indexing + p.handleEvent(ev) + }() } }() return p } -func (p *Provider) statResource(ref *provider.Reference, owner *user.User) (*provider.StatResponse, error) { - // Get auth - ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner) - authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ - Type: "machine", - ClientId: "userid:" + owner.Id.OpaqueId, - ClientSecret: p.machineAuthAPIKey, - }) - if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { - p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth") - } - ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token) - - // Stat changed resource resource - return p.gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref}) -} - -func (p *Provider) logDocCount() { - c, err := p.indexClient.DocCount() - if err != nil { - p.logger.Error().Err(err).Msg("error getting document count from the index") - } - p.logger.Debug().Interface("count", c).Msg("new document count") -} - func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) { if req.Query == "" { return nil, errtypes.PreconditionFailed("empty query provided") } listSpacesRes, err := p.gwClient.ListStorageSpaces(ctx, &provider.ListStorageSpacesRequest{ - Opaque: &typesv1beta1.Opaque{Map: map[string]*typesv1beta1.OpaqueEntry{ - "path": { - Decoder: "plain", - Value: []byte("/"), + Filters: []*provider.ListStorageSpacesRequest_Filter{ + { + Type: provider.ListStorageSpacesRequest_Filter_TYPE_SPACE_TYPE, + Term: &provider.ListStorageSpacesRequest_Filter_SpaceType{SpaceType: "+grant"}, }, - }}, + }, }) if err != nil { return nil, err @@ -196,12 +86,14 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s pathPrefix = utils.MakeRelativePath(gpRes.Path) } + _, rootStorageID := storagespace.SplitStorageID(space.Root.StorageId) + res, err := p.indexClient.Search(ctx, &searchsvc.SearchIndexRequest{ Query: req.Query, Ref: &searchmsg.Reference{ ResourceId: &searchmsg.ResourceID{ StorageId: space.Root.StorageId, - OpaqueId: space.Root.OpaqueId, + OpaqueId: rootStorageID, }, Path: pathPrefix, }, @@ -276,3 +168,11 @@ func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequ p.logDocCount() return &searchsvc.IndexSpaceResponse{}, nil } + +func (p *Provider) logDocCount() { + c, err := p.indexClient.DocCount() + if err != nil { + p.logger.Error().Err(err).Msg("error getting document count from the index") + } + p.logger.Debug().Interface("count", c).Msg("new document count") +} diff --git a/extensions/search/pkg/search/provider/searchprovider_test.go b/extensions/search/pkg/search/provider/searchprovider_test.go index f6ea0de395..183501e5e4 100644 --- a/extensions/search/pkg/search/provider/searchprovider_test.go +++ b/extensions/search/pkg/search/provider/searchprovider_test.go @@ -51,7 +51,7 @@ var _ = Describe("Searchprovider", func() { }, }, Id: &sprovider.StorageSpaceId{OpaqueId: "personalspace"}, - Root: &sprovider.ResourceId{OpaqueId: "personalspaceroot"}, + Root: &sprovider.ResourceId{StorageId: "storageid", OpaqueId: "storageid"}, Name: "personalspace", } @@ -113,7 +113,7 @@ var _ = Describe("Searchprovider", func() { Eventually(func() bool { return called - }).Should(BeTrue()) + }, "2s").Should(BeTrue()) }) It("removes an entry from the index when the file has been deleted", func() { @@ -135,7 +135,7 @@ var _ = Describe("Searchprovider", func() { Eventually(func() bool { return called - }).Should(BeTrue()) + }, "2s").Should(BeTrue()) }) It("indexes items when they are being restored", func() { @@ -152,7 +152,7 @@ var _ = Describe("Searchprovider", func() { Eventually(func() bool { return called - }).Should(BeTrue()) + }, "2s").Should(BeTrue()) }) It("indexes items when a version has been restored", func() { @@ -169,12 +169,12 @@ var _ = Describe("Searchprovider", func() { Eventually(func() bool { return called - }).Should(BeTrue()) + }, "2s").Should(BeTrue()) }) It("indexes items when they are being moved", func() { called := false - indexClient.On("Move", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { + indexClient.On("Move", mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { return riToIndex.Id.OpaqueId == ri.Id.OpaqueId })).Return(nil).Run(func(args mock.Arguments) { called = true @@ -186,7 +186,7 @@ var _ = Describe("Searchprovider", func() { Eventually(func() bool { return called - }).Should(BeTrue()) + }, "2s").Should(BeTrue()) }) }) @@ -220,10 +220,7 @@ var _ = Describe("Searchprovider", func() { Context("with a personal space", func() { BeforeEach(func() { - gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool { - p := string(req.Opaque.Map["path"].Value) - return p == "/" - })).Return(&sprovider.ListStorageSpacesResponse{ + gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{ Status: status.NewOK(ctx), StorageSpaces: []*sprovider.StorageSpace{personalSpace}, }, nil) @@ -278,7 +275,7 @@ var _ = Describe("Searchprovider", func() { SpaceType: "grant", Owner: otherUser, Id: &sprovider.StorageSpaceId{OpaqueId: "otherspaceroot!otherspacegrant"}, - Root: &sprovider.ResourceId{StorageId: "otherspaceroot", OpaqueId: "otherspacegrant"}, + Root: &sprovider.ResourceId{StorageId: "otherspaceroot", OpaqueId: "otherspaceroot"}, Name: "grantspace", } gwClient.On("GetPath", mock.Anything, mock.Anything).Return(&sprovider.GetPathResponse{ @@ -288,10 +285,7 @@ var _ = Describe("Searchprovider", func() { }) It("searches the received spaces (grants)", func() { - gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool { - p := string(req.Opaque.Map["path"].Value) - return p == "/" - })).Return(&sprovider.ListStorageSpacesResponse{ + gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{ Status: status.NewOK(ctx), StorageSpaces: []*sprovider.StorageSpace{grantSpace}, }, nil) @@ -329,20 +323,17 @@ var _ = Describe("Searchprovider", func() { Expect(match.Entity.Ref.Path).To(Equal("./to/Shared.pdf")) indexClient.AssertCalled(GinkgoT(), "Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool { - return req.Query == "foo" && req.Ref.ResourceId.OpaqueId == grantSpace.Root.OpaqueId && req.Ref.Path == "./grant/path" + return req.Query == "foo" && req.Ref.ResourceId.StorageId == grantSpace.Root.StorageId && req.Ref.Path == "./grant/path" })) }) It("finds matches in both the personal space AND the grant", func() { - gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool { - p := string(req.Opaque.Map["path"].Value) - return p == "/" - })).Return(&sprovider.ListStorageSpacesResponse{ + gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{ Status: status.NewOK(ctx), StorageSpaces: []*sprovider.StorageSpace{personalSpace, grantSpace}, }, nil) indexClient.On("Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool { - return req.Ref.ResourceId.OpaqueId == grantSpace.Root.OpaqueId + return req.Ref.ResourceId.StorageId == grantSpace.Root.StorageId })).Return(&searchsvc.SearchIndexResponse{ Matches: []*searchmsg.Match{ { @@ -364,7 +355,7 @@ var _ = Describe("Searchprovider", func() { }, }, nil) indexClient.On("Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool { - return req.Ref.ResourceId.OpaqueId == personalSpace.Root.OpaqueId + return req.Ref.ResourceId.StorageId == personalSpace.Root.StorageId })).Return(&searchsvc.SearchIndexResponse{ Matches: []*searchmsg.Match{ {