diff --git a/extensions/search/pkg/config/config.go b/extensions/search/pkg/config/config.go index 706a09cd9b..788ceec85c 100644 --- a/extensions/search/pkg/config/config.go +++ b/extensions/search/pkg/config/config.go @@ -20,6 +20,16 @@ type Config struct { Reva Reva `ocisConfig:"reva"` TokenManager TokenManager `ocisConfig:"token_manager"` + Events Events `yaml:"events"` + + MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;SEARCH_MACHINE_AUTH_API_KEY"` Context context.Context `ocisConfig:"-" yaml:"-"` } + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `yaml:"events_endpoint" env:"SEARCH_EVENTS_ENDPOINT" desc:"the address of the streaming service"` + Cluster string `yaml:"events_cluster" env:"SEARCH_EVENTS_CLUSTER" desc:"the clusterID of the streaming service. Mandatory when using nats"` + ConsumerGroup string `yaml:"events_group" env:"SEARCH_EVENTS_GROUP" desc:"the customergroup of the service. One group will only get one copy of an event"` +} diff --git a/extensions/search/pkg/config/defaults/defaultconfig.go b/extensions/search/pkg/config/defaults/defaultconfig.go index 006c84d6c3..9f3368b0b4 100644 --- a/extensions/search/pkg/config/defaults/defaultconfig.go +++ b/extensions/search/pkg/config/defaults/defaultconfig.go @@ -23,6 +23,12 @@ func DefaultConfig() *config.Config { TokenManager: config.TokenManager{ JWTSecret: "Pive-Fumkiu4", }, + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + ConsumerGroup: "search", + }, + MachineAuthAPIKey: "change-me-please", } } diff --git a/extensions/search/pkg/search/index/index_test.go b/extensions/search/pkg/search/index/index_test.go index 3fe6fb2b85..0d54f42758 100644 --- a/extensions/search/pkg/search/index/index_test.go +++ b/extensions/search/pkg/search/index/index_test.go @@ -177,7 +177,7 @@ var _ = Describe("Index", func() { }) }) - Describe("Index", func() { + Describe("Add", func() { It("adds a resourceInfo to the index", func() { err := i.Add(ref, ri) Expect(err).ToNot(HaveOccurred()) diff --git a/extensions/search/pkg/search/mocks/IndexClient.go b/extensions/search/pkg/search/mocks/IndexClient.go index e8a62cecad..c29b8aa9f8 100644 --- a/extensions/search/pkg/search/mocks/IndexClient.go +++ b/extensions/search/pkg/search/mocks/IndexClient.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" mock "github.com/stretchr/testify/mock" v0 "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0" @@ -15,6 +16,20 @@ type IndexClient struct { mock.Mock } +// Add provides a mock function with given fields: ref, ri +func (_m *IndexClient) Add(ref *providerv1beta1.Reference, ri *providerv1beta1.ResourceInfo) error { + ret := _m.Called(ref, ri) + + var r0 error + if rf, ok := ret.Get(0).(func(*providerv1beta1.Reference, *providerv1beta1.ResourceInfo) error); ok { + r0 = rf(ref, ri) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Search provides a mock function with given fields: ctx, req func (_m *IndexClient) Search(ctx context.Context, req *v0.SearchIndexRequest) (*v0.SearchIndexResponse, error) { ret := _m.Called(ctx, req) diff --git a/extensions/search/pkg/search/provider/searchprovider.go b/extensions/search/pkg/search/provider/searchprovider.go index 8735188d5b..167f414185 100644 --- a/extensions/search/pkg/search/provider/searchprovider.go +++ b/extensions/search/pkg/search/provider/searchprovider.go @@ -5,26 +5,71 @@ import ( "strings" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/extensions/search/pkg/search" + "google.golang.org/grpc/metadata" searchmsg "github.com/owncloud/ocis/protogen/gen/ocis/messages/search/v0" searchsvc "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0" ) type Provider struct { - gwClient gateway.GatewayAPIClient - indexClient search.IndexClient + gwClient gateway.GatewayAPIClient + indexClient search.IndexClient + machineAuthAPIKey string } -func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient) *Provider { +func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, machineAuthAPIKey string, eventsChan <-chan interface{}) *Provider { + go func() { + for { + ev := <-eventsChan + var ref *providerv1beta1.Reference + var owner *user.User + switch e := ev.(type) { + case events.FileUploaded: + ref = e.FileID + owner = &user.User{ + Id: e.Executant, + } + default: + // Not sure what to do here. Skip. + continue + } + + // Get auth + ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner) + authRes, err := gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ + Type: "machine", + ClientId: "userid:" + owner.Id.OpaqueId, + ClientSecret: machineAuthAPIKey, + }) + if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + // TODO: log error + } + ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token) + + // Stat changed resource resource + statRes, err := gwClient.Stat(ownerCtx, &providerv1beta1.StatRequest{Ref: ref}) + if err != nil || statRes.Status.Code != rpc.Code_CODE_OK { + // TODO: log error + } + + indexClient.Add(ref, statRes.Info) + } + }() + return &Provider{ - gwClient: gwClient, - indexClient: indexClient, + gwClient: gwClient, + indexClient: indexClient, + machineAuthAPIKey: machineAuthAPIKey, } } diff --git a/extensions/search/pkg/search/provider/searchprovider_test.go b/extensions/search/pkg/search/provider/searchprovider_test.go index d883caf379..695dd143f3 100644 --- a/extensions/search/pkg/search/provider/searchprovider_test.go +++ b/extensions/search/pkg/search/provider/searchprovider_test.go @@ -7,9 +7,11 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rgrpc/status" cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks" "github.com/owncloud/ocis/extensions/search/pkg/search/mocks" @@ -24,7 +26,8 @@ var _ = Describe("Searchprovider", func() { gwClient *cs3mocks.GatewayAPIClient indexClient *mocks.IndexClient - ctx context.Context + ctx context.Context + eventsChan chan interface{} otherUser = &userv1beta1.User{ Id: &userv1beta1.UserId{ @@ -44,23 +47,68 @@ var _ = Describe("Searchprovider", func() { Root: &sprovider.ResourceId{OpaqueId: "personalspaceroot"}, Name: "personalspace", } + + ref = &sprovider.Reference{ + ResourceId: &sprovider.ResourceId{ + StorageId: "storageid", + OpaqueId: "rootopaqueid", + }, + Path: "./foo.pdf", + } + ri = &sprovider.ResourceInfo{ + Id: &sprovider.ResourceId{ + StorageId: "storageid", + OpaqueId: "opaqueid", + }, + Path: "foo.pdf", + Size: 12345, + } ) BeforeEach(func() { ctx = context.Background() + eventsChan = make(chan interface{}) gwClient = &cs3mocks.GatewayAPIClient{} indexClient = &mocks.IndexClient{} - p = provider.New(gwClient, indexClient) + p = provider.New(gwClient, indexClient, "", eventsChan) }) Describe("New", func() { It("returns a new instance", func() { - p := provider.New(gwClient, indexClient) + p := provider.New(gwClient, indexClient, "", eventsChan) Expect(p).ToNot(BeNil()) }) }) + Describe("events", func() { + BeforeEach(func() { + gwClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ + Token: "authtoken", + }, nil) + gwClient.On("Stat", mock.Anything, mock.Anything).Return(&sprovider.StatResponse{ + Status: status.NewOK(context.Background()), + Info: ri, + }, nil) + }) + + It("trigger an index change", func() { + called := false + indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { + return riToIndex.Id.OpaqueId == ri.Id.OpaqueId + })).Return(nil).Run(func(args mock.Arguments) { + called = true + }) + eventsChan <- events.FileUploaded{ + FileID: ref, + } + + Eventually(func() bool { + return called + }).Should(BeTrue()) + }) + }) + Describe("Search", func() { It("fails when an empty query is given", func() { res, err := p.Search(ctx, &searchsvc.SearchRequest{ diff --git a/extensions/search/pkg/search/search.go b/extensions/search/pkg/search/search.go index b28563b779..4e23b568b0 100644 --- a/extensions/search/pkg/search/search.go +++ b/extensions/search/pkg/search/search.go @@ -21,6 +21,7 @@ package search import ( "context" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" searchsvc "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0" ) @@ -35,4 +36,5 @@ type ProviderClient interface { // IndexClient is the interface to the search index type IndexClient interface { Search(ctx context.Context, req *searchsvc.SearchIndexRequest) (*searchsvc.SearchIndexResponse, error) + Add(ref *providerv1beta1.Reference, ri *providerv1beta1.ResourceInfo) error } diff --git a/extensions/search/pkg/service/v0/service.go b/extensions/search/pkg/service/v0/service.go index e9ffd17829..5cedbf092c 100644 --- a/extensions/search/pkg/service/v0/service.go +++ b/extensions/search/pkg/service/v0/service.go @@ -3,9 +3,13 @@ package service import ( "context" + "github.com/asim/go-micro/plugins/events/natsjs/v4" "github.com/blevesearch/bleve/v2" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/server" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/owncloud/ocis/extensions/audit/pkg/types" "github.com/owncloud/ocis/extensions/search/pkg/config" "github.com/owncloud/ocis/extensions/search/pkg/search" "github.com/owncloud/ocis/extensions/search/pkg/search/index" @@ -20,6 +24,20 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) { logger := options.Logger cfg := options.Config + // Connect to nats to listen for changes that need to trigger an index update + evtsCfg := cfg.Events + client, err := server.NewNatsStream( + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) + if err != nil { + return nil, err + } + evts, err := events.Consume(client, evtsCfg.ConsumerGroup, types.RegisteredEvents()...) + if err != nil { + return nil, err + } + bleveIndex, err := bleve.NewMemOnly(index.BuildMapping()) if err != nil { return nil, err @@ -34,7 +52,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) { logger.Fatal().Err(err).Str("addr", cfg.Reva.Address).Msg("could not get reva client") } - provider := searchprovider.New(gwclient, index) + provider := searchprovider.New(gwclient, index, cfg.MachineAuthAPIKey, evts) return &Service{ id: cfg.GRPC.Namespace + "." + cfg.Service.Name,