Merge pull request #8701 from kobergj/InitiatorID

[full-ci] InitiatorIDs
This commit is contained in:
kobergj
2024-03-22 16:02:41 +01:00
committed by GitHub
20 changed files with 162 additions and 49 deletions

View File

@@ -2,4 +2,5 @@ Enhancement: Bump Reva
bumps reva version
https://github.com/owncloud/ocis/pull/8701
https://github.com/owncloud/ocis/pull/8606

View File

@@ -0,0 +1,5 @@
Enhancement: Initiator-IDs
Allows sending a header `Initiator-ID` on http requests. This id will be added to sse events so clients can figure out if their particular instance was triggering the event. Additionally this adds the etag of the file/folder to all sse events.
https://github.com/owncloud/ocis/pull/8701

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.10.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.19.2-0.20240320153915-890d63097350
github.com/cs3org/reva/v2 v2.19.2-0.20240322140620-cbb501a7ae3a
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1018,8 +1018,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.19.2-0.20240320153915-890d63097350 h1:Y0P9eve8cAwj3i3rDDeUusoW7yUs6AZFkU+ANygkrKo=
github.com/cs3org/reva/v2 v2.19.2-0.20240320153915-890d63097350/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cs3org/reva/v2 v2.19.2-0.20240322140620-cbb501a7ae3a h1:dqIqhnxiRYfmDHhlgtEAeTbOJxQ2nca4O/Gius/TnxQ=
github.com/cs3org/reva/v2 v2.19.2-0.20240322140620-cbb501a7ae3a/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -5,4 +5,6 @@ type FileEvent struct {
ParentItemID string `json:"parentitemid"`
ItemID string `json:"itemid"`
SpaceID string `json:"spaceid"`
InitiatorID string `json:"initiatorid"`
Etag string `json:"etag"`
}

View File

@@ -95,12 +95,17 @@ func (cl *ClientlogService) processEvent(event events.Event) {
evType string
data interface{}
)
p := func(typ string, ref *provider.Reference) {
evType = typ
users, data, err = processFileEvent(ctx, ref, gwc, event.InitiatorID)
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
case events.UploadReady:
evType = "postprocessing-finished"
users, data, err = processFileEvent(ctx, e.FileRef, gwc)
p("postprocessing-finished", e.FileRef)
case events.ItemTrashed:
evType = "item-trashed"
@@ -120,7 +125,8 @@ func (cl *ClientlogService) processEvent(event events.Event) {
ItemID: storagespace.FormatResourceID(*e.ID),
// TODO: check with web if parentID is needed
// ParentItemID: storagespace.FormatResourceID(*item.GetRef().GetResourceId()),
SpaceID: storagespace.FormatStorageID(e.ID.GetStorageId(), e.ID.GetSpaceId()),
SpaceID: storagespace.FormatStorageID(e.ID.GetStorageId(), e.ID.GetSpaceId()),
InitiatorID: event.InitiatorID,
}
users, err = utils.GetSpaceMembers(ctx, e.ID.GetSpaceId(), gwc, utils.ViewerRole)
@@ -128,24 +134,19 @@ func (cl *ClientlogService) processEvent(event events.Event) {
}
}
case events.ItemRestored:
evType = "item-restored"
users, data, err = processFileEvent(ctx, e.Ref, gwc)
p("item-restored", e.Ref)
case events.ContainerCreated:
evType = "folder-created"
users, data, err = processFileEvent(ctx, e.Ref, gwc)
p("folder-created", e.Ref)
case events.ItemMoved:
// we are only interested in the rename case
if !utils.ResourceIDEqual(e.OldReference.GetResourceId(), e.Ref.GetResourceId()) || e.Ref.GetPath() == e.OldReference.GetPath() {
return
}
evType = "item-renamed"
users, data, err = processFileEvent(ctx, e.Ref, gwc)
p("item-renamed", e.Ref)
case events.FileLocked:
evType = "file-locked"
users, data, err = processFileEvent(ctx, e.Ref, gwc)
p("file-locked", e.Ref)
case events.FileUnlocked:
evType = "file-unlocked"
users, data, err = processFileEvent(ctx, e.Ref, gwc)
p("file-unlocked", e.Ref)
}
if err != nil {
@@ -173,7 +174,7 @@ func (cl *ClientlogService) sendSSE(userIDs []string, evType string, data interf
})
}
func processFileEvent(ctx context.Context, ref *provider.Reference, gwc gateway.GatewayAPIClient) ([]string, FileEvent, error) {
func processFileEvent(ctx context.Context, ref *provider.Reference, gwc gateway.GatewayAPIClient, initiatorid string) ([]string, FileEvent, error) {
info, err := utils.GetResource(ctx, ref, gwc)
if err != nil {
return nil, FileEvent{}, err
@@ -183,6 +184,8 @@ func processFileEvent(ctx context.Context, ref *provider.Reference, gwc gateway.
ParentItemID: storagespace.FormatResourceID(*info.GetParentId()),
ItemID: storagespace.FormatResourceID(*info.GetId()),
SpaceID: storagespace.FormatStorageID(info.GetSpace().GetRoot().GetStorageId(), info.GetSpace().GetRoot().GetSpaceId()),
InitiatorID: initiatorid,
Etag: info.GetEtag(),
}
users, err := utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole)

View File

@@ -12,15 +12,16 @@ import (
// Postprocessing handles postprocessing of a file
type Postprocessing struct {
ID string
URL string
User *user.User
Filename string
Filesize uint64
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
Failures int
ID string
URL string
User *user.User
Filename string
Filesize uint64
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
Failures int
InitiatorID string
config config.Postprocessing
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"time"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
@@ -92,13 +93,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
switch ev := e.Event.(type) {
case events.BytesReceived:
pp = &postprocessing.Postprocessing{
ID: ev.UploadID,
URL: ev.URL,
User: ev.ExecutingUser,
Filename: ev.Filename,
Filesize: ev.Filesize,
ResourceID: ev.ResourceID,
Steps: pps.steps,
ID: ev.UploadID,
URL: ev.URL,
User: ev.ExecutingUser,
Filename: ev.Filename,
Filesize: ev.Filesize,
ResourceID: ev.ResourceID,
Steps: pps.steps,
InitiatorID: e.InitiatorID,
}
next = pp.Init(ev)
case events.PostprocessingStepFinished:
@@ -160,11 +162,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
}
if pp != nil {
ctx = ctxpkg.ContextSetInitiator(ctx, pp.InitiatorID)
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
}
}
if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")

View File

@@ -185,7 +185,6 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
ev = FileTouched(v, req.(*provider.TouchFileRequest), ownerID, executantID)
}
case *provider.SetLockResponse:
fmt.Println("set lock response", v)
if isSuccess(v) {
ev = FileLocked(v, req.(*provider.SetLockRequest), ownerID, executantID)
}

View File

@@ -39,6 +39,14 @@ func NewUnary() grpc.UnaryServerInterceptor {
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, tkn)
}
}
if val, ok := md[ctxpkg.InitiatorHeader]; ok {
if len(val) > 0 && val[0] != "" {
initiatorID := val[0]
ctx = ctxpkg.ContextSetInitiator(ctx, initiatorID)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.InitiatorHeader, initiatorID)
}
}
}
return handler(ctx, req)
@@ -61,6 +69,14 @@ func NewStream() grpc.StreamServerInterceptor {
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, tkn)
}
}
if val, ok := md[ctxpkg.InitiatorHeader]; ok {
if len(val) > 0 && val[0] != "" {
initiatorID := val[0]
ctx = ctxpkg.ContextSetInitiator(ctx, initiatorID)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.InitiatorHeader, initiatorID)
}
}
}
wrapped := newWrappedServerStream(ctx, ss)

View File

@@ -182,6 +182,18 @@ func (s *svc) updateSpaceShare(ctx context.Context, req *collaboration.UpdateSha
return nil, errors.Wrap(err, "gateway: error denying grant in storage")
}
} else {
if !grant.Permissions.RemoveGrant {
// this request might remove Manager Permissions so we need to
// check if there is at least one manager remaining of the
// resource.
listGrantRes, err := s.listGrants(ctx, req.GetShare().GetResourceId())
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting grant to remove from storage")
}
if !isSpaceManagerRemaining(listGrantRes.GetGrants(), grant.GetGrantee()) {
return nil, errors.New("gateway: can't remove the last manager")
}
}
st, err = s.updateGrant(ctx, req.GetShare().GetResourceId(), grant, opaque)
if err != nil {
return nil, errors.Wrap(err, "gateway: error adding grant to storage")
@@ -709,6 +721,11 @@ func (s *svc) removeSpaceShare(ctx context.Context, ref *provider.ResourceId, gr
if permissions == nil {
return nil, errors.New("gateway: error getting grant to remove from storage")
}
if len(listGrantRes.Grants) == 1 || !isSpaceManagerRemaining(listGrantRes.Grants, grantee) {
return nil, errors.New("gateway: can't remove the last manager")
}
// TODO: change CS3 APIs
opaque := &typesv1beta1.Opaque{
Map: map[string]*typesv1beta1.OpaqueEntry{
@@ -728,6 +745,18 @@ func (s *svc) removeSpaceShare(ctx context.Context, ref *provider.ResourceId, gr
return &collaboration.RemoveShareResponse{Status: status.NewOK(ctx)}, nil
}
func isSpaceManagerRemaining(grants []*provider.Grant, grantee *provider.Grantee) bool {
for _, g := range grants {
// RemoveGrant is currently the way to check for the manager role
// If it is not set than the current grant is not for a manager and
// we can just continue with the next one.
if g.Permissions.RemoveGrant && !isEqualGrantee(g.Grantee, grantee) {
return true
}
}
return false
}
func (s *svc) checkLock(ctx context.Context, shareId *collaboration.ShareId) (*rpc.Status, error) {
logger := appctx.GetLogger(ctx)
getShareRes, err := s.GetShare(ctx, &collaboration.GetShareRequest{

View File

@@ -240,7 +240,7 @@ func authenticateUser(w http.ResponseWriter, r *http.Request, conf *config, toke
logError(isUnprotectedEndpoint, log, err, "got an error retrieving groups for user "+user.Username, http.StatusInternalServerError, w)
return nil, err
}
return ctxWithUserInfo(ctx, r, user, token, tokenScope), nil
return ctxWithUserInfo(ctx, r, user, token, tokenScope, r.Header.Get(ctxpkg.InitiatorHeader)), nil
}
}
}
@@ -341,24 +341,16 @@ func authenticateUser(w http.ResponseWriter, r *http.Request, conf *config, toke
return nil, err
}
// store user and core access token in context.
ctx = ctxpkg.ContextSetUser(ctx, u)
ctx = ctxpkg.ContextSetToken(ctx, token)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, token) // TODO(jfd): hardcoded metadata key. use PerRPCCredentials?
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.UserAgentHeader, r.UserAgent())
// store scopes in context
ctx = ctxpkg.ContextSetScopes(ctx, tokenScope)
return ctxWithUserInfo(ctx, r, u, token, tokenScope), nil
return ctxWithUserInfo(ctx, r, u, token, tokenScope, r.Header.Get(ctxpkg.InitiatorHeader)), nil
}
func ctxWithUserInfo(ctx context.Context, r *http.Request, user *userpb.User, token string, tokenScope map[string]*authpb.Scope) context.Context {
func ctxWithUserInfo(ctx context.Context, r *http.Request, user *userpb.User, token string, tokenScope map[string]*authpb.Scope, initiatorid string) context.Context {
ctx = ctxpkg.ContextSetUser(ctx, user)
ctx = ctxpkg.ContextSetToken(ctx, token)
ctx = ctxpkg.ContextSetInitiator(ctx, initiatorid)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, token)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.UserAgentHeader, r.UserAgent())
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.InitiatorHeader, initiatorid)
ctx = ctxpkg.ContextSetScopes(ctx, tokenScope)
return ctx
}

View File

@@ -0,0 +1,35 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package ctx
import "context"
// InitiatorHeader is the header key used to pass the initiator id to http services and grpc calls.
var InitiatorHeader = "initiator-id"
// ContextGetInitiator returns the initiator if set in the given context.
func ContextGetInitiator(ctx context.Context) (string, bool) {
i, ok := ctx.Value(initiatorKey).(string)
return i, ok
}
// ContextSetInitiator stores the initiator in the context.
func ContextSetInitiator(ctx context.Context, i string) context.Context {
return context.WithValue(ctx, initiatorKey, i)
}

View File

@@ -32,6 +32,7 @@ const (
idKey
lockIDKey
scopeKey
initiatorKey
)
// ContextGetUser returns the user if set in the given context.

View File

@@ -23,6 +23,7 @@ import (
"log"
"reflect"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/google/uuid"
"go-micro.dev/v4/events"
"go.opentelemetry.io/otel/propagation"
@@ -43,6 +44,9 @@ var (
// MetadatakeyTraceParent is the key used for the traceparent in the metadata map of the event
MetadatakeyTraceParent = "traceparent"
// MetadatakeyInitiatorID is the key used for the initiator id in the metadata map of the event
MetadatakeyInitiatorID = "initiatorid"
)
type (
@@ -72,6 +76,7 @@ type (
Type string
ID string
TraceParent string
InitiatorID string
Event interface{}
}
)
@@ -111,6 +116,7 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error
Type: et,
ID: e.Metadata[MetadatakeyEventID],
TraceParent: e.Metadata[MetadatakeyTraceParent],
InitiatorID: e.Metadata[MetadatakeyInitiatorID],
Event: event,
}
}
@@ -133,6 +139,7 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
Type: e.Metadata[MetadatakeyEventType],
ID: e.Metadata[MetadatakeyEventID],
TraceParent: e.Metadata[MetadatakeyTraceParent],
InitiatorID: e.Metadata[MetadatakeyInitiatorID],
Event: e.Payload,
}
}
@@ -145,10 +152,12 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
func Publish(ctx context.Context, s Publisher, ev interface{}) error {
evName := reflect.TypeOf(ev).String()
traceParent := getTraceParentFromCtx(ctx)
iid, _ := ctxpkg.ContextGetInitiator(ctx)
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
MetadatakeyEventType: evName,
MetadatakeyEventID: uuid.New().String(),
MetadatakeyTraceParent: traceParent,
MetadatakeyInitiatorID: iid,
}))
}

View File

@@ -252,6 +252,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue // NOTE: since we can't get the upload, we can't delete the blob
}
ctx = session.Context(ctx)
n, err := session.Node(ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")

View File

@@ -54,6 +54,8 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
session := up.(*upload.OcisSession)
ctx = session.Context(ctx)
if session.Chunk() != "" { // check chunking v1
p, assembledFile, err := fs.chunkHandler.WriteChunk(session.Chunk(), req.Body)
if err != nil {
@@ -178,6 +180,9 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
session.SetStorageValue("SpaceRoot", n.SpaceRoot.ID) // TODO SpaceRoot -> SpaceID
session.SetStorageValue("SpaceOwnerOrManager", n.SpaceOwnerOrManager(ctx).GetOpaqueId()) // TODO needed for what?
iid, _ := ctxpkg.ContextGetInitiator(ctx)
session.SetMetadata("initiatorid", iid)
if metadata != nil {
session.SetMetadata("providerID", metadata["providerID"])
if mtime, ok := metadata["mtime"]; ok {

View File

@@ -54,7 +54,8 @@ func (s *OcisSession) Context(ctx context.Context) context.Context { // restore
sub := log.With().Int("pid", os.Getpid()).Logger()
ctx = appctx.WithLogger(ctx, &sub)
ctx = ctxpkg.ContextSetLockID(ctx, s.lockID())
return ctxpkg.ContextSetUser(ctx, s.executantUser())
ctx = ctxpkg.ContextSetUser(ctx, s.executantUser())
return ctxpkg.ContextSetInitiator(ctx, s.InitiatorID())
}
func (s *OcisSession) lockID() string {
@@ -305,6 +306,11 @@ func (s *OcisSession) binPath() string {
return filepath.Join(s.store.root, "uploads", s.info.ID)
}
// InitiatorID returns the id of the initiating client
func (s *OcisSession) InitiatorID() string {
return s.info.MetaData["initiatorid"]
}
// sessionPath returns the path to the .info file storing the file's info.
func sessionPath(root, id string) string {
return filepath.Join(root, "uploads", id+".info")

View File

@@ -129,6 +129,8 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
defer span.End()
log := appctx.GetLogger(ctx)
ctx = ctxpkg.ContextSetInitiator(ctx, session.InitiatorID())
// calculate the checksum of the written bytes
// they will all be written to the metadata later, so we cannot omit any of them
// TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present

2
vendor/modules.txt vendored
View File

@@ -359,7 +359,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.19.2-0.20240320153915-890d63097350
# github.com/cs3org/reva/v2 v2.19.2-0.20240322140620-cbb501a7ae3a
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime