diff --git a/go.mod b/go.mod index 1922374ad6..e780145c63 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.6.0 github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d - github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2 + github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e github.com/egirna/icap-client v0.1.1 diff --git a/go.sum b/go.sum index 5bbe351d3a..8052cd60f3 100644 --- a/go.sum +++ b/go.sum @@ -864,8 +864,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc= github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA= -github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2 h1:I6+bI04Kh0MoTSi/EnfkHqdr1HetFTxV3Sph5RIgTNg= -github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2/go.mod h1:RvhuweTFqzezjUFU0SIdTXakrEx9vJlMvQ7znPXSP1g= +github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749 h1:oCktbSObMu5VTGwcux3tlabQwQT+e0CycuheKvzbQow= +github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749/go.mod h1:RvhuweTFqzezjUFU0SIdTXakrEx9vJlMvQ7znPXSP1g= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 568864defc..8217a669c4 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -3,11 +3,8 @@ package command import ( "fmt" "os" - "strconv" "sync" - "time" - tusd "github.com/tus/tusd/pkg/handler" "github.com/urfave/cli/v2" "github.com/cs3org/reva/v2/pkg/storage" @@ -51,20 +48,21 @@ func ListUploads(cfg *config.Config) *cli.Command { return err } - managingFS, ok := fs.(storage.UploadsManager) + managingFS, ok := fs.(storage.UploadSessionLister) if !ok { fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver) os.Exit(1) } - - uploads, err := managingFS.ListUploads() + falseValue := false + uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &falseValue}) if err != nil { return err } fmt.Println("Incomplete uploads:") for _, u := range uploads { - fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", u.ID, u.MetaData["filename"], u.Size, expiredString(u.MetaData["expires"])) + ref := u.Reference() + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) } return nil }, @@ -92,7 +90,7 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { return err } - managingFS, ok := fs.(storage.UploadsManager) + managingFS, ok := fs.(storage.UploadSessionLister) if !ok { fmt.Fprintf(os.Stderr, "'%s' storage does not support clean expired uploads\n", cfg.Driver) os.Exit(1) @@ -100,18 +98,23 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { wg := sync.WaitGroup{} wg.Add(1) - purgedChannel := make(chan tusd.FileInfo) + falseValue := false + trueValue := false + uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &trueValue, Processing: &falseValue}) + if err != nil { + return err + } - fmt.Println("Cleaned uploads:") + fmt.Println("purging uploads:") go func() { - for purged := range purgedChannel { - fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", purged.ID, purged.MetaData["filename"], purged.Size, expiredString(purged.MetaData["expires"])) + for _, u := range uploads { + ref := u.Reference() + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) + u.Purge(c.Context) } wg.Done() }() - err = managingFS.PurgeExpiredUploads(purgedChannel) - close(purgedChannel) wg.Wait() if err != nil { fmt.Fprintf(os.Stderr, "Failed to clean expired uploads '%s'\n", err) @@ -121,12 +124,3 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command { }, } } - -func expiredString(e string) string { - expired := "N/A" - iExpires, err := strconv.Atoi(e) - if err == nil { - expired = time.Unix(int64(iExpires), 0).Format(time.RFC3339) - } - return expired -} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go index f98ce8adb1..a46bd17739 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus/tus.go @@ -23,13 +23,11 @@ import ( "log" "net/http" "path" - "path/filepath" "time" "github.com/pkg/errors" tusd "github.com/tus/tusd/pkg/handler" - userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" @@ -40,8 +38,8 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" ) @@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } - go func() { - for { - ev := <-handler.CompleteUploads - info := ev.Upload - spaceOwner := &userv1beta1.UserId{ - OpaqueId: info.Storage["SpaceOwnerOrManager"], - } - owner := &userv1beta1.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - } - ref := &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), - } - datatx.InvalidateCache(owner, ref, m.statCache) - if m.publisher != nil { - if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil { - appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + if _, ok := fs.(storage.UploadSessionLister); ok { + // We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info + go func() { + for { + ev := <-handler.CompleteUploads + // We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files + // so we create a Progress instance here that is used to read the correct properties + up := upload.Progress{ + Info: ev.Upload, + } + executant := up.Executant() + ref := up.Reference() + datatx.InvalidateCache(&executant, &ref, m.statCache) + if m.publisher != nil { + if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + } } } - } - }() + }() + } h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { method := r.Method diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go index 668b37f369..a81994cb42 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/storage.go @@ -23,16 +23,10 @@ import ( "io" "net/url" - tusd "github.com/tus/tusd/pkg/handler" - - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" ) -// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished -type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) - // FS is the interface to implement access to the storage. type FS interface { GetHome(ctx context.Context) (string, error) @@ -77,12 +71,6 @@ type FS interface { DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error } -// UploadsManager defines the interface for FS implementations that allow for managing uploads -type UploadsManager interface { - ListUploads() ([]tusd.FileInfo, error) - PurgeExpiredUploads(chan<- tusd.FileInfo) error -} - // Registry is the interface that storage registries implement // for discovering storage providers type Registry interface { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go new file mode 100644 index 0000000000..87d26115bf --- /dev/null +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/uploads.go @@ -0,0 +1,86 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package storage + +import ( + "context" + "io" + "time" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + tusd "github.com/tus/tusd/pkg/handler" +) + +// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished +type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) + +// UploadRequest us used in FS.Upload() to carry required upload metadata +type UploadRequest struct { + Ref *provider.Reference + Body io.ReadCloser + Length int64 +} + +// UploadsManager defines the interface for storage drivers that allow for managing uploads +// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister. +type UploadsManager interface { + ListUploads() ([]tusd.FileInfo, error) + PurgeExpiredUploads(chan<- tusd.FileInfo) error +} + +// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions +type UploadSessionLister interface { + // ListUploadSessions returns the upload sessions matching the given filter + ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error) +} + +// UploadSession is the interface that storage drivers need to return whan listing upload sessions. +type UploadSession interface { + // ID returns the upload id + ID() string + // Filename returns the filename of the file + Filename() string + // Size returns the size of the upload + Size() int64 + // Offset returns the current offset + Offset() int64 + // Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root + Reference() provider.Reference + // Executant returns the userid of the user that created the upload + Executant() userpb.UserId + // SpaceOwner returns the owner of a space if set. optional + SpaceOwner() *userpb.UserId + // Expires returns the time when the upload can no longer be used + Expires() time.Time + + // IsProcessing returns true if postprocessing has not finished, yet + // The actual postprocessing state is tracked in the postprocessing service. + IsProcessing() bool + + // Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome + Purge(ctx context.Context) error +} + +// UploadSessionFilter can be used to filter upload sessions +type UploadSessionFilter struct { + ID *string + Processing *bool + Expired *bool +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index a27086a4df..2ec3726ddd 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -20,11 +20,11 @@ package decomposedfs import ( "context" + "fmt" "io" "os" "path/filepath" "regexp" - "strconv" "strings" "time" @@ -97,14 +97,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i }, Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) + executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok { return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } spaceOwner := &userpb.UserId{ OpaqueId: info.Storage["SpaceOwnerOrManager"], } - uff(spaceOwner, owner.Id, uploadRef) + uff(spaceOwner, executant.Id, uploadRef) } ri := provider.ResourceInfo{ @@ -244,36 +244,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } -// ListUploads returns a list of all incomplete uploads -func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { - return fs.uploadInfos(context.Background()) -} - -// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers -func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { - infos, err := fs.uploadInfos(context.Background()) - if err != nil { - return err - } - - for _, info := range infos { - expires, err := strconv.Atoi(info.MetaData["expires"]) +// ListUploadSessions returns the upload sessions for the given filter +func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) { + var sessions []storage.UploadSession + if filter.ID != nil && *filter.ID != "" { + session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info")) if err != nil { + return nil, err + } + sessions = []storage.UploadSession{session} + } else { + var err error + sessions, err = fs.uploadSessions(ctx) + if err != nil { + return nil, err + } + } + filteredSessions := []storage.UploadSession{} + now := time.Now() + for _, session := range sessions { + if filter.Processing != nil && *filter.Processing != session.IsProcessing() { continue } - if int64(expires) < time.Now().Unix() { - purgedChan <- info - err = os.Remove(info.Storage["BinPath"]) - if err != nil { - return err - } - err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info")) - if err != nil { - return err + if filter.Expired != nil { + if *filter.Expired { + if now.Before(session.Expires()) { + continue + } + } else { + if now.After(session.Expires()) { + continue + } } } + filteredSessions = append(filteredSessions, session) } - return nil + return filteredSessions, nil } // AsTerminatableUpload returns a TerminatableUpload @@ -297,28 +303,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload return up.(*upload.Upload) } -func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) { - infos := []tusd.FileInfo{} +func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) { + uploads := []storage.UploadSession{} infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) if err != nil { return nil, err } for _, info := range infoFiles { - match := _idRegexp.FindStringSubmatch(info) - if match == nil || len(match) < 2 { + progress, err := fs.getUploadSession(ctx, info) + if err != nil { + appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession") continue } - up, err := fs.GetUpload(ctx, match[1]) - if err != nil { - return nil, err - } - info, err := up.GetInfo(context.Background()) - if err != nil { - return nil, err - } - infos = append(infos, info) + uploads = append(uploads, progress) } - return infos, nil + return uploads, nil +} + +func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) { + match := _idRegexp.FindStringSubmatch(path) + if match == nil || len(match) < 2 { + return nil, fmt.Errorf("invalid upload path") + } + up, err := fs.GetUpload(ctx, match[1]) + if err != nil { + return nil, err + } + info, err := up.GetInfo(context.Background()) + if err != nil { + return nil, err + } + // upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload + n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true) + if err != nil { + return nil, err + } + progress := upload.Progress{ + Path: path, + Info: info, + Processing: n.IsProcessing(ctx), + } + return progress, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go index d70f7f44a8..1d57d0c6ae 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/processing.go @@ -21,6 +21,7 @@ package upload import ( "context" "encoding/json" + stderrors "errors" "fmt" iofs "io/fs" "os" @@ -493,3 +494,85 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look } return n, nil } + +// Progress adapts the persisted upload metadata for the UploadSessionLister interface +type Progress struct { + Path string + Info tusd.FileInfo + Processing bool +} + +// ID implements the storage.UploadSession interface +func (p Progress) ID() string { + return p.Info.ID +} + +// Filename implements the storage.UploadSession interface +func (p Progress) Filename() string { + return p.Info.MetaData["filename"] +} + +// Size implements the storage.UploadSession interface +func (p Progress) Size() int64 { + return p.Info.Size +} + +// Offset implements the storage.UploadSession interface +func (p Progress) Offset() int64 { + return p.Info.Offset +} + +// Reference implements the storage.UploadSession interface +func (p Progress) Reference() provider.Reference { + return provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: p.Info.MetaData["providerID"], + SpaceId: p.Info.Storage["SpaceRoot"], + OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload + }, + } +} + +// Executant implements the storage.UploadSession interface +func (p Progress) Executant() userpb.UserId { + return userpb.UserId{ + Idp: p.Info.Storage["Idp"], + OpaqueId: p.Info.Storage["UserId"], + Type: utils.UserTypeMap(p.Info.Storage["UserType"]), + } +} + +// SpaceOwner implements the storage.UploadSession interface +func (p Progress) SpaceOwner() *userpb.UserId { + return &userpb.UserId{ + // idp and type do not seem to be consumed and the node currently only stores the user id anyway + OpaqueId: p.Info.Storage["SpaceOwnerOrManager"], + } +} + +// Expires implements the storage.UploadSession interface +func (p Progress) Expires() time.Time { + mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"]) + return mt +} + +// IsProcessing implements the storage.UploadSession interface +func (p Progress) IsProcessing() bool { + return p.Processing +} + +// Purge implements the storage.UploadSession interface +func (p Progress) Purge(ctx context.Context) error { + berr := os.Remove(p.Info.Storage["BinPath"]) + if berr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session") + } + + // remove upload metadata + merr := os.Remove(p.Path) + if merr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session") + } + + return stderrors.Join(berr, merr) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 40f9b2ffaa..374186ee48 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -354,7 +354,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2 +# github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749 ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime