Ralf Haferkamp
2023-06-08 17:58:00 +02:00
parent fc686d8cd8
commit d9088afd2c
9 changed files with 98 additions and 39 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.6.0
github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
github.com/cs3org/reva/v2 v2.14.1-0.20230607220921-238a03c2f795
github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

4
go.sum
View File

@@ -629,8 +629,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
github.com/cs3org/reva/v2 v2.14.1-0.20230607220921-238a03c2f795 h1:uyzA03PcmG7mjd+3KJrkws0IXuXQCvHEn25xXBmO2QI=
github.com/cs3org/reva/v2 v2.14.1-0.20230607220921-238a03c2f795/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0=
github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93 h1:yRhkp28pdpSbEDX+XQtq5ZiZ8jLMRnmuEKwFj9AlzfY=
github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=

View File

@@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"
@@ -230,8 +231,11 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
}, nil
}
func (m *Manager) initialize() error {
func (m *Manager) initialize(ctx context.Context) error {
_, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "initialize")
defer span.End()
if m.initialized {
span.SetStatus(codes.Ok, "already initialized")
return nil
}
@@ -239,29 +243,39 @@ func (m *Manager) initialize() error {
defer m.Unlock()
if m.initialized { // check if initialization happened while grabbing the lock
span.SetStatus(codes.Ok, "initialized while grabbing lock")
return nil
}
ctx := context.Background()
ctx = context.Background()
err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "storages")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "users")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "groups")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
m.initialized = true
span.SetStatus(codes.Ok, "initialized")
return nil
}
@@ -269,7 +283,9 @@ func (m *Manager) initialize() error {
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
@@ -280,7 +296,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
// TODO: should this not already be caught at the gw level?
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER &&
(utils.UserEqual(g.Grantee.GetUserId(), user.Id) || utils.UserEqual(g.Grantee.GetUserId(), md.Owner)) {
return nil, errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
err := errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
// check if share already exists.
@@ -295,7 +314,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
_, err := m.getByKey(ctx, key)
if err == nil {
// share already exists
return nil, errtypes.AlreadyExists(key.String())
err := errtypes.AlreadyExists(key.String())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
shareID := shareid.Encode(md.GetId().GetStorageId(), md.GetId().GetSpaceId(), uuid.NewString())
@@ -316,24 +338,32 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
@@ -350,12 +380,16 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
@@ -363,16 +397,21 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.GroupReceivedCache.Add(ctx, groupid, shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
}
span.SetStatus(codes.Ok, "")
return s, nil
}
@@ -425,7 +464,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -478,7 +517,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}
@@ -504,7 +543,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -586,7 +625,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -622,6 +661,8 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
for spaceID := range spaces {
err := m.Cache.Sync(ctx, providerID, spaceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
@@ -669,6 +710,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}
}
}
span.SetStatus(codes.Ok, "")
return ss, nil
}
@@ -679,6 +721,8 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
var ss []*collaboration.Share
if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return ss, err
}
for ssid, spaceShareIDs := range m.CreatedCache.List(user.Id.OpaqueId) {
@@ -718,6 +762,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
}
}
span.SetStatus(codes.Ok, "")
return ss, nil
}
@@ -726,7 +771,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -870,9 +915,12 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
if err := g.Wait(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")
return rss, nil
}
@@ -899,7 +947,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S
// GetReceivedShare returns the information for a received share.
func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -944,7 +992,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}
@@ -997,7 +1045,7 @@ func updateShareID(share *collaboration.Share) {
// Load imports shares and received shares from channels (e.g. during migration)
func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}

View File

@@ -168,6 +168,7 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
span.SetStatus(codes.Ok, "no shares in provider or space")
return nil
}
@@ -178,11 +179,15 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
createdBytes, err := json.Marshal(c.Providers[storageID].Spaces[spaceID])
if err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := spaceJSONPath(storageID, spaceID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
@@ -192,8 +197,11 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
IfUnmodifiedSince: c.Providers[storageID].Spaces[spaceID].Mtime,
}); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

View File

@@ -179,6 +179,7 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
span.SetAttributes(attribute.String("cs3.userid", userID))
if c.ReceivedSpaces[userID] == nil {
span.SetStatus(codes.Ok, "no received shares")
return nil
}
@@ -188,11 +189,15 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
createdBytes, err := json.Marshal(c.ReceivedSpaces[userID])
if err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := userJSONPath(userID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
@@ -202,8 +207,11 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
IfUnmodifiedSince: c.ReceivedSpaces[userID].Mtime,
}); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

View File

@@ -217,11 +217,15 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
createdBytes, err := json.Marshal(c.UserShares[userid])
if err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := c.userCreatedPath(userid)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
@@ -231,8 +235,11 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
IfUnmodifiedSince: c.UserShares[userid].Mtime,
}); err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

View File

@@ -242,6 +242,9 @@ func FilterFiltersByType(f []*collaboration.Filter, t collaboration.Filter_Type)
// IsExpired tests whether a share is expired
func IsExpired(s *collaboration.Share) bool {
expiration := time.Unix(int64(s.Expiration.GetSeconds()), int64(s.Expiration.GetNanos()))
return s.Expiration != nil && expiration.Before(time.Now())
if e := s.GetExpiration(); e != nil {
expiration := time.Unix(int64(e.Seconds), int64(e.Nanos))
return expiration.Before(time.Now())
}
return false
}

View File

@@ -396,36 +396,21 @@ func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error {
Path: utils.MakeRelativePath(folder),
}
resp, err := client.Stat(ctx, &provider.StatRequest{
resp, err := client.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: rootPathRef,
})
if err != nil {
return err
}
switch {
case err != nil:
return err
case resp.Status.Code == rpc.Code_CODE_OK:
// nothing to do in this case
case resp.Status.Code == rpc.Code_CODE_NOT_FOUND:
r, err := client.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: rootPathRef,
})
if err != nil {
return err
}
if r.Status.Code != rpc.Code_CODE_OK {
return errtypes.NewErrtypeFromStatus(r.Status)
}
return nil
case resp.Status.Code == rpc.Code_CODE_ALREADY_EXISTS:
// nothing to do in this case
return nil
default:
return errtypes.NewErrtypeFromStatus(resp.Status)
}
return nil
}
// CreateSymlink creates a symlink

2
vendor/modules.txt vendored
View File

@@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.14.1-0.20230607220921-238a03c2f795
# github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime