Listen to nats and trigger the indexing of uploaded files

This commit is contained in:
André Duffeck
2022-04-21 12:01:25 +02:00
parent c17717d7c9
commit 5598fc04ec
8 changed files with 154 additions and 10 deletions

View File

@@ -20,6 +20,16 @@ type Config struct {
Reva Reva `ocisConfig:"reva"`
TokenManager TokenManager `ocisConfig:"token_manager"`
Events Events `yaml:"events"`
MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;SEARCH_MACHINE_AUTH_API_KEY"`
Context context.Context `ocisConfig:"-" yaml:"-"`
}
// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `yaml:"events_endpoint" env:"SEARCH_EVENTS_ENDPOINT" desc:"the address of the streaming service"`
Cluster string `yaml:"events_cluster" env:"SEARCH_EVENTS_CLUSTER" desc:"the clusterID of the streaming service. Mandatory when using nats"`
ConsumerGroup string `yaml:"events_group" env:"SEARCH_EVENTS_GROUP" desc:"the customergroup of the service. One group will only get one copy of an event"`
}

View File

@@ -23,6 +23,12 @@ func DefaultConfig() *config.Config {
TokenManager: config.TokenManager{
JWTSecret: "Pive-Fumkiu4",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
ConsumerGroup: "search",
},
MachineAuthAPIKey: "change-me-please",
}
}

View File

@@ -177,7 +177,7 @@ var _ = Describe("Index", func() {
})
})
Describe("Index", func() {
Describe("Add", func() {
It("adds a resourceInfo to the index", func() {
err := i.Add(ref, ri)
Expect(err).ToNot(HaveOccurred())

View File

@@ -5,6 +5,7 @@ package mocks
import (
context "context"
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
mock "github.com/stretchr/testify/mock"
v0 "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0"
@@ -15,6 +16,20 @@ type IndexClient struct {
mock.Mock
}
// Add provides a mock function with given fields: ref, ri
func (_m *IndexClient) Add(ref *providerv1beta1.Reference, ri *providerv1beta1.ResourceInfo) error {
ret := _m.Called(ref, ri)
var r0 error
if rf, ok := ret.Get(0).(func(*providerv1beta1.Reference, *providerv1beta1.ResourceInfo) error); ok {
r0 = rf(ref, ri)
} else {
r0 = ret.Error(0)
}
return r0
}
// Search provides a mock function with given fields: ctx, req
func (_m *IndexClient) Search(ctx context.Context, req *v0.SearchIndexRequest) (*v0.SearchIndexResponse, error) {
ret := _m.Called(ctx, req)

View File

@@ -5,26 +5,71 @@ import (
"strings"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/extensions/search/pkg/search"
"google.golang.org/grpc/metadata"
searchmsg "github.com/owncloud/ocis/protogen/gen/ocis/messages/search/v0"
searchsvc "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0"
)
type Provider struct {
gwClient gateway.GatewayAPIClient
indexClient search.IndexClient
gwClient gateway.GatewayAPIClient
indexClient search.IndexClient
machineAuthAPIKey string
}
func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient) *Provider {
func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, machineAuthAPIKey string, eventsChan <-chan interface{}) *Provider {
go func() {
for {
ev := <-eventsChan
var ref *providerv1beta1.Reference
var owner *user.User
switch e := ev.(type) {
case events.FileUploaded:
ref = e.FileID
owner = &user.User{
Id: e.Executant,
}
default:
// Not sure what to do here. Skip.
continue
}
// Get auth
ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner)
authRes, err := gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + owner.Id.OpaqueId,
ClientSecret: machineAuthAPIKey,
})
if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
// TODO: log error
}
ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token)
// Stat changed resource resource
statRes, err := gwClient.Stat(ownerCtx, &providerv1beta1.StatRequest{Ref: ref})
if err != nil || statRes.Status.Code != rpc.Code_CODE_OK {
// TODO: log error
}
indexClient.Add(ref, statRes.Info)
}
}()
return &Provider{
gwClient: gwClient,
indexClient: indexClient,
gwClient: gwClient,
indexClient: indexClient,
machineAuthAPIKey: machineAuthAPIKey,
}
}

View File

@@ -7,9 +7,11 @@ import (
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks"
"github.com/owncloud/ocis/extensions/search/pkg/search/mocks"
@@ -24,7 +26,8 @@ var _ = Describe("Searchprovider", func() {
gwClient *cs3mocks.GatewayAPIClient
indexClient *mocks.IndexClient
ctx context.Context
ctx context.Context
eventsChan chan interface{}
otherUser = &userv1beta1.User{
Id: &userv1beta1.UserId{
@@ -44,23 +47,68 @@ var _ = Describe("Searchprovider", func() {
Root: &sprovider.ResourceId{OpaqueId: "personalspaceroot"},
Name: "personalspace",
}
ref = &sprovider.Reference{
ResourceId: &sprovider.ResourceId{
StorageId: "storageid",
OpaqueId: "rootopaqueid",
},
Path: "./foo.pdf",
}
ri = &sprovider.ResourceInfo{
Id: &sprovider.ResourceId{
StorageId: "storageid",
OpaqueId: "opaqueid",
},
Path: "foo.pdf",
Size: 12345,
}
)
BeforeEach(func() {
ctx = context.Background()
eventsChan = make(chan interface{})
gwClient = &cs3mocks.GatewayAPIClient{}
indexClient = &mocks.IndexClient{}
p = provider.New(gwClient, indexClient)
p = provider.New(gwClient, indexClient, "", eventsChan)
})
Describe("New", func() {
It("returns a new instance", func() {
p := provider.New(gwClient, indexClient)
p := provider.New(gwClient, indexClient, "", eventsChan)
Expect(p).ToNot(BeNil())
})
})
Describe("events", func() {
BeforeEach(func() {
gwClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{
Token: "authtoken",
}, nil)
gwClient.On("Stat", mock.Anything, mock.Anything).Return(&sprovider.StatResponse{
Status: status.NewOK(context.Background()),
Info: ri,
}, nil)
})
It("trigger an index change", func() {
called := false
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
})).Return(nil).Run(func(args mock.Arguments) {
called = true
})
eventsChan <- events.FileUploaded{
FileID: ref,
}
Eventually(func() bool {
return called
}).Should(BeTrue())
})
})
Describe("Search", func() {
It("fails when an empty query is given", func() {
res, err := p.Search(ctx, &searchsvc.SearchRequest{

View File

@@ -21,6 +21,7 @@ package search
import (
"context"
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
searchsvc "github.com/owncloud/ocis/protogen/gen/ocis/services/search/v0"
)
@@ -35,4 +36,5 @@ type ProviderClient interface {
// IndexClient is the interface to the search index
type IndexClient interface {
Search(ctx context.Context, req *searchsvc.SearchIndexRequest) (*searchsvc.SearchIndexResponse, error)
Add(ref *providerv1beta1.Reference, ri *providerv1beta1.ResourceInfo) error
}

View File

@@ -3,9 +3,13 @@ package service
import (
"context"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/blevesearch/bleve/v2"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/extensions/audit/pkg/types"
"github.com/owncloud/ocis/extensions/search/pkg/config"
"github.com/owncloud/ocis/extensions/search/pkg/search"
"github.com/owncloud/ocis/extensions/search/pkg/search/index"
@@ -20,6 +24,20 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) {
logger := options.Logger
cfg := options.Config
// Connect to nats to listen for changes that need to trigger an index update
evtsCfg := cfg.Events
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return nil, err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, types.RegisteredEvents()...)
if err != nil {
return nil, err
}
bleveIndex, err := bleve.NewMemOnly(index.BuildMapping())
if err != nil {
return nil, err
@@ -34,7 +52,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) {
logger.Fatal().Err(err).Str("addr", cfg.Reva.Address).Msg("could not get reva client")
}
provider := searchprovider.New(gwclient, index)
provider := searchprovider.New(gwclient, index, cfg.MachineAuthAPIKey, evts)
return &Service{
id: cfg.GRPC.Namespace + "." + cfg.Service.Name,