Add items back to the index recursively when the parent is restored

This commit is contained in:
André Duffeck
2022-04-29 10:31:02 +02:00
parent 15c60df064
commit be15d63747
6 changed files with 113 additions and 34 deletions

View File

@@ -80,12 +80,17 @@ func (i *Index) Add(ref *sprovider.Reference, ri *sprovider.ResourceInfo) error
return i.bleveIndex.Index(idToBleveId(ri.Id), entity)
}
// Delete marks an entity from the index as delete (still keeping it around)
// Delete marks an entity from the index as deleten (still keeping it around)
func (i *Index) Delete(id *sprovider.ResourceId) error {
return i.markAsDeleted(idToBleveId(id))
return i.markAsDeleted(idToBleveId(id), true)
}
func (i *Index) markAsDeleted(id string) error {
// Restore marks an entity from the index as not being deleted
func (i *Index) Restore(id *sprovider.ResourceId) error {
return i.markAsDeleted(idToBleveId(id), false)
}
func (i *Index) markAsDeleted(id string, deleted bool) error {
req := bleve.NewSearchRequest(bleve.NewDocIDQuery([]string{id}))
req.Fields = []string{"*"}
res, err := i.bleveIndex.Search(req)
@@ -96,7 +101,7 @@ func (i *Index) markAsDeleted(id string) error {
return errors.New("entity not found")
}
entity := fieldsToEntity(res.Hits[0].Fields)
entity.Deleted = true
entity.Deleted = deleted
if entity.Type == uint64(sprovider.ResourceType_RESOURCE_TYPE_CONTAINER) {
query := bleve.NewConjunctionQuery(
@@ -111,7 +116,7 @@ func (i *Index) markAsDeleted(id string) error {
}
for _, h := range res.Hits {
i.markAsDeleted(h.ID)
i.markAsDeleted(h.ID, deleted)
}
}

View File

@@ -278,7 +278,45 @@ var _ = Describe("Index", func() {
})
})
Describe("Remove", func() {
Describe("Restore", func() {
It("also marks child resources as deleted", func() {
err := i.Add(parentRef, parentRi)
Expect(err).ToNot(HaveOccurred())
err = i.Add(childRef, childRi)
Expect(err).ToNot(HaveOccurred())
err = i.Delete(parentRi.Id)
Expect(err).ToNot(HaveOccurred())
res, err := i.Search(ctx, &searchsvc.SearchIndexRequest{
Query: "subdir",
Ref: &searchmsg.Reference{
ResourceId: &searchmsg.ResourceID{
StorageId: rootId.StorageId,
OpaqueId: rootId.OpaqueId,
},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(len(res.Matches)).To(Equal(0))
err = i.Restore(parentRi.Id)
Expect(err).ToNot(HaveOccurred())
res, err = i.Search(ctx, &searchsvc.SearchIndexRequest{
Query: "subdir",
Ref: &searchmsg.Reference{
ResourceId: &searchmsg.ResourceID{
StorageId: rootId.StorageId,
OpaqueId: rootId.OpaqueId,
},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(len(res.Matches)).To(Equal(2))
})
})
Describe("Delete", func() {
It("marks a resource as deleted", func() {
err := i.Add(parentRef, parentRi)
Expect(err).ToNot(HaveOccurred())

View File

@@ -79,6 +79,20 @@ func (_m *IndexClient) Purge(ri *providerv1beta1.ResourceId) error {
return r0
}
// Restore provides a mock function with given fields: ri
func (_m *IndexClient) Restore(ri *providerv1beta1.ResourceId) error {
ret := _m.Called(ri)
var r0 error
if rf, ok := ret.Get(0).(func(*providerv1beta1.ResourceId) error); ok {
r0 = rf(ri)
} else {
r0 = ret.Error(0)
}
return r0
}
// Search provides a mock function with given fields: ctx, req
func (_m *IndexClient) Search(ctx context.Context, req *v0.SearchIndexRequest) (*v0.SearchIndexResponse, error) {
ret := _m.Called(ctx, req)

View File

@@ -46,6 +46,34 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
var ref *provider.Reference
var owner *user.User
switch e := ev.(type) {
case events.ItemTrashed:
err := p.indexClient.Delete(e.ID)
if err != nil {
p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index")
}
continue
case events.ItemRestored:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
statRes, err := p.statResource(ref, owner)
if err != nil {
p.logger.Error().Err(err).Msg("failed to stat the changed resource")
}
switch statRes.Status.Code {
case rpc.Code_CODE_OK:
err = p.indexClient.Restore(statRes.Info.Id)
if err != nil {
p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
default:
p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
}
continue
case events.FileUploaded:
ref = e.Ref
owner = &user.User{
@@ -56,41 +84,17 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
owner = &user.User{
Id: e.Executant,
}
case events.ItemRestored:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
case events.FileVersionRestored:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
case events.ItemTrashed:
err := p.indexClient.Delete(e.Id)
if err != nil {
p.logger.Error().Err(err).Interface("Id", e.Id).Msg("failed to remove item from index")
}
continue
default:
// Not sure what to do here. Skip.
continue
}
// Get auth
ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner)
authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + owner.Id.OpaqueId,
ClientSecret: machineAuthAPIKey,
})
if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth")
}
ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token)
// Stat changed resource resource
statRes, err := gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref})
statRes, err := p.statResource(ref, owner)
if err != nil {
p.logger.Error().Err(err).Msg("failed to stat the changed resource")
}
@@ -114,6 +118,23 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
return p
}
func (p *Provider) statResource(ref *provider.Reference, owner *user.User) (*provider.StatResponse, error) {
// Get auth
ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner)
authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + owner.Id.OpaqueId,
ClientSecret: p.machineAuthAPIKey,
})
if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth")
}
ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token)
// Stat changed resource resource
return p.gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref})
}
func (p *Provider) logDocCount() {
c, err := p.indexClient.DocCount()
if err != nil {

View File

@@ -129,7 +129,7 @@ var _ = Describe("Searchprovider", func() {
})
eventsChan <- events.ItemTrashed{
Ref: ref,
Id: ri.Id,
ID: ri.Id,
Executant: user.Id,
}
@@ -140,8 +140,8 @@ var _ = Describe("Searchprovider", func() {
It("indexes items when they are being restored", func() {
called := false
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
indexClient.On("Restore", mock.MatchedBy(func(id *sprovider.ResourceId) bool {
return id.OpaqueId == ri.Id.OpaqueId
})).Return(nil).Run(func(args mock.Arguments) {
called = true
})

View File

@@ -39,6 +39,7 @@ type IndexClient interface {
Search(ctx context.Context, req *searchsvc.SearchIndexRequest) (*searchsvc.SearchIndexResponse, error)
Add(ref *providerv1beta1.Reference, ri *providerv1beta1.ResourceInfo) error
Delete(ri *providerv1beta1.ResourceId) error
Restore(ri *providerv1beta1.ResourceId) error
Purge(ri *providerv1beta1.ResourceId) error
DocCount() (uint64, error)
}