|
|
|
@@ -23,10 +23,15 @@ import (
|
|
|
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/config"
|
|
|
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/content"
|
|
|
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/engine"
|
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
//go:generate mockery --name=Searcher
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
_spaceStateTrashed = "trashed"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Searcher is the interface to the SearchService
|
|
|
|
|
type Searcher interface {
|
|
|
|
|
Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error)
|
|
|
|
@@ -47,6 +52,8 @@ type Service struct {
|
|
|
|
|
secret string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var errSkipSpace error
|
|
|
|
|
|
|
|
|
|
// NewService creates a new Provider instance.
|
|
|
|
|
func NewService(gw gateway.GatewayAPIClient, eng engine.Engine, extractor content.Extractor, logger log.Logger, cfg *config.Config) *Service {
|
|
|
|
|
var s = &Service{
|
|
|
|
@@ -80,8 +87,17 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mountpointMap := map[string]string{}
|
|
|
|
|
spaces := []*provider.StorageSpace{}
|
|
|
|
|
for _, space := range listSpacesRes.StorageSpaces {
|
|
|
|
|
if utils.ReadPlainFromOpaque(space.Opaque, "trashed") == _spaceStateTrashed {
|
|
|
|
|
// Do not consider disabled spaces
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
spaces = append(spaces, space)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mountpointMap := map[string]string{}
|
|
|
|
|
for _, space := range spaces {
|
|
|
|
|
if space.SpaceType != "mountpoint" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
@@ -96,94 +112,70 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se
|
|
|
|
|
|
|
|
|
|
matches := matchArray{}
|
|
|
|
|
total := int32(0)
|
|
|
|
|
for _, space := range listSpacesRes.StorageSpaces {
|
|
|
|
|
searchRootID := &searchmsg.ResourceID{
|
|
|
|
|
StorageId: space.Root.StorageId,
|
|
|
|
|
SpaceId: space.Root.SpaceId,
|
|
|
|
|
OpaqueId: space.Root.OpaqueId,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if req.Ref != nil &&
|
|
|
|
|
(req.Ref.ResourceId.StorageId != searchRootID.StorageId ||
|
|
|
|
|
req.Ref.ResourceId.SpaceId != searchRootID.SpaceId ||
|
|
|
|
|
req.Ref.ResourceId.OpaqueId != searchRootID.OpaqueId) {
|
|
|
|
|
errg, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
work := make(chan *provider.StorageSpace, len(spaces))
|
|
|
|
|
results := make(chan *searchsvc.SearchIndexResponse, len(spaces))
|
|
|
|
|
|
|
|
|
|
// Distribute work
|
|
|
|
|
errg.Go(func() error {
|
|
|
|
|
defer close(work)
|
|
|
|
|
for _, space := range spaces {
|
|
|
|
|
select {
|
|
|
|
|
case work <- space:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Spawn workers that'll concurrently work the queue
|
|
|
|
|
numWorkers := 20
|
|
|
|
|
if len(spaces) < numWorkers {
|
|
|
|
|
numWorkers = len(spaces)
|
|
|
|
|
}
|
|
|
|
|
for i := 0; i < numWorkers; i++ {
|
|
|
|
|
errg.Go(func() error {
|
|
|
|
|
for space := range work {
|
|
|
|
|
res, err := s.searchIndex(ctx, req, space, mountpointMap[space.Id.OpaqueId])
|
|
|
|
|
if err != nil && err != errSkipSpace {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case results <- res:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for things to settle down, then close results chan
|
|
|
|
|
go func() {
|
|
|
|
|
_ = errg.Wait() // error is checked later
|
|
|
|
|
close(results)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
responses := make([]*searchsvc.SearchIndexResponse, len(spaces))
|
|
|
|
|
i := 0
|
|
|
|
|
for r := range results {
|
|
|
|
|
responses[i] = r
|
|
|
|
|
i++
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := errg.Wait(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, res := range responses {
|
|
|
|
|
if res == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
mountpointRootID *searchmsg.ResourceID
|
|
|
|
|
rootName string
|
|
|
|
|
permissions *provider.ResourcePermissions
|
|
|
|
|
)
|
|
|
|
|
mountpointPrefix := ""
|
|
|
|
|
switch space.SpaceType {
|
|
|
|
|
case "mountpoint":
|
|
|
|
|
continue // mountpoint spaces are only "links" to the shared spaces. we have to search the shared "grant" space instead
|
|
|
|
|
case "grant":
|
|
|
|
|
// In case of grant spaces we search the root of the outer space and translate the paths to the according mountpoint
|
|
|
|
|
searchRootID.OpaqueId = space.Root.SpaceId
|
|
|
|
|
mountpointID, ok := mountpointMap[space.Id.OpaqueId]
|
|
|
|
|
if !ok {
|
|
|
|
|
s.logger.Warn().Interface("space", space).Msg("could not find mountpoint space for grant space")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
gpRes, err := s.gateway.GetPath(ctx, &provider.GetPathRequest{
|
|
|
|
|
ResourceId: space.Root,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if gpRes.Status.Code != rpcv1beta1.Code_CODE_OK {
|
|
|
|
|
s.logger.Error().Interface("status", gpRes.Status).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
mountpointPrefix = utils.MakeRelativePath(gpRes.Path)
|
|
|
|
|
sid, spid, oid, err := storagespace.SplitID(mountpointID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Str("mountpointId", mountpointID).Msg("invalid mountpoint space id")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
mountpointRootID = &searchmsg.ResourceID{
|
|
|
|
|
StorageId: sid,
|
|
|
|
|
SpaceId: spid,
|
|
|
|
|
OpaqueId: oid,
|
|
|
|
|
}
|
|
|
|
|
rootName = space.GetRootInfo().GetPath()
|
|
|
|
|
permissions = space.GetRootInfo().GetPermissionSet()
|
|
|
|
|
s.logger.Debug().Interface("grantSpace", space).Interface("mountpointRootId", mountpointRootID).Msg("searching a grant")
|
|
|
|
|
case "personal":
|
|
|
|
|
permissions = space.GetRootInfo().GetPermissionSet()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
res, err := s.engine.Search(ctx, &searchsvc.SearchIndexRequest{
|
|
|
|
|
Query: req.Query,
|
|
|
|
|
Ref: &searchmsg.Reference{
|
|
|
|
|
ResourceId: searchRootID,
|
|
|
|
|
Path: mountpointPrefix,
|
|
|
|
|
},
|
|
|
|
|
PageSize: req.PageSize,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to search the index")
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
s.logger.Debug().Str("space", space.Id.OpaqueId).Int("hits", len(res.Matches)).Msg("space search done")
|
|
|
|
|
|
|
|
|
|
total += res.TotalMatches
|
|
|
|
|
for _, match := range res.Matches {
|
|
|
|
|
if mountpointPrefix != "" {
|
|
|
|
|
match.Entity.Ref.Path = utils.MakeRelativePath(strings.TrimPrefix(match.Entity.Ref.Path, mountpointPrefix))
|
|
|
|
|
}
|
|
|
|
|
if mountpointRootID != nil {
|
|
|
|
|
match.Entity.Ref.ResourceId = mountpointRootID
|
|
|
|
|
}
|
|
|
|
|
match.Entity.ShareRootName = rootName
|
|
|
|
|
|
|
|
|
|
isShared := match.GetEntity().GetRef().GetResourceId().GetSpaceId() == utils.ShareStorageSpaceID
|
|
|
|
|
isMountpoint := isShared && match.GetEntity().GetRef().GetPath() == "."
|
|
|
|
|
isDir := match.GetEntity().GetMimeType() == "httpd/unix-directory"
|
|
|
|
|
match.Entity.Permissions = convertToWebDAVPermissions(isShared, isMountpoint, isDir, permissions)
|
|
|
|
|
matches = append(matches, match)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -204,6 +196,96 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) searchIndex(ctx context.Context, req *searchsvc.SearchRequest, space *provider.StorageSpace, mountpointID string) (*searchsvc.SearchIndexResponse, error) {
|
|
|
|
|
if req.Ref != nil &&
|
|
|
|
|
(req.Ref.ResourceId.StorageId != space.Root.StorageId ||
|
|
|
|
|
req.Ref.ResourceId.SpaceId != space.Root.SpaceId ||
|
|
|
|
|
req.Ref.ResourceId.OpaqueId != space.Root.OpaqueId) {
|
|
|
|
|
return nil, errSkipSpace
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
searchRootID := &searchmsg.ResourceID{
|
|
|
|
|
StorageId: space.Root.StorageId,
|
|
|
|
|
SpaceId: space.Root.SpaceId,
|
|
|
|
|
OpaqueId: space.Root.OpaqueId,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
mountpointRootID *searchmsg.ResourceID
|
|
|
|
|
rootName string
|
|
|
|
|
permissions *provider.ResourcePermissions
|
|
|
|
|
)
|
|
|
|
|
mountpointPrefix := ""
|
|
|
|
|
switch space.SpaceType {
|
|
|
|
|
case "mountpoint":
|
|
|
|
|
return nil, errSkipSpace // mountpoint spaces are only "links" to the shared spaces. we have to search the shared "grant" space instead
|
|
|
|
|
case "grant":
|
|
|
|
|
// In case of grant spaces we search the root of the outer space and translate the paths to the according mountpoint
|
|
|
|
|
searchRootID.OpaqueId = space.Root.SpaceId
|
|
|
|
|
if mountpointID == "" {
|
|
|
|
|
s.logger.Warn().Interface("space", space).Msg("could not find mountpoint space for grant space")
|
|
|
|
|
return nil, errSkipSpace
|
|
|
|
|
}
|
|
|
|
|
gpRes, err := s.gateway.GetPath(ctx, &provider.GetPathRequest{
|
|
|
|
|
ResourceId: space.Root,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root")
|
|
|
|
|
return nil, errSkipSpace
|
|
|
|
|
}
|
|
|
|
|
if gpRes.Status.Code != rpcv1beta1.Code_CODE_OK {
|
|
|
|
|
s.logger.Error().Interface("status", gpRes.Status).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root")
|
|
|
|
|
return nil, errSkipSpace
|
|
|
|
|
}
|
|
|
|
|
mountpointPrefix = utils.MakeRelativePath(gpRes.Path)
|
|
|
|
|
sid, spid, oid, err := storagespace.SplitID(mountpointID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Str("mountpointId", mountpointID).Msg("invalid mountpoint space id")
|
|
|
|
|
return nil, errSkipSpace
|
|
|
|
|
}
|
|
|
|
|
mountpointRootID = &searchmsg.ResourceID{
|
|
|
|
|
StorageId: sid,
|
|
|
|
|
SpaceId: spid,
|
|
|
|
|
OpaqueId: oid,
|
|
|
|
|
}
|
|
|
|
|
rootName = space.GetRootInfo().GetPath()
|
|
|
|
|
permissions = space.GetRootInfo().GetPermissionSet()
|
|
|
|
|
s.logger.Debug().Interface("grantSpace", space).Interface("mountpointRootId", mountpointRootID).Msg("searching a grant")
|
|
|
|
|
case "personal":
|
|
|
|
|
permissions = space.GetRootInfo().GetPermissionSet()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
res, err := s.engine.Search(ctx, &searchsvc.SearchIndexRequest{
|
|
|
|
|
Query: req.Query,
|
|
|
|
|
Ref: &searchmsg.Reference{
|
|
|
|
|
ResourceId: searchRootID,
|
|
|
|
|
Path: mountpointPrefix,
|
|
|
|
|
},
|
|
|
|
|
PageSize: req.PageSize,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to search the index")
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
s.logger.Debug().Str("space", space.Id.OpaqueId).Int("hits", len(res.Matches)).Msg("space search done")
|
|
|
|
|
|
|
|
|
|
for _, match := range res.Matches {
|
|
|
|
|
if mountpointPrefix != "" {
|
|
|
|
|
match.Entity.Ref.Path = utils.MakeRelativePath(strings.TrimPrefix(match.Entity.Ref.Path, mountpointPrefix))
|
|
|
|
|
}
|
|
|
|
|
if mountpointRootID != nil {
|
|
|
|
|
match.Entity.Ref.ResourceId = mountpointRootID
|
|
|
|
|
}
|
|
|
|
|
match.Entity.ShareRootName = rootName
|
|
|
|
|
|
|
|
|
|
isShared := match.GetEntity().GetRef().GetResourceId().GetSpaceId() == utils.ShareStorageSpaceID
|
|
|
|
|
isMountpoint := isShared && match.GetEntity().GetRef().GetPath() == "."
|
|
|
|
|
isDir := match.GetEntity().GetMimeType() == "httpd/unix-directory"
|
|
|
|
|
match.Entity.Permissions = convertToWebDAVPermissions(isShared, isMountpoint, isDir, permissions)
|
|
|
|
|
}
|
|
|
|
|
return res, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// IndexSpace (re)indexes all resources of a given space.
|
|
|
|
|
func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId, uID *user.UserId) error {
|
|
|
|
|
ownerCtx, err := getAuthContext(&user.User{Id: uID}, s.gateway, s.secret, s.logger)
|
|
|
|
|