diff --git a/go.mod b/go.mod index 387ed5659..829a9301a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.6.0 github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d - github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0 + github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e github.com/egirna/icap-client v0.1.1 diff --git a/go.sum b/go.sum index 6d757249a..9c0a4fcce 100644 --- a/go.sum +++ b/go.sum @@ -625,8 +625,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc= github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA= -github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0 h1:Vmghy5B5q/C22JR+fPtsKMra2ug2p3s0CeVmNnQIu4o= -github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0/go.mod h1:4z5EQghS2LhSWZWocH51Dw9VAs16No1zSFvFgQtgS7w= +github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11 h1:44FU3t+cJFBEiheA1Vj8JxLVXt8eWY1J0h4lNsaOgxo= +github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11/go.mod h1:4z5EQghS2LhSWZWocH51Dw9VAs16No1zSFvFgQtgS7w= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/services/storage-users/pkg/config/config.go b/services/storage-users/pkg/config/config.go index 7c738792b..a2be82ea3 100644 --- a/services/storage-users/pkg/config/config.go +++ b/services/storage-users/pkg/config/config.go @@ -91,9 +91,16 @@ type Drivers struct { Local LocalDriver `yaml:",omitempty"` // not supported by the oCIS product, therefore not part of docs } +// AsyncPropagatorOptions configures the async propagator +type AsyncPropagatorOptions struct { + PropagationDelay time.Duration `yaml:"propagation_delay" env:"STORAGE_USERS_ASYNC_PROPAGATOR_PROPAGATION_DELAY" desc:"The delay in seconds between a change made to a tree and the propagation start on treesize and treetime. Multiple propagations are computed to a single one."` +} + // OCISDriver is the storage driver configuration when using 'ocis' storage driver type OCISDriver struct { - MetadataBackend string `yaml:"metadata_backend" env:"OCIS_DECOMPOSEDFS_METADATA_BACKEND;STORAGE_USERS_OCIS_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'messagepack' and 'xattrs'. The setting 'messagepack' uses a dedicated file to store file metadata while 'xattrs' uses extended attributes to store file metadata. Defaults to 'messagepack'."` + MetadataBackend string `yaml:"metadata_backend" env:"OCIS_DECOMPOSEDFS_METADATA_BACKEND;STORAGE_USERS_OCIS_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'messagepack' and 'xattrs'. The setting 'messagepack' uses a dedicated file to store file metadata while 'xattrs' uses extended attributes to store file metadata. Defaults to 'messagepack'."` + Propagator string `yaml:"propagator" env:"OCIS_DECOMPOSEDFS_PROPAGATOR;STORAGE_USERS_OCIS_PROPAGATOR" desc:"The propagator used for decomposedfs. At the moment, only 'sync' is fully supported, 'async' is available as an experimental option."` + AsyncPropagatorOptions AsyncPropagatorOptions `yaml:"async_propagator_options"` // Root is the absolute path to the location of the data Root string `yaml:"root" env:"STORAGE_USERS_OCIS_ROOT" desc:"The directory where the filesystem storage will store blobs and metadata. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH:/storage/users."` UserLayout string `yaml:"user_layout" env:"STORAGE_USERS_OCIS_USER_LAYOUT" desc:"Template string for the user storage layout in the user directory."` @@ -115,7 +122,9 @@ type OCISDriver struct { // S3NGDriver is the storage driver configuration when using 's3ng' storage driver type S3NGDriver struct { - MetadataBackend string `yaml:"metadata_backend" env:"STORAGE_USERS_S3NG_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'xattrs' and 'messagepack'. The setting 'xattrs' uses extended attributes to store file metadata while 'messagepack' uses a dedicated file to store file metadata. Defaults to 'xattrs'."` + MetadataBackend string `yaml:"metadata_backend" env:"STORAGE_USERS_S3NG_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'xattrs' and 'messagepack'. The setting 'xattrs' uses extended attributes to store file metadata while 'messagepack' uses a dedicated file to store file metadata. Defaults to 'xattrs'."` + Propagator string `yaml:"propagator" env:"OCIS_DECOMPOSEDFS_PROPAGATOR;STORAGE_USERS_S3NG_PROPAGATOR" desc:"The propagator used for decomposedfs. At the moment, only 'sync' is fully supported, 'async' is available as an experimental option."` + AsyncPropagatorOptions AsyncPropagatorOptions `yaml:"async_propagator_options"` // Root is the absolute path to the location of the data Root string `yaml:"root" env:"STORAGE_USERS_S3NG_ROOT" desc:"The directory where the filesystem storage will store metadata for blobs. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH:/storage/users."` UserLayout string `yaml:"user_layout" env:"STORAGE_USERS_S3NG_USER_LAYOUT" desc:"Template string for the user storage layout in the user directory."` diff --git a/services/storage-users/pkg/config/defaults/defaultconfig.go b/services/storage-users/pkg/config/defaults/defaultconfig.go index 4674a3b1e..9cb71e4f2 100644 --- a/services/storage-users/pkg/config/defaults/defaultconfig.go +++ b/services/storage-users/pkg/config/defaults/defaultconfig.go @@ -63,6 +63,7 @@ func DefaultConfig() *config.Config { }, S3NG: config.S3NGDriver{ MetadataBackend: "messagepack", + Propagator: "sync", Root: filepath.Join(defaults.BaseDataPath(), "storage", "users"), ShareFolder: "/Shares", UserLayout: "{{.Id.OpaqueId}}", @@ -75,6 +76,7 @@ func DefaultConfig() *config.Config { }, OCIS: config.OCISDriver{ MetadataBackend: "messagepack", + Propagator: "sync", Root: filepath.Join(defaults.BaseDataPath(), "storage", "users"), ShareFolder: "/Shares", UserLayout: "{{.Id.OpaqueId}}", diff --git a/services/storage-users/pkg/revaconfig/drivers.go b/services/storage-users/pkg/revaconfig/drivers.go index 7fb104757..097f16250 100644 --- a/services/storage-users/pkg/revaconfig/drivers.go +++ b/services/storage-users/pkg/revaconfig/drivers.go @@ -115,7 +115,11 @@ func OwnCloudSQL(cfg *config.Config) map[string]interface{} { // Ocis is the config mapping for the Ocis storage driver func Ocis(cfg *config.Config) map[string]interface{} { return map[string]interface{}{ - "metadata_backend": cfg.Drivers.OCIS.MetadataBackend, + "metadata_backend": cfg.Drivers.OCIS.MetadataBackend, + "propagator": cfg.Drivers.OCIS.Propagator, + "async_propagator_options": map[string]interface{}{ + "propagation_delay": cfg.Drivers.OCIS.AsyncPropagatorOptions.PropagationDelay, + }, "root": cfg.Drivers.OCIS.Root, "user_layout": cfg.Drivers.OCIS.UserLayout, "share_folder": cfg.Drivers.OCIS.ShareFolder, @@ -170,7 +174,11 @@ func Ocis(cfg *config.Config) map[string]interface{} { // OcisNoEvents is the config mapping for the ocis storage driver emitting no events func OcisNoEvents(cfg *config.Config) map[string]interface{} { return map[string]interface{}{ - "metadata_backend": cfg.Drivers.OCIS.MetadataBackend, + "metadata_backend": cfg.Drivers.OCIS.MetadataBackend, + "propagator": cfg.Drivers.OCIS.Propagator, + "async_propagator_options": map[string]interface{}{ + "propagation_delay": cfg.Drivers.OCIS.AsyncPropagatorOptions.PropagationDelay, + }, "root": cfg.Drivers.OCIS.Root, "user_layout": cfg.Drivers.OCIS.UserLayout, "share_folder": cfg.Drivers.OCIS.ShareFolder, @@ -224,7 +232,11 @@ func S3(cfg *config.Config) map[string]interface{} { // S3NG is the config mapping for the s3ng storage driver func S3NG(cfg *config.Config) map[string]interface{} { return map[string]interface{}{ - "metadata_backend": cfg.Drivers.S3NG.MetadataBackend, + "metadata_backend": cfg.Drivers.S3NG.MetadataBackend, + "propagator": cfg.Drivers.S3NG.Propagator, + "async_propagator_options": map[string]interface{}{ + "propagation_delay": cfg.Drivers.S3NG.AsyncPropagatorOptions.PropagationDelay, + }, "root": cfg.Drivers.S3NG.Root, "user_layout": cfg.Drivers.S3NG.UserLayout, "share_folder": cfg.Drivers.S3NG.ShareFolder, @@ -283,7 +295,11 @@ func S3NG(cfg *config.Config) map[string]interface{} { // S3NGNoEvents is the config mapping for the s3ng storage driver emitting no events func S3NGNoEvents(cfg *config.Config) map[string]interface{} { return map[string]interface{}{ - "metadata_backend": cfg.Drivers.S3NG.MetadataBackend, + "metadata_backend": cfg.Drivers.S3NG.MetadataBackend, + "propagator": cfg.Drivers.S3NG.Propagator, + "async_propagator_options": map[string]interface{}{ + "propagation_delay": cfg.Drivers.S3NG.AsyncPropagatorOptions.PropagationDelay, + }, "root": cfg.Drivers.S3NG.Root, "user_layout": cfg.Drivers.S3NG.UserLayout, "share_folder": cfg.Drivers.S3NG.ShareFolder, diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go index 273e094f9..a3377083e 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -44,6 +44,20 @@ func init() { tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/lookup") } +// PathLookup defines the interface for the lookup component +type PathLookup interface { + NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error) + NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error) + + InternalRoot() string + InternalPath(spaceID, nodeID string) string + Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error) + MetadataBackend() metadata.Backend + ReadBlobSizeAttr(ctx context.Context, path string) (int64, error) + ReadBlobIDAttr(ctx context.Context, path string) (string, error) + TypeFromPath(ctx context.Context, path string) provider.ResourceType +} + // Lookup implements transformations from filepath to node and back type Lookup struct { Options *options.Options diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options/options.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options/options.go index f60c912a8..6e081fdce 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options/options.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options/options.go @@ -21,6 +21,7 @@ package options import ( "path/filepath" "strings" + "time" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/sharedconf" @@ -41,6 +42,11 @@ type Options struct { // the metadata backend to use, currently supports `xattr` or `ini` MetadataBackend string `mapstructure:"metadata_backend"` + // the propagator to use for this fs. currently only `sync` is fully supported, `async` is available as an experimental feature + Propagator string `mapstructure:"propagator"` + // Options specific to the async propagator + AsyncPropagatorOptions AsyncPropagatorOptions `mapstructure:"async_propagator_options"` + // ocis fs works on top of a dir of uuid nodes Root string `mapstructure:"root"` @@ -78,6 +84,11 @@ type Options struct { MaxQuota uint64 `mapstructure:"max_quota"` } +// AsyncPropagatorOptions holds the configuration for the async propagator +type AsyncPropagatorOptions struct { + PropagationDelay time.Duration `mapstructure:"propagation_delay"` +} + // EventOptions are the configurable options for events type EventOptions struct { NatsAddress string `mapstructure:"natsaddress"` @@ -145,5 +156,12 @@ func New(m map[string]interface{}) (*Options, error) { o.MaxConcurrency = 100 } + if o.Propagator == "" { + o.Propagator = "sync" + } + if o.AsyncPropagatorOptions.PropagationDelay == 0 { + o.AsyncPropagatorOptions.PropagationDelay = 5 * time.Second + } + return o, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go new file mode 100644 index 000000000..3bd6864d9 --- /dev/null +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go @@ -0,0 +1,440 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package propagator + +import ( + "context" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/logger" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/google/renameio/v2" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" + "github.com/rs/zerolog" + "github.com/shamaton/msgpack/v2" +) + +var _propagationGracePeriod = 3 * time.Minute + +// AsyncPropagator implements asynchronous treetime & treesize propagation +type AsyncPropagator struct { + treeSizeAccounting bool + treeTimeAccounting bool + propagationDelay time.Duration + lookup lookup.PathLookup +} + +// Change represents a change to the tree +type Change struct { + SyncTime time.Time + SizeDiff int64 +} + +// NewAsyncPropagator returns a new AsyncPropagator instance +func NewAsyncPropagator(treeSizeAccounting, treeTimeAccounting bool, o options.AsyncPropagatorOptions, lookup lookup.PathLookup) AsyncPropagator { + p := AsyncPropagator{ + treeSizeAccounting: treeSizeAccounting, + treeTimeAccounting: treeTimeAccounting, + propagationDelay: o.PropagationDelay, + lookup: lookup, + } + + log := logger.New() + + log.Info().Msg("async propagator starting up...") + + // spawn a goroutine that watches for stale .processing dirs and fixes them + go func() { + if !p.treeTimeAccounting && !p.treeSizeAccounting { + // no propagation enabled + log.Debug().Msg("propagation disabled or nothing to propagate") + return + } + + changesDirPath := filepath.Join(p.lookup.InternalRoot(), "changes") + doSleep := false // switch to not sleep on the first iteration + for { + if doSleep { + time.Sleep(5 * time.Minute) + } + doSleep = true + log.Debug().Msg("scanning for stale .processing dirs") + + entries, err := filepath.Glob(changesDirPath + "/**/*") + if err != nil { + log.Error().Err(err).Msg("failed to list changes") + continue + } + + for _, e := range entries { + changesDirPath := e + entry, err := os.Stat(changesDirPath) + if err != nil { + continue + } + // recover all dirs that seem to have been stuck + if !entry.IsDir() || time.Now().Before(entry.ModTime().Add(_propagationGracePeriod)) { + continue + } + + go func() { + if !strings.HasSuffix(changesDirPath, ".processing") { + // first rename the existing node dir + err = os.Rename(changesDirPath, changesDirPath+".processing") + if err != nil { + return + } + changesDirPath += ".processing" + } + + log.Debug().Str("dir", changesDirPath).Msg("propagating stale .processing dir") + parts := strings.SplitN(entry.Name(), ":", 2) + if len(parts) != 2 { + log.Error().Str("file", entry.Name()).Msg("encountered invalid .processing dir") + return + } + + now := time.Now() + _ = os.Chtimes(changesDirPath, now, now) + p.propagate(context.Background(), parts[0], strings.TrimSuffix(parts[1], ".processing"), true, *log) + }() + } + } + }() + + return p +} + +// Propagate triggers a propagation +func (p AsyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error { + ctx, span := tracer.Start(ctx, "Propagate") + defer span.End() + log := appctx.GetLogger(ctx).With(). + Str("method", "async.Propagate"). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). + Str("parentid", n.ParentID). + Int64("sizeDiff", sizeDiff). + Logger() + + if !p.treeTimeAccounting && (!p.treeSizeAccounting || sizeDiff == 0) { + // no propagation enabled + log.Debug().Msg("propagation disabled or nothing to propagate") + return nil + } + + // add a change to the parent node + c := Change{ + // use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly + SyncTime: time.Now().UTC(), + SizeDiff: sizeDiff, + } + go p.queuePropagation(ctx, n.SpaceID, n.ParentID, c, log) + + return nil +} + +func (p AsyncPropagator) queuePropagation(ctx context.Context, spaceID, nodeID string, change Change, log zerolog.Logger) { + // add a change to the parent node + changePath := p.changesPath(spaceID, nodeID, uuid.New().String()+".mpk") + + data, err := msgpack.Marshal(change) + if err != nil { + log.Error().Err(err).Msg("failed to marshal Change") + return + } + + _, subspan := tracer.Start(ctx, "write changes file") + ready := false + triggerPropagation := false + _ = os.MkdirAll(filepath.Dir(filepath.Dir(changePath)), 0700) + err = os.Mkdir(filepath.Dir(changePath), 0700) + triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation + for retries := 0; retries <= 500; retries++ { + err := renameio.WriteFile(changePath, data, 0644) + if err == nil { + ready = true + break + } + log.Error().Err(err).Msg("failed to write Change to disk (retrying)") + err = os.Mkdir(filepath.Dir(changePath), 0700) + triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation + } + + if !ready { + log.Error().Err(err).Msg("failed to write Change to disk") + return + } + subspan.End() + + if !triggerPropagation { + return + } + + _, subspan = tracer.Start(ctx, "delay propagation") + time.Sleep(p.propagationDelay) // wait a moment before propagating + subspan.End() + + log.Debug().Msg("propagating") + // add a change to the parent node + changeDirPath := p.changesPath(spaceID, nodeID, "") + + // first rename the existing node dir + err = os.Rename(changeDirPath, changeDirPath+".processing") + if err != nil { + // This can fail in 2 ways + // 1. source does not exist anymore as it has already been propagated by another goroutine + // -> ignore, as the change is already being processed + // 2. target already exists because a previous propagation is still running + // -> ignore, the previous propagation will pick the new changes up + return + } + p.propagate(ctx, spaceID, nodeID, false, log) +} + +func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string, recalculateTreeSize bool, log zerolog.Logger) { + changeDirPath := p.changesPath(spaceID, nodeID, "") + processingPath := changeDirPath + ".processing" + + cleanup := func() { + err := os.RemoveAll(processingPath) + if err != nil { + log.Error().Err(err).Msg("Could not remove .processing dir") + } + } + + _, subspan := tracer.Start(ctx, "list changes files") + d, err := os.Open(processingPath) + if err != nil { + log.Error().Err(err).Msg("Could not open change .processing dir") + cleanup() + return + } + defer d.Close() + names, err := d.Readdirnames(0) + if err != nil { + log.Error().Err(err).Msg("Could not read dirnames") + cleanup() + return + } + subspan.End() + + _, subspan = tracer.Start(ctx, "read changes files") + pc := Change{} + for _, name := range names { + if !strings.HasSuffix(name, ".mpk") { + continue + } + + b, err := os.ReadFile(filepath.Join(processingPath, name)) + if err != nil { + log.Error().Err(err).Msg("Could not read change") + cleanup() + return + } + c := Change{} + err = msgpack.Unmarshal(b, &c) + if err != nil { + log.Error().Err(err).Msg("Could not unmarshal change") + cleanup() + return + } + if c.SyncTime.After(pc.SyncTime) { + pc.SyncTime = c.SyncTime + } + pc.SizeDiff += c.SizeDiff + } + subspan.End() + + // TODO do we need to write an aggregated parentchange file? + + attrs := node.Attributes{} + + var f *lockedfile.File + // lock parent before reading treesize or tree time + nodePath := filepath.Join(p.lookup.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(nodeID, 4, 2)) + + _, subspan = tracer.Start(ctx, "lockedfile.OpenFile") + lockFilepath := p.lookup.MetadataBackend().LockfilePath(nodePath) + f, err = lockedfile.OpenFile(lockFilepath, os.O_RDWR|os.O_CREATE, 0600) + subspan.End() + if err != nil { + log.Error().Err(err). + Str("lock filepath", lockFilepath). + Msg("Propagation failed. Could not open metadata for node with lock.") + cleanup() + return + } + // always log error if closing node fails + defer func() { + // ignore already closed error + cerr := f.Close() + if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) { + err = cerr // only overwrite err with en error from close if the former was nil + } + }() + + _, subspan = tracer.Start(ctx, "node.ReadNode") + var n *node.Node + if n, err = node.ReadNode(ctx, p.lookup, spaceID, nodeID, false, nil, false); err != nil { + log.Error().Err(err). + Msg("Propagation failed. Could not read node.") + cleanup() + return + } + subspan.End() + + if !n.Exists { + log.Debug().Str("attr", prefixes.PropagationAttr).Msg("node does not exist anymore, not propagating") + cleanup() + return + } + + if !n.HasPropagation(ctx) { + log.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating") + cleanup() + return + } + + if p.treeTimeAccounting { + // update the parent tree time if it is older than the nodes mtime + updateSyncTime := false + + var tmTime time.Time + tmTime, err = n.GetTMTime(ctx) + switch { + case err != nil: + // missing attribute, or invalid format, overwrite + log.Debug().Err(err). + Msg("could not read tmtime attribute, overwriting") + updateSyncTime = true + case tmTime.Before(pc.SyncTime): + log.Debug(). + Time("tmtime", tmTime). + Time("stime", pc.SyncTime). + Msg("parent tmtime is older than node mtime, updating") + updateSyncTime = true + default: + log.Debug(). + Time("tmtime", tmTime). + Time("stime", pc.SyncTime). + Dur("delta", pc.SyncTime.Sub(tmTime)). + Msg("node tmtime is younger than stime, not updating") + } + + if updateSyncTime { + // update the tree time of the parent node + attrs.SetString(prefixes.TreeMTimeAttr, pc.SyncTime.UTC().Format(time.RFC3339Nano)) + } + + attrs.SetString(prefixes.TmpEtagAttr, "") + } + + // size accounting + if p.treeSizeAccounting && pc.SizeDiff != 0 { + var newSize uint64 + + // read treesize + treeSize, err := n.GetTreeSize(ctx) + switch { + case recalculateTreeSize || metadata.IsAttrUnset(err): + // fallback to calculating the treesize + log.Warn().Msg("treesize attribute unset, falling back to calculating the treesize") + newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath()) + if err != nil { + log.Error().Err(err). + Msg("Error when calculating treesize of node.") // FIXME wat? + cleanup() + return + } + case err != nil: + log.Error().Err(err). + Msg("Failed to propagate treesize change. Error when reading the treesize attribute from node") + cleanup() + return + case pc.SizeDiff > 0: + newSize = treeSize + uint64(pc.SizeDiff) + case uint64(-pc.SizeDiff) > treeSize: + // The sizeDiff is larger than the current treesize. Which would result in + // a negative new treesize. Something must have gone wrong with the accounting. + // Reset the current treesize to 0. + log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", pc.SizeDiff). + Msg("Error when updating treesize of node. Updated treesize < 0. Reestting to 0") + newSize = 0 + default: + newSize = treeSize - uint64(-pc.SizeDiff) + } + + // update the tree size of the node + attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10)) + log.Debug().Uint64("newSize", newSize).Msg("updated treesize of node") + } + + if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil { + log.Error().Err(err).Msg("Failed to update extend attributes of node") + cleanup() + return + } + + // Release node lock early, ignore already closed error + _, subspan = tracer.Start(ctx, "f.Close") + cerr := f.Close() + subspan.End() + if cerr != nil && !errors.Is(cerr, os.ErrClosed) { + log.Error().Err(cerr).Msg("Failed to close node and release lock") + } + + log.Info().Msg("Propagation done. cleaning up") + cleanup() + + if !n.IsSpaceRoot(ctx) { // This does not seem robust as it checks the space name property + p.queuePropagation(ctx, n.SpaceID, n.ParentID, pc, log) + } + + // Check for a changes dir that might have been added meanwhile and pick it up + if _, err = os.Open(changeDirPath); err == nil { + log.Info().Msg("Found a new changes dir. starting next propagation") + time.Sleep(p.propagationDelay) // wait a moment before propagating + err = os.Rename(changeDirPath, processingPath) + if err != nil { + // This can fail in 2 ways + // 1. source does not exist anymore as it has already been propagated by another goroutine + // -> ignore, as the change is already being processed + // 2. target already exists because a previous propagation is still running + // -> ignore, the previous propagation will pick the new changes up + return + } + p.propagate(ctx, spaceID, nodeID, false, log) + } +} + +func (p AsyncPropagator) changesPath(spaceID, nodeID, filename string) string { + return filepath.Join(p.lookup.InternalRoot(), "changes", spaceID[0:2], spaceID+":"+nodeID, filename) +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/propagator.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/propagator.go new file mode 100644 index 000000000..6f12e43c6 --- /dev/null +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/propagator.go @@ -0,0 +1,101 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package propagator + +import ( + "context" + "os" + "path/filepath" + "strconv" + + sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +var tracer trace.Tracer + +func init() { + tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/tree/propagator") +} + +type Propagator interface { + Propagate(ctx context.Context, node *node.Node, sizediff int64) error +} + +func New(lookup lookup.PathLookup, o *options.Options) Propagator { + switch o.Propagator { + case "async": + return NewAsyncPropagator(o.TreeSizeAccounting, o.TreeTimeAccounting, o.AsyncPropagatorOptions, lookup) + default: + return NewSyncPropagator(o.TreeSizeAccounting, o.TreeTimeAccounting, lookup) + } +} + +func calculateTreeSize(ctx context.Context, lookup lookup.PathLookup, childrenPath string) (uint64, error) { + ctx, span := tracer.Start(ctx, "calculateTreeSize") + defer span.End() + var size uint64 + + f, err := os.Open(childrenPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not open dir") + return 0, err + } + defer f.Close() + + names, err := f.Readdirnames(0) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not read dirnames") + return 0, err + } + for i := range names { + cPath := filepath.Join(childrenPath, names[i]) + resolvedPath, err := filepath.EvalSymlinks(cPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not resolve child entry symlink") + continue // continue after an error + } + + // raw read of the attributes for performance reasons + attribs, err := lookup.MetadataBackend().All(ctx, resolvedPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not read attributes of child entry") + continue // continue after an error + } + sizeAttr := "" + if string(attribs[prefixes.TypeAttr]) == strconv.FormatUint(uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), 10) { + sizeAttr = string(attribs[prefixes.BlobsizeAttr]) + } else { + sizeAttr = string(attribs[prefixes.TreesizeAttr]) + } + csize, err := strconv.ParseInt(sizeAttr, 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "invalid blobsize xattr format") + } + size += uint64(csize) + } + return size, err +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/sync.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/sync.go new file mode 100644 index 000000000..4e03aada6 --- /dev/null +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/sync.go @@ -0,0 +1,208 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package propagator + +import ( + "context" + "errors" + "os" + "strconv" + "time" + + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/rogpeppe/go-internal/lockedfile" +) + +// SyncPropagator implements synchronous treetime & treesize propagation +type SyncPropagator struct { + treeSizeAccounting bool + treeTimeAccounting bool + lookup lookup.PathLookup +} + +// NewSyncPropagator returns a new AsyncPropagator instance +func NewSyncPropagator(treeSizeAccounting, treeTimeAccounting bool, lookup lookup.PathLookup) SyncPropagator { + return SyncPropagator{ + treeSizeAccounting: treeSizeAccounting, + treeTimeAccounting: treeTimeAccounting, + lookup: lookup, + } +} + +// Propagate triggers a propagation +func (p SyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error { + ctx, span := tracer.Start(ctx, "Propagate") + defer span.End() + sublog := appctx.GetLogger(ctx).With(). + Str("method", "sync.Propagate"). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). + Int64("sizeDiff", sizeDiff). + Logger() + + if !p.treeTimeAccounting && (!p.treeSizeAccounting || sizeDiff == 0) { + // no propagation enabled + sublog.Debug().Msg("propagation disabled or nothing to propagate") + return nil + } + + // is propagation enabled for the parent node? + root := n.SpaceRoot + + // use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly + sTime := time.Now().UTC() + + // we loop until we reach the root + var err error + for err == nil && n.ID != root.ID { + sublog.Debug().Msg("propagating") + + attrs := node.Attributes{} + + var f *lockedfile.File + // lock parent before reading treesize or tree time + + _, subspan := tracer.Start(ctx, "lockedfile.OpenFile") + parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath()) + f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600) + subspan.End() + if err != nil { + sublog.Error().Err(err). + Str("parent filename", parentFilename). + Msg("Propagation failed. Could not open metadata for parent with lock.") + return err + } + // always log error if closing node fails + defer func() { + // ignore already closed error + cerr := f.Close() + if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) { + err = cerr // only overwrite err with en error from close if the former was nil + } + }() + + if n, err = n.Parent(ctx); err != nil { + sublog.Error().Err(err). + Msg("Propagation failed. Could not read parent node.") + return err + } + + if !n.HasPropagation(ctx) { + sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating") + // if the attribute is not set treat it as false / none / no propagation + return nil + } + + sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() + + if p.treeTimeAccounting { + // update the parent tree time if it is older than the nodes mtime + updateSyncTime := false + + var tmTime time.Time + tmTime, err = n.GetTMTime(ctx) + switch { + case err != nil: + // missing attribute, or invalid format, overwrite + sublog.Debug().Err(err). + Msg("could not read tmtime attribute, overwriting") + updateSyncTime = true + case tmTime.Before(sTime): + sublog.Debug(). + Time("tmtime", tmTime). + Time("stime", sTime). + Msg("parent tmtime is older than node mtime, updating") + updateSyncTime = true + default: + sublog.Debug(). + Time("tmtime", tmTime). + Time("stime", sTime). + Dur("delta", sTime.Sub(tmTime)). + Msg("parent tmtime is younger than node mtime, not updating") + } + + if updateSyncTime { + // update the tree time of the parent node + attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano)) + } + + attrs.SetString(prefixes.TmpEtagAttr, "") + } + + // size accounting + if p.treeSizeAccounting && sizeDiff != 0 { + var newSize uint64 + + // read treesize + treeSize, err := n.GetTreeSize(ctx) + switch { + case metadata.IsAttrUnset(err): + // fallback to calculating the treesize + sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize") + newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath()) + if err != nil { + return err + } + case err != nil: + sublog.Error().Err(err). + Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent") + return err + case sizeDiff > 0: + newSize = treeSize + uint64(sizeDiff) + case uint64(-sizeDiff) > treeSize: + // The sizeDiff is larger than the current treesize. Which would result in + // a negative new treesize. Something must have gone wrong with the accounting. + // Reset the current treesize to 0. + sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff). + Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0") + newSize = 0 + default: + newSize = treeSize - uint64(-sizeDiff) + } + + // update the tree size of the node + attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10)) + sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node") + } + + if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil { + sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node") + return err + } + + // Release node lock early, ignore already closed error + _, subspan = tracer.Start(ctx, "f.Close") + cerr := f.Close() + subspan.End() + if cerr != nil && !errors.Is(cerr, os.ErrClosed) { + sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock") + return cerr + } + } + if err != nil { + sublog.Error().Err(err).Msg("error propagating") + return err + } + return nil + +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go index 67beadc83..4ab4bda4d 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go @@ -28,7 +28,6 @@ import ( "os" "path/filepath" "regexp" - "strconv" "strings" "time" @@ -36,14 +35,13 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rogpeppe/go-internal/lockedfile" "github.com/rs/zerolog/log" "go-micro.dev/v4/store" "go.opentelemetry.io/otel" @@ -66,24 +64,11 @@ type Blobstore interface { Delete(node *node.Node) error } -// PathLookup defines the interface for the lookup component -type PathLookup interface { - NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error) - NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error) - - InternalRoot() string - InternalPath(spaceID, nodeID string) string - Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error) - MetadataBackend() metadata.Backend - ReadBlobSizeAttr(ctx context.Context, path string) (int64, error) - ReadBlobIDAttr(ctx context.Context, path string) (string, error) - TypeFromPath(ctx context.Context, path string) provider.ResourceType -} - // Tree manages a hierarchical tree type Tree struct { - lookup PathLookup - blobstore Blobstore + lookup lookup.PathLookup + blobstore Blobstore + propagator propagator.Propagator options *options.Options @@ -94,12 +79,13 @@ type Tree struct { type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool // New returns a new instance of Tree -func New(lu PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree { +func New(lu lookup.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree { return &Tree{ - lookup: lu, - blobstore: bs, - options: o, - idCache: cache, + lookup: lu, + blobstore: bs, + options: o, + idCache: cache, + propagator: propagator.New(lu, o), } } @@ -718,205 +704,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error // Propagate propagates changes to the root of the tree func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) { - ctx, span := tracer.Start(ctx, "Propagate") - defer span.End() - sublog := appctx.GetLogger(ctx).With(). - Str("method", "tree.Propagate"). - Str("spaceid", n.SpaceID). - Str("nodeid", n.ID). - Int64("sizeDiff", sizeDiff). - Logger() - - if !t.options.TreeTimeAccounting && (!t.options.TreeSizeAccounting || sizeDiff == 0) { - // no propagation enabled - sublog.Debug().Msg("propagation disabled or nothing to propagate") - return - } - - // is propagation enabled for the parent node? - root := n.SpaceRoot - - // use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly - sTime := time.Now().UTC() - - // we loop until we reach the root - for err == nil && n.ID != root.ID { - sublog.Debug().Msg("propagating") - - attrs := node.Attributes{} - - var f *lockedfile.File - // lock parent before reading treesize or tree time - - _, subspan := tracer.Start(ctx, "lockedfile.OpenFile") - parentFilename := t.lookup.MetadataBackend().LockfilePath(n.ParentPath()) - f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600) - subspan.End() - if err != nil { - sublog.Error().Err(err). - Str("parent filename", parentFilename). - Msg("Propagation failed. Could not open metadata for parent with lock.") - return err - } - // always log error if closing node fails - defer func() { - // ignore already closed error - cerr := f.Close() - if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) { - err = cerr // only overwrite err with en error from close if the former was nil - } - }() - - if n, err = n.Parent(ctx); err != nil { - sublog.Error().Err(err). - Msg("Propagation failed. Could not read parent node.") - return err - } - - // TODO none, sync and async? - if !n.HasPropagation(ctx) { - sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating") - // if the attribute is not set treat it as false / none / no propagation - return nil - } - - sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() - - if t.options.TreeTimeAccounting { - // update the parent tree time if it is older than the nodes mtime - updateSyncTime := false - - var tmTime time.Time - tmTime, err = n.GetTMTime(ctx) - switch { - case err != nil: - // missing attribute, or invalid format, overwrite - sublog.Debug().Err(err). - Msg("could not read tmtime attribute, overwriting") - updateSyncTime = true - case tmTime.Before(sTime): - sublog.Debug(). - Time("tmtime", tmTime). - Time("stime", sTime). - Msg("parent tmtime is older than node mtime, updating") - updateSyncTime = true - default: - sublog.Debug(). - Time("tmtime", tmTime). - Time("stime", sTime). - Dur("delta", sTime.Sub(tmTime)). - Msg("parent tmtime is younger than node mtime, not updating") - } - - if updateSyncTime { - // update the tree time of the parent node - attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano)) - } - - attrs.SetString(prefixes.TmpEtagAttr, "") - } - - // size accounting - if t.options.TreeSizeAccounting && sizeDiff != 0 { - var newSize uint64 - - // read treesize - treeSize, err := n.GetTreeSize(ctx) - switch { - case metadata.IsAttrUnset(err): - // fallback to calculating the treesize - sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize") - newSize, err = t.calculateTreeSize(ctx, n.InternalPath()) - if err != nil { - return err - } - case err != nil: - sublog.Error().Err(err). - Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent") - return err - case sizeDiff > 0: - newSize = treeSize + uint64(sizeDiff) - case uint64(-sizeDiff) > treeSize: - // The sizeDiff is larger than the current treesize. Which would result in - // a negative new treesize. Something must have gone wrong with the accounting. - // Reset the current treesize to 0. - sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff). - Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0") - newSize = 0 - default: - newSize = treeSize - uint64(-sizeDiff) - } - - // update the tree size of the node - attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10)) - sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node") - } - - if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil { - sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node") - return err - } - - // Release node lock early, ignore already closed error - _, subspan = tracer.Start(ctx, "f.Close") - cerr := f.Close() - subspan.End() - if cerr != nil && !errors.Is(cerr, os.ErrClosed) { - sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock") - return cerr - } - } - if err != nil { - sublog.Error().Err(err).Msg("error propagating") - return - } - return -} - -func (t *Tree) calculateTreeSize(ctx context.Context, childrenPath string) (uint64, error) { - ctx, span := tracer.Start(ctx, "calculateTreeSize") - defer span.End() - var size uint64 - - f, err := os.Open(childrenPath) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not open dir") - return 0, err - } - defer f.Close() - - names, err := f.Readdirnames(0) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not read dirnames") - return 0, err - } - for i := range names { - cPath := filepath.Join(childrenPath, names[i]) - resolvedPath, err := filepath.EvalSymlinks(cPath) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not resolve child entry symlink") - continue // continue after an error - } - - // raw read of the attributes for performance reasons - attribs, err := t.lookup.MetadataBackend().All(ctx, resolvedPath) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not read attributes of child entry") - continue // continue after an error - } - sizeAttr := "" - if string(attribs[prefixes.TypeAttr]) == strconv.FormatUint(uint64(provider.ResourceType_RESOURCE_TYPE_FILE), 10) { - sizeAttr = string(attribs[prefixes.BlobsizeAttr]) - } else { - sizeAttr = string(attribs[prefixes.TreesizeAttr]) - } - csize, err := strconv.ParseInt(sizeAttr, 10, 64) - if err != nil { - return 0, errors.Wrapf(err, "invalid blobsize xattr format") - } - size += uint64(csize) - } - return size, err + return t.propagator.Propagate(ctx, n, sizeDiff) } // WriteBlob writes a blob to the blobstore diff --git a/vendor/modules.txt b/vendor/modules.txt index 0fba14b4c..1ced96053 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0 +# github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11 ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime @@ -662,6 +662,7 @@ github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree +github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload github.com/cs3org/reva/v2/pkg/storage/utils/downloader github.com/cs3org/reva/v2/pkg/storage/utils/eosfs