mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-03-17 23:17:09 -04:00
bump reva to 2.19.4
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
116
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
generated
vendored
116
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
generated
vendored
@@ -29,14 +29,6 @@ import (
|
||||
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
|
||||
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
|
||||
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/google/uuid"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/genproto/protobuf/field_mask"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/appctx"
|
||||
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/errtypes"
|
||||
@@ -52,6 +44,13 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/google/uuid"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/pkg/errors"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/genproto/protobuf/field_mask"
|
||||
"google.golang.org/protobuf/types/known/fieldmaskpb"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -153,8 +152,8 @@ type Manager struct {
|
||||
|
||||
MaxConcurrency int
|
||||
|
||||
gateway gatewayv1beta1.GatewayAPIClient
|
||||
eventStream events.Stream
|
||||
gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
|
||||
eventStream events.Stream
|
||||
}
|
||||
|
||||
// NewDefault returns a new manager instance with default dependencies
|
||||
@@ -170,7 +169,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gc, err := pool.GetGatewayServiceClient(c.GatewayAddr)
|
||||
gatewaySelector, err := pool.GatewaySelector(c.GatewayAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -183,11 +182,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return New(s, gc, c.CacheTTL, es, c.MaxConcurrency)
|
||||
return New(s, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency)
|
||||
}
|
||||
|
||||
// New returns a new manager instance.
|
||||
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
|
||||
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
|
||||
ttl := time.Duration(ttlSeconds) * time.Second
|
||||
return &Manager{
|
||||
Cache: providercache.New(s, ttl),
|
||||
@@ -195,7 +194,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
|
||||
UserReceivedStates: receivedsharecache.New(s, ttl),
|
||||
GroupReceivedCache: sharecache.New(s, "groups", "received.json", ttl),
|
||||
storage: s,
|
||||
gateway: gc,
|
||||
gatewaySelector: gatewaySelector,
|
||||
eventStream: es,
|
||||
MaxConcurrency: maxconcurrency,
|
||||
}, nil
|
||||
@@ -411,6 +410,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
|
||||
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "GetShare").Logger()
|
||||
if err := m.initialize(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -421,7 +421,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
|
||||
@@ -432,7 +432,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
|
||||
GranteeUserID: s.GetGrantee().GetUserId(),
|
||||
GranteeGroupID: s.GetGrantee().GetGroupId(),
|
||||
}); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to publish share expired event")
|
||||
}
|
||||
}
|
||||
@@ -445,8 +445,15 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
|
||||
|
||||
req := &provider.StatRequest{
|
||||
Ref: &provider.Reference{ResourceId: s.ResourceId},
|
||||
FieldMask: &fieldmaskpb.FieldMask{
|
||||
Paths: []string{"permissions"},
|
||||
},
|
||||
}
|
||||
res, err := m.gateway.Stat(ctx, req)
|
||||
client, err := m.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := client.Stat(ctx, req)
|
||||
if err == nil &&
|
||||
res.Status.Code == rpcv1beta1.Code_CODE_OK &&
|
||||
res.Info.PermissionSet.ListGrants {
|
||||
@@ -523,8 +530,15 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
|
||||
if !share.IsCreatedByUser(toUpdate, user) {
|
||||
req := &provider.StatRequest{
|
||||
Ref: &provider.Reference{ResourceId: toUpdate.ResourceId},
|
||||
FieldMask: &fieldmaskpb.FieldMask{
|
||||
Paths: []string{"permissions"},
|
||||
},
|
||||
}
|
||||
res, err := m.gateway.Stat(ctx, req)
|
||||
client, err := m.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := client.Stat(ctx, req)
|
||||
if err != nil ||
|
||||
res.Status.Code != rpcv1beta1.Code_CODE_OK ||
|
||||
!res.Info.PermissionSet.UpdateGrant {
|
||||
@@ -583,6 +597,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
|
||||
func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listSharesByIDs").Logger()
|
||||
|
||||
providerSpaces := make(map[string]map[string]struct{})
|
||||
for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) {
|
||||
@@ -604,19 +619,21 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
|
||||
}
|
||||
|
||||
for _, s := range shares.Shares {
|
||||
resourceID := s.GetResourceId()
|
||||
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
|
||||
ShareOwner: s.GetOwner(),
|
||||
ItemID: s.GetResourceId(),
|
||||
ItemID: resourceID,
|
||||
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
|
||||
GranteeUserID: s.GetGrantee().GetUserId(),
|
||||
GranteeGroupID: s.GetGrantee().GetGroupId(),
|
||||
}); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to publish share expired event")
|
||||
}
|
||||
continue
|
||||
@@ -626,17 +643,33 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
|
||||
}
|
||||
|
||||
if !(share.IsCreatedByUser(s, user) || share.IsGrantedToUser(s, user)) {
|
||||
key := storagespace.FormatResourceID(*s.ResourceId)
|
||||
key := storagespace.FormatResourceID(*resourceID)
|
||||
if _, hit := statCache[key]; !hit {
|
||||
req := &provider.StatRequest{
|
||||
Ref: &provider.Reference{ResourceId: s.ResourceId},
|
||||
Ref: &provider.Reference{ResourceId: resourceID},
|
||||
FieldMask: &fieldmaskpb.FieldMask{
|
||||
Paths: []string{"permissions"},
|
||||
},
|
||||
}
|
||||
res, err := m.gateway.Stat(ctx, req)
|
||||
if err != nil ||
|
||||
res.Status.Code != rpcv1beta1.Code_CODE_OK ||
|
||||
!res.Info.PermissionSet.ListGrants {
|
||||
client, err := m.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).Msg("failed to select next gateway client")
|
||||
continue
|
||||
}
|
||||
res, err := client.Stat(ctx, req)
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).Msg("failed to make stat call")
|
||||
continue
|
||||
}
|
||||
if res.Status.Code != rpcv1beta1.Code_CODE_OK {
|
||||
sublog.Debug().Str("code", res.GetStatus().GetCode().String()).Msg(res.GetStatus().GetMessage())
|
||||
continue
|
||||
}
|
||||
if !res.Info.PermissionSet.ListGrants {
|
||||
sublog.Debug().Msg("user has no list grants permission")
|
||||
continue
|
||||
}
|
||||
sublog.Debug().Msg("listing share for non participating user")
|
||||
statCache[key] = struct{}{}
|
||||
}
|
||||
}
|
||||
@@ -652,6 +685,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
|
||||
func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listCreatedShares").Logger()
|
||||
|
||||
list, err := m.CreatedCache.List(ctx, user.Id.OpaqueId)
|
||||
if err != nil {
|
||||
@@ -694,7 +728,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
|
||||
// fetch all shares from space with one request
|
||||
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Str("storageid", storageID).
|
||||
Str("spaceid", spaceID).
|
||||
Msg("failed to list shares in space")
|
||||
@@ -707,7 +741,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
|
||||
@@ -717,7 +751,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
|
||||
GranteeUserID: s.GetGrantee().GetUserId(),
|
||||
GranteeGroupID: s.GetGrantee().GetGroupId(),
|
||||
}); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to publish share expired event")
|
||||
}
|
||||
continue
|
||||
@@ -762,6 +796,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
|
||||
func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter, forUser *userv1beta1.UserId) ([]*collaboration.ReceivedShare, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().Str("driver", "jsoncs3").Str("handler", "ListReceivedShares").Logger()
|
||||
|
||||
if err := m.initialize(ctx); err != nil {
|
||||
return nil, err
|
||||
@@ -769,7 +804,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
|
||||
|
||||
user := ctxpkg.ContextMustGetUser(ctx)
|
||||
if user.GetId().GetType() == userv1beta1.UserType_USER_TYPE_SERVICE {
|
||||
u, err := utils.GetUser(forUser, m.gateway)
|
||||
client, err := m.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u, err := utils.GetUser(forUser, client)
|
||||
if err != nil {
|
||||
return nil, errtypes.BadRequest("user not found")
|
||||
}
|
||||
@@ -852,12 +891,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
|
||||
g.Go(func() error {
|
||||
for w := range work {
|
||||
storageID, spaceID, _ := shareid.Decode(w.ssid)
|
||||
sublogr := sublog.With().Str("storageid", storageID).Str("spaceid", spaceID).Logger()
|
||||
// fetch all shares from space with one request
|
||||
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("storageid", storageID).
|
||||
Str("spaceid", spaceID).
|
||||
sublogr.Error().Err(err).
|
||||
Msg("failed to list shares in space")
|
||||
continue
|
||||
}
|
||||
@@ -866,9 +904,10 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
|
||||
if err != nil || s == nil {
|
||||
continue
|
||||
}
|
||||
sublogr = sublogr.With().Str("shareid", shareID).Logger()
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublogr.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
|
||||
@@ -878,7 +917,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
|
||||
GranteeUserID: s.GetGrantee().GetUserId(),
|
||||
GranteeGroupID: s.GetGrantee().GetGroupId(),
|
||||
}); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublogr.Error().Err(err).
|
||||
Msg("failed to publish share expired event")
|
||||
}
|
||||
continue
|
||||
@@ -959,6 +998,7 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share
|
||||
func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "getReceived").Logger()
|
||||
|
||||
s, err := m.get(ctx, ref)
|
||||
if err != nil {
|
||||
@@ -970,7 +1010,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
|
||||
@@ -980,7 +1020,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
|
||||
GranteeUserID: s.GetGrantee().GetUserId(),
|
||||
GranteeGroupID: s.GetGrantee().GetGroupId(),
|
||||
}); err != nil {
|
||||
log.Error().Err(err).
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to publish share expired event")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user