Mirror of https://github.com/opencloud-eu/opencloud.git (synced 2026-01-28 16:01:18 -05:00)
Merge pull request #6824 from aduffeck/propagators
Make the storage-users propagator configurable
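Summary of the change (as visible in the diff below): the treesize/treetime propagator of the decomposedfs-based storage-users drivers becomes configurable. Both the 'ocis' and 's3ng' drivers gain a 'propagator' setting (env: OCIS_DECOMPOSEDFS_PROPAGATOR, STORAGE_USERS_OCIS_PROPAGATOR, STORAGE_USERS_S3NG_PROPAGATOR) that defaults to the existing synchronous propagator, with 'async' available as an experimental option, plus an 'async_propagator_options.propagation_delay' setting (env: STORAGE_USERS_ASYNC_PROPAGATOR_PROPAGATION_DELAY) that decomposedfs defaults to 5 seconds. The vendored reva is bumped to pull in the new tree/propagator package.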
go.mod (2 changes)
@@ -13,7 +13,7 @@ require (
	github.com/coreos/go-oidc v2.2.1+incompatible
	github.com/coreos/go-oidc/v3 v3.6.0
	github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
-	github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0
+	github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11
	github.com/disintegration/imaging v1.6.2
	github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
	github.com/egirna/icap-client v0.1.1
go.sum (4 changes)
@@ -625,8 +625,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
-github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0 h1:Vmghy5B5q/C22JR+fPtsKMra2ug2p3s0CeVmNnQIu4o=
-github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0/go.mod h1:4z5EQghS2LhSWZWocH51Dw9VAs16No1zSFvFgQtgS7w=
+github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11 h1:44FU3t+cJFBEiheA1Vj8JxLVXt8eWY1J0h4lNsaOgxo=
+github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11/go.mod h1:4z5EQghS2LhSWZWocH51Dw9VAs16No1zSFvFgQtgS7w=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
@@ -91,9 +91,16 @@ type Drivers struct {
	Local LocalDriver `yaml:",omitempty"` // not supported by the oCIS product, therefore not part of docs
}

+// AsyncPropagatorOptions configures the async propagator
+type AsyncPropagatorOptions struct {
+	PropagationDelay time.Duration `yaml:"propagation_delay" env:"STORAGE_USERS_ASYNC_PROPAGATOR_PROPAGATION_DELAY" desc:"The delay in seconds between a change made to a tree and the propagation start on treesize and treetime. Multiple propagations are computed to a single one."`
+}
+
// OCISDriver is the storage driver configuration when using 'ocis' storage driver
type OCISDriver struct {
	MetadataBackend string `yaml:"metadata_backend" env:"OCIS_DECOMPOSEDFS_METADATA_BACKEND;STORAGE_USERS_OCIS_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'messagepack' and 'xattrs'. The setting 'messagepack' uses a dedicated file to store file metadata while 'xattrs' uses extended attributes to store file metadata. Defaults to 'messagepack'."`
+	Propagator string `yaml:"propagator" env:"OCIS_DECOMPOSEDFS_PROPAGATOR;STORAGE_USERS_OCIS_PROPAGATOR" desc:"The propagator used for decomposedfs. At the moment, only 'sync' is fully supported, 'async' is available as an experimental option."`
+	AsyncPropagatorOptions AsyncPropagatorOptions `yaml:"async_propagator_options"`
	// Root is the absolute path to the location of the data
	Root string `yaml:"root" env:"STORAGE_USERS_OCIS_ROOT" desc:"The directory where the filesystem storage will store blobs and metadata. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH:/storage/users."`
	UserLayout string `yaml:"user_layout" env:"STORAGE_USERS_OCIS_USER_LAYOUT" desc:"Template string for the user storage layout in the user directory."`
@@ -115,7 +122,9 @@ type OCISDriver struct {

// S3NGDriver is the storage driver configuration when using 's3ng' storage driver
type S3NGDriver struct {
	MetadataBackend string `yaml:"metadata_backend" env:"STORAGE_USERS_S3NG_METADATA_BACKEND" desc:"The backend to use for storing metadata. Supported values are 'xattrs' and 'messagepack'. The setting 'xattrs' uses extended attributes to store file metadata while 'messagepack' uses a dedicated file to store file metadata. Defaults to 'xattrs'."`
+	Propagator string `yaml:"propagator" env:"OCIS_DECOMPOSEDFS_PROPAGATOR;STORAGE_USERS_S3NG_PROPAGATOR" desc:"The propagator used for decomposedfs. At the moment, only 'sync' is fully supported, 'async' is available as an experimental option."`
+	AsyncPropagatorOptions AsyncPropagatorOptions `yaml:"async_propagator_options"`
	// Root is the absolute path to the location of the data
	Root string `yaml:"root" env:"STORAGE_USERS_S3NG_ROOT" desc:"The directory where the filesystem storage will store metadata for blobs. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH:/storage/users."`
	UserLayout string `yaml:"user_layout" env:"STORAGE_USERS_S3NG_USER_LAYOUT" desc:"Template string for the user storage layout in the user directory."`
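Taken together, these fields let a deployment opt into the experimental async propagator per driver. A minimal sketch of setting them programmatically (the import path and the surrounding wiring are assumptions; only the struct and field names come from this diff):

package main

import (
	"fmt"
	"time"

	// assumed import path for the storage-users config package shown above
	"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
)

func main() {
	drv := config.OCISDriver{
		MetadataBackend: "messagepack",
		Propagator:      "async", // experimental; the shipped default stays "sync"
		AsyncPropagatorOptions: config.AsyncPropagatorOptions{
			PropagationDelay: 10 * time.Second, // decomposedfs falls back to 5s when unset
		},
	}
	fmt.Printf("%+v\n", drv)
}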
@@ -63,6 +63,7 @@ func DefaultConfig() *config.Config {
		},
		S3NG: config.S3NGDriver{
			MetadataBackend: "messagepack",
+			Propagator: "sync",
			Root: filepath.Join(defaults.BaseDataPath(), "storage", "users"),
			ShareFolder: "/Shares",
			UserLayout: "{{.Id.OpaqueId}}",
@@ -75,6 +76,7 @@ func DefaultConfig() *config.Config {
		},
		OCIS: config.OCISDriver{
			MetadataBackend: "messagepack",
+			Propagator: "sync",
			Root: filepath.Join(defaults.BaseDataPath(), "storage", "users"),
			ShareFolder: "/Shares",
			UserLayout: "{{.Id.OpaqueId}}",
@@ -115,7 +115,11 @@ func OwnCloudSQL(cfg *config.Config) map[string]interface{} {
// Ocis is the config mapping for the Ocis storage driver
func Ocis(cfg *config.Config) map[string]interface{} {
	return map[string]interface{}{
		"metadata_backend": cfg.Drivers.OCIS.MetadataBackend,
+		"propagator": cfg.Drivers.OCIS.Propagator,
+		"async_propagator_options": map[string]interface{}{
+			"propagation_delay": cfg.Drivers.OCIS.AsyncPropagatorOptions.PropagationDelay,
+		},
		"root": cfg.Drivers.OCIS.Root,
		"user_layout": cfg.Drivers.OCIS.UserLayout,
		"share_folder": cfg.Drivers.OCIS.ShareFolder,
@@ -170,7 +174,11 @@ func Ocis(cfg *config.Config) map[string]interface{} {
// OcisNoEvents is the config mapping for the ocis storage driver emitting no events
func OcisNoEvents(cfg *config.Config) map[string]interface{} {
	return map[string]interface{}{
		"metadata_backend": cfg.Drivers.OCIS.MetadataBackend,
+		"propagator": cfg.Drivers.OCIS.Propagator,
+		"async_propagator_options": map[string]interface{}{
+			"propagation_delay": cfg.Drivers.OCIS.AsyncPropagatorOptions.PropagationDelay,
+		},
		"root": cfg.Drivers.OCIS.Root,
		"user_layout": cfg.Drivers.OCIS.UserLayout,
		"share_folder": cfg.Drivers.OCIS.ShareFolder,
@@ -224,7 +232,11 @@ func S3(cfg *config.Config) map[string]interface{} {
// S3NG is the config mapping for the s3ng storage driver
func S3NG(cfg *config.Config) map[string]interface{} {
	return map[string]interface{}{
		"metadata_backend": cfg.Drivers.S3NG.MetadataBackend,
+		"propagator": cfg.Drivers.S3NG.Propagator,
+		"async_propagator_options": map[string]interface{}{
+			"propagation_delay": cfg.Drivers.S3NG.AsyncPropagatorOptions.PropagationDelay,
+		},
		"root": cfg.Drivers.S3NG.Root,
		"user_layout": cfg.Drivers.S3NG.UserLayout,
		"share_folder": cfg.Drivers.S3NG.ShareFolder,
@@ -283,7 +295,11 @@ func S3NG(cfg *config.Config) map[string]interface{} {
// S3NGNoEvents is the config mapping for the s3ng storage driver emitting no events
func S3NGNoEvents(cfg *config.Config) map[string]interface{} {
	return map[string]interface{}{
		"metadata_backend": cfg.Drivers.S3NG.MetadataBackend,
+		"propagator": cfg.Drivers.S3NG.Propagator,
+		"async_propagator_options": map[string]interface{}{
+			"propagation_delay": cfg.Drivers.S3NG.AsyncPropagatorOptions.PropagationDelay,
+		},
		"root": cfg.Drivers.S3NG.Root,
		"user_layout": cfg.Drivers.S3NG.UserLayout,
		"share_folder": cfg.Drivers.S3NG.ShareFolder,
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go (generated, vendored, 14 changes)
@@ -44,6 +44,20 @@ func init() {
	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/lookup")
}

+// PathLookup defines the interface for the lookup component
+type PathLookup interface {
+	NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error)
+	NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error)
+
+	InternalRoot() string
+	InternalPath(spaceID, nodeID string) string
+	Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error)
+	MetadataBackend() metadata.Backend
+	ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
+	ReadBlobIDAttr(ctx context.Context, path string) (string, error)
+	TypeFromPath(ctx context.Context, path string) provider.ResourceType
+}
+
// Lookup implements transformations from filepath to node and back
type Lookup struct {
	Options *options.Options
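Since the interface above was extracted from the tree package (see the tree.go hunks further down), the concrete lookup.Lookup type is expected to satisfy it. A compile-time assertion one could write against the vendored package (illustrative, not part of the diff):

package lookupcheck

import "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"

// fails to compile if *lookup.Lookup stops implementing lookup.PathLookup
var _ lookup.PathLookup = (*lookup.Lookup)(nil)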
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options/options.go (generated, vendored, 18 changes)
@@ -21,6 +21,7 @@ package options
import (
	"path/filepath"
	"strings"
+	"time"

	"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
	"github.com/cs3org/reva/v2/pkg/sharedconf"
@@ -41,6 +42,11 @@ type Options struct {
	// the metadata backend to use, currently supports `xattr` or `ini`
	MetadataBackend string `mapstructure:"metadata_backend"`

+	// the propagator to use for this fs. currently only `sync` is fully supported, `async` is available as an experimental feature
+	Propagator string `mapstructure:"propagator"`
+	// Options specific to the async propagator
+	AsyncPropagatorOptions AsyncPropagatorOptions `mapstructure:"async_propagator_options"`
+
	// ocis fs works on top of a dir of uuid nodes
	Root string `mapstructure:"root"`

@@ -78,6 +84,11 @@ type Options struct {
	MaxQuota uint64 `mapstructure:"max_quota"`
}

+// AsyncPropagatorOptions holds the configuration for the async propagator
+type AsyncPropagatorOptions struct {
+	PropagationDelay time.Duration `mapstructure:"propagation_delay"`
+}
+
// EventOptions are the configurable options for events
type EventOptions struct {
	NatsAddress string `mapstructure:"natsaddress"`
@@ -145,5 +156,12 @@ func New(m map[string]interface{}) (*Options, error) {
		o.MaxConcurrency = 100
	}

+	if o.Propagator == "" {
+		o.Propagator = "sync"
+	}
+	if o.AsyncPropagatorOptions.PropagationDelay == 0 {
+		o.AsyncPropagatorOptions.PropagationDelay = 5 * time.Second
+	}
+
	return o, nil
}
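The defaults added to options.New above mean an empty configuration keeps today's behaviour. A minimal sketch (assumption: that options.New accepts such a bare map; any validation beyond this hunk is not shown in the diff):

package main

import (
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
)

func main() {
	// no "propagator" and no "async_propagator_options" supplied
	o, err := options.New(map[string]interface{}{"root": "/tmp/decomposedfs"})
	if err != nil {
		panic(err)
	}
	// per the defaults above this prints "sync" and 5s
	fmt.Println(o.Propagator, o.AsyncPropagatorOptions.PropagationDelay)
}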
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go (generated, vendored, new file, 440 lines)
@@ -0,0 +1,440 @@
|
||||
// Copyright 2018-2021 CERN
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// In applying this license, CERN does not waive the privileges and immunities
|
||||
// granted to it by virtue of its status as an Intergovernmental Organization
|
||||
// or submit itself to any jurisdiction.
|
||||
|
||||
package propagator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/appctx"
|
||||
"github.com/cs3org/reva/v2/pkg/logger"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
|
||||
"github.com/google/renameio/v2"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rogpeppe/go-internal/lockedfile"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/shamaton/msgpack/v2"
|
||||
)
|
||||
|
||||
var _propagationGracePeriod = 3 * time.Minute
|
||||
|
||||
// AsyncPropagator implements asynchronous treetime & treesize propagation
|
||||
type AsyncPropagator struct {
|
||||
treeSizeAccounting bool
|
||||
treeTimeAccounting bool
|
||||
propagationDelay time.Duration
|
||||
lookup lookup.PathLookup
|
||||
}
|
||||
|
||||
// Change represents a change to the tree
|
||||
type Change struct {
|
||||
SyncTime time.Time
|
||||
SizeDiff int64
|
||||
}
|
||||
|
||||
// NewAsyncPropagator returns a new AsyncPropagator instance
|
||||
func NewAsyncPropagator(treeSizeAccounting, treeTimeAccounting bool, o options.AsyncPropagatorOptions, lookup lookup.PathLookup) AsyncPropagator {
|
||||
p := AsyncPropagator{
|
||||
treeSizeAccounting: treeSizeAccounting,
|
||||
treeTimeAccounting: treeTimeAccounting,
|
||||
propagationDelay: o.PropagationDelay,
|
||||
lookup: lookup,
|
||||
}
|
||||
|
||||
log := logger.New()
|
||||
|
||||
log.Info().Msg("async propagator starting up...")
|
||||
|
||||
// spawn a goroutine that watches for stale .processing dirs and fixes them
|
||||
go func() {
|
||||
if !p.treeTimeAccounting && !p.treeSizeAccounting {
|
||||
// no propagation enabled
|
||||
log.Debug().Msg("propagation disabled or nothing to propagate")
|
||||
return
|
||||
}
|
||||
|
||||
changesDirPath := filepath.Join(p.lookup.InternalRoot(), "changes")
|
||||
doSleep := false // switch to not sleep on the first iteration
|
||||
for {
|
||||
if doSleep {
|
||||
time.Sleep(5 * time.Minute)
|
||||
}
|
||||
doSleep = true
|
||||
log.Debug().Msg("scanning for stale .processing dirs")
|
||||
|
||||
entries, err := filepath.Glob(changesDirPath + "/**/*")
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to list changes")
|
||||
continue
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
changesDirPath := e
|
||||
entry, err := os.Stat(changesDirPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// recover all dirs that seem to have been stuck
|
||||
if !entry.IsDir() || time.Now().Before(entry.ModTime().Add(_propagationGracePeriod)) {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
if !strings.HasSuffix(changesDirPath, ".processing") {
|
||||
// first rename the existing node dir
|
||||
err = os.Rename(changesDirPath, changesDirPath+".processing")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
changesDirPath += ".processing"
|
||||
}
|
||||
|
||||
log.Debug().Str("dir", changesDirPath).Msg("propagating stale .processing dir")
|
||||
parts := strings.SplitN(entry.Name(), ":", 2)
|
||||
if len(parts) != 2 {
|
||||
log.Error().Str("file", entry.Name()).Msg("encountered invalid .processing dir")
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
_ = os.Chtimes(changesDirPath, now, now)
|
||||
p.propagate(context.Background(), parts[0], strings.TrimSuffix(parts[1], ".processing"), true, *log)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Propagate triggers a propagation
|
||||
func (p AsyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error {
|
||||
ctx, span := tracer.Start(ctx, "Propagate")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx).With().
|
||||
Str("method", "async.Propagate").
|
||||
Str("spaceid", n.SpaceID).
|
||||
Str("nodeid", n.ID).
|
||||
Str("parentid", n.ParentID).
|
||||
Int64("sizeDiff", sizeDiff).
|
||||
Logger()
|
||||
|
||||
if !p.treeTimeAccounting && (!p.treeSizeAccounting || sizeDiff == 0) {
|
||||
// no propagation enabled
|
||||
log.Debug().Msg("propagation disabled or nothing to propagate")
|
||||
return nil
|
||||
}
|
||||
|
||||
// add a change to the parent node
|
||||
c := Change{
|
||||
// use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly
|
||||
SyncTime: time.Now().UTC(),
|
||||
SizeDiff: sizeDiff,
|
||||
}
|
||||
go p.queuePropagation(ctx, n.SpaceID, n.ParentID, c, log)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p AsyncPropagator) queuePropagation(ctx context.Context, spaceID, nodeID string, change Change, log zerolog.Logger) {
|
||||
// add a change to the parent node
|
||||
changePath := p.changesPath(spaceID, nodeID, uuid.New().String()+".mpk")
|
||||
|
||||
data, err := msgpack.Marshal(change)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to marshal Change")
|
||||
return
|
||||
}
|
||||
|
||||
_, subspan := tracer.Start(ctx, "write changes file")
|
||||
ready := false
|
||||
triggerPropagation := false
|
||||
_ = os.MkdirAll(filepath.Dir(filepath.Dir(changePath)), 0700)
|
||||
err = os.Mkdir(filepath.Dir(changePath), 0700)
|
||||
triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation
|
||||
for retries := 0; retries <= 500; retries++ {
|
||||
err := renameio.WriteFile(changePath, data, 0644)
|
||||
if err == nil {
|
||||
ready = true
|
||||
break
|
||||
}
|
||||
log.Error().Err(err).Msg("failed to write Change to disk (retrying)")
|
||||
err = os.Mkdir(filepath.Dir(changePath), 0700)
|
||||
triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation
|
||||
}
|
||||
|
||||
if !ready {
|
||||
log.Error().Err(err).Msg("failed to write Change to disk")
|
||||
return
|
||||
}
|
||||
subspan.End()
|
||||
|
||||
if !triggerPropagation {
|
||||
return
|
||||
}
|
||||
|
||||
_, subspan = tracer.Start(ctx, "delay propagation")
|
||||
time.Sleep(p.propagationDelay) // wait a moment before propagating
|
||||
subspan.End()
|
||||
|
||||
log.Debug().Msg("propagating")
|
||||
// add a change to the parent node
|
||||
changeDirPath := p.changesPath(spaceID, nodeID, "")
|
||||
|
||||
// first rename the existing node dir
|
||||
err = os.Rename(changeDirPath, changeDirPath+".processing")
|
||||
if err != nil {
|
||||
// This can fail in 2 ways
|
||||
// 1. source does not exist anymore as it has already been propagated by another goroutine
|
||||
// -> ignore, as the change is already being processed
|
||||
// 2. target already exists because a previous propagation is still running
|
||||
// -> ignore, the previous propagation will pick the new changes up
|
||||
return
|
||||
}
|
||||
p.propagate(ctx, spaceID, nodeID, false, log)
|
||||
}
|
||||
|
||||
func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string, recalculateTreeSize bool, log zerolog.Logger) {
|
||||
changeDirPath := p.changesPath(spaceID, nodeID, "")
|
||||
processingPath := changeDirPath + ".processing"
|
||||
|
||||
cleanup := func() {
|
||||
err := os.RemoveAll(processingPath)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Could not remove .processing dir")
|
||||
}
|
||||
}
|
||||
|
||||
_, subspan := tracer.Start(ctx, "list changes files")
|
||||
d, err := os.Open(processingPath)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Could not open change .processing dir")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
defer d.Close()
|
||||
names, err := d.Readdirnames(0)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Could not read dirnames")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
subspan.End()
|
||||
|
||||
_, subspan = tracer.Start(ctx, "read changes files")
|
||||
pc := Change{}
|
||||
for _, name := range names {
|
||||
if !strings.HasSuffix(name, ".mpk") {
|
||||
continue
|
||||
}
|
||||
|
||||
b, err := os.ReadFile(filepath.Join(processingPath, name))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Could not read change")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
c := Change{}
|
||||
err = msgpack.Unmarshal(b, &c)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Could not unmarshal change")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
if c.SyncTime.After(pc.SyncTime) {
|
||||
pc.SyncTime = c.SyncTime
|
||||
}
|
||||
pc.SizeDiff += c.SizeDiff
|
||||
}
|
||||
subspan.End()
|
||||
|
||||
// TODO do we need to write an aggregated parentchange file?
|
||||
|
||||
attrs := node.Attributes{}
|
||||
|
||||
var f *lockedfile.File
|
||||
// lock parent before reading treesize or tree time
|
||||
nodePath := filepath.Join(p.lookup.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(nodeID, 4, 2))
|
||||
|
||||
_, subspan = tracer.Start(ctx, "lockedfile.OpenFile")
|
||||
lockFilepath := p.lookup.MetadataBackend().LockfilePath(nodePath)
|
||||
f, err = lockedfile.OpenFile(lockFilepath, os.O_RDWR|os.O_CREATE, 0600)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("lock filepath", lockFilepath).
|
||||
Msg("Propagation failed. Could not open metadata for node with lock.")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
// always log error if closing node fails
|
||||
defer func() {
|
||||
// ignore already closed error
|
||||
cerr := f.Close()
|
||||
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
err = cerr // only overwrite err with an error from close if the former was nil
|
||||
}
|
||||
}()
|
||||
|
||||
_, subspan = tracer.Start(ctx, "node.ReadNode")
|
||||
var n *node.Node
|
||||
if n, err = node.ReadNode(ctx, p.lookup, spaceID, nodeID, false, nil, false); err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("Propagation failed. Could not read node.")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
subspan.End()
|
||||
|
||||
if !n.Exists {
|
||||
log.Debug().Str("attr", prefixes.PropagationAttr).Msg("node does not exist anymore, not propagating")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
|
||||
if !n.HasPropagation(ctx) {
|
||||
log.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
|
||||
if p.treeTimeAccounting {
|
||||
// update the parent tree time if it is older than the nodes mtime
|
||||
updateSyncTime := false
|
||||
|
||||
var tmTime time.Time
|
||||
tmTime, err = n.GetTMTime(ctx)
|
||||
switch {
|
||||
case err != nil:
|
||||
// missing attribute, or invalid format, overwrite
|
||||
log.Debug().Err(err).
|
||||
Msg("could not read tmtime attribute, overwriting")
|
||||
updateSyncTime = true
|
||||
case tmTime.Before(pc.SyncTime):
|
||||
log.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", pc.SyncTime).
|
||||
Msg("parent tmtime is older than node mtime, updating")
|
||||
updateSyncTime = true
|
||||
default:
|
||||
log.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", pc.SyncTime).
|
||||
Dur("delta", pc.SyncTime.Sub(tmTime)).
|
||||
Msg("node tmtime is younger than stime, not updating")
|
||||
}
|
||||
|
||||
if updateSyncTime {
|
||||
// update the tree time of the parent node
|
||||
attrs.SetString(prefixes.TreeMTimeAttr, pc.SyncTime.UTC().Format(time.RFC3339Nano))
|
||||
}
|
||||
|
||||
attrs.SetString(prefixes.TmpEtagAttr, "")
|
||||
}
|
||||
|
||||
// size accounting
|
||||
if p.treeSizeAccounting && pc.SizeDiff != 0 {
|
||||
var newSize uint64
|
||||
|
||||
// read treesize
|
||||
treeSize, err := n.GetTreeSize(ctx)
|
||||
switch {
|
||||
case recalculateTreeSize || metadata.IsAttrUnset(err):
|
||||
// fallback to calculating the treesize
|
||||
log.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
|
||||
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Msg("Error when calculating treesize of node.") // FIXME wat?
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
case err != nil:
|
||||
log.Error().Err(err).
|
||||
Msg("Failed to propagate treesize change. Error when reading the treesize attribute from node")
|
||||
cleanup()
|
||||
return
|
||||
case pc.SizeDiff > 0:
|
||||
newSize = treeSize + uint64(pc.SizeDiff)
|
||||
case uint64(-pc.SizeDiff) > treeSize:
|
||||
// The sizeDiff is larger than the current treesize. Which would result in
|
||||
// a negative new treesize. Something must have gone wrong with the accounting.
|
||||
// Reset the current treesize to 0.
|
||||
log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", pc.SizeDiff).
|
||||
Msg("Error when updating treesize of node. Updated treesize < 0. Reestting to 0")
|
||||
newSize = 0
|
||||
default:
|
||||
newSize = treeSize - uint64(-pc.SizeDiff)
|
||||
}
|
||||
|
||||
// update the tree size of the node
|
||||
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
|
||||
log.Debug().Uint64("newSize", newSize).Msg("updated treesize of node")
|
||||
}
|
||||
|
||||
if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to update extend attributes of node")
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
|
||||
// Release node lock early, ignore already closed error
|
||||
_, subspan = tracer.Start(ctx, "f.Close")
|
||||
cerr := f.Close()
|
||||
subspan.End()
|
||||
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
log.Error().Err(cerr).Msg("Failed to close node and release lock")
|
||||
}
|
||||
|
||||
log.Info().Msg("Propagation done. cleaning up")
|
||||
cleanup()
|
||||
|
||||
if !n.IsSpaceRoot(ctx) { // This does not seem robust as it checks the space name property
|
||||
p.queuePropagation(ctx, n.SpaceID, n.ParentID, pc, log)
|
||||
}
|
||||
|
||||
// Check for a changes dir that might have been added meanwhile and pick it up
|
||||
if _, err = os.Open(changeDirPath); err == nil {
|
||||
log.Info().Msg("Found a new changes dir. starting next propagation")
|
||||
time.Sleep(p.propagationDelay) // wait a moment before propagating
|
||||
err = os.Rename(changeDirPath, processingPath)
|
||||
if err != nil {
|
||||
// This can fail in 2 ways
|
||||
// 1. source does not exist anymore as it has already been propagated by another goroutine
|
||||
// -> ignore, as the change is already being processed
|
||||
// 2. target already exists because a previous propagation is still running
|
||||
// -> ignore, the previous propagation will pick the new changes up
|
||||
return
|
||||
}
|
||||
p.propagate(ctx, spaceID, nodeID, false, log)
|
||||
}
|
||||
}
|
||||
|
||||
func (p AsyncPropagator) changesPath(spaceID, nodeID, filename string) string {
|
||||
return filepath.Join(p.lookup.InternalRoot(), "changes", spaceID[0:2], spaceID+":"+nodeID, filename)
|
||||
}
|
||||
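For orientation: the AsyncPropagator above queues each change as a msgpack file in a per-node directory and renames that directory to "<dir>.processing" when a propagation run picks it up. A self-contained sketch of just the path scheme (the root path and IDs below are made-up example values):

package main

import (
	"fmt"
	"path/filepath"
)

// changesPath mirrors AsyncPropagator.changesPath from the file above:
// <root>/changes/<first two chars of spaceID>/<spaceID>:<nodeID>/<filename>
func changesPath(root, spaceID, nodeID, filename string) string {
	return filepath.Join(root, "changes", spaceID[0:2], spaceID+":"+nodeID, filename)
}

func main() {
	// prints /var/lib/storage/users/changes/sp/spaceid1:nodeid1/change1.mpk
	fmt.Println(changesPath("/var/lib/storage/users", "spaceid1", "nodeid1", "change1.mpk"))
}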
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/propagator.go (generated, vendored, new file, 101 lines)
@@ -0,0 +1,101 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package propagator

import (
	"context"
	"os"
	"path/filepath"
	"strconv"

	sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
	"github.com/cs3org/reva/v2/pkg/appctx"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
	"github.com/pkg/errors"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/tree/propagator")
}

type Propagator interface {
	Propagate(ctx context.Context, node *node.Node, sizediff int64) error
}

func New(lookup lookup.PathLookup, o *options.Options) Propagator {
	switch o.Propagator {
	case "async":
		return NewAsyncPropagator(o.TreeSizeAccounting, o.TreeTimeAccounting, o.AsyncPropagatorOptions, lookup)
	default:
		return NewSyncPropagator(o.TreeSizeAccounting, o.TreeTimeAccounting, lookup)
	}
}

func calculateTreeSize(ctx context.Context, lookup lookup.PathLookup, childrenPath string) (uint64, error) {
	ctx, span := tracer.Start(ctx, "calculateTreeSize")
	defer span.End()
	var size uint64

	f, err := os.Open(childrenPath)
	if err != nil {
		appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not open dir")
		return 0, err
	}
	defer f.Close()

	names, err := f.Readdirnames(0)
	if err != nil {
		appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not read dirnames")
		return 0, err
	}
	for i := range names {
		cPath := filepath.Join(childrenPath, names[i])
		resolvedPath, err := filepath.EvalSymlinks(cPath)
		if err != nil {
			appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not resolve child entry symlink")
			continue // continue after an error
		}

		// raw read of the attributes for performance reasons
		attribs, err := lookup.MetadataBackend().All(ctx, resolvedPath)
		if err != nil {
			appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not read attributes of child entry")
			continue // continue after an error
		}
		sizeAttr := ""
		if string(attribs[prefixes.TypeAttr]) == strconv.FormatUint(uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE), 10) {
			sizeAttr = string(attribs[prefixes.BlobsizeAttr])
		} else {
			sizeAttr = string(attribs[prefixes.TreesizeAttr])
		}
		csize, err := strconv.ParseInt(sizeAttr, 10, 64)
		if err != nil {
			return 0, errors.Wrapf(err, "invalid blobsize xattr format")
		}
		size += uint64(csize)
	}
	return size, err
}
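A minimal sketch of the new abstraction: anything with the Propagate signature above satisfies propagator.Propagator, and propagator.New picks between the async and sync implementations based on o.Propagator ("async" selects the experimental AsyncPropagator, anything else falls back to SyncPropagator). The noopPropagator below is purely illustrative and not part of reva:

package main

import (
	"context"
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator"
)

type noopPropagator struct{}

// Propagate matches the propagator.Propagator interface; a real
// implementation would update treesize/treetime up the tree.
func (noopPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error {
	return nil
}

// compile-time check against the interface defined above
var _ propagator.Propagator = noopPropagator{}

func main() {
	fmt.Println("noopPropagator satisfies propagator.Propagator")
}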
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/sync.go (generated, vendored, new file, 208 lines)
@@ -0,0 +1,208 @@
|
||||
// Copyright 2018-2021 CERN
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// In applying this license, CERN does not waive the privileges and immunities
|
||||
// granted to it by virtue of its status as an Intergovernmental Organization
|
||||
// or submit itself to any jurisdiction.
|
||||
|
||||
package propagator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/appctx"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
|
||||
"github.com/rogpeppe/go-internal/lockedfile"
|
||||
)
|
||||
|
||||
// SyncPropagator implements synchronous treetime & treesize propagation
|
||||
type SyncPropagator struct {
|
||||
treeSizeAccounting bool
|
||||
treeTimeAccounting bool
|
||||
lookup lookup.PathLookup
|
||||
}
|
||||
|
||||
// NewSyncPropagator returns a new SyncPropagator instance
|
||||
func NewSyncPropagator(treeSizeAccounting, treeTimeAccounting bool, lookup lookup.PathLookup) SyncPropagator {
|
||||
return SyncPropagator{
|
||||
treeSizeAccounting: treeSizeAccounting,
|
||||
treeTimeAccounting: treeTimeAccounting,
|
||||
lookup: lookup,
|
||||
}
|
||||
}
|
||||
|
||||
// Propagate triggers a propagation
|
||||
func (p SyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error {
|
||||
ctx, span := tracer.Start(ctx, "Propagate")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().
|
||||
Str("method", "sync.Propagate").
|
||||
Str("spaceid", n.SpaceID).
|
||||
Str("nodeid", n.ID).
|
||||
Int64("sizeDiff", sizeDiff).
|
||||
Logger()
|
||||
|
||||
if !p.treeTimeAccounting && (!p.treeSizeAccounting || sizeDiff == 0) {
|
||||
// no propagation enabled
|
||||
sublog.Debug().Msg("propagation disabled or nothing to propagate")
|
||||
return nil
|
||||
}
|
||||
|
||||
// is propagation enabled for the parent node?
|
||||
root := n.SpaceRoot
|
||||
|
||||
// use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly
|
||||
sTime := time.Now().UTC()
|
||||
|
||||
// we loop until we reach the root
|
||||
var err error
|
||||
for err == nil && n.ID != root.ID {
|
||||
sublog.Debug().Msg("propagating")
|
||||
|
||||
attrs := node.Attributes{}
|
||||
|
||||
var f *lockedfile.File
|
||||
// lock parent before reading treesize or tree time
|
||||
|
||||
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
|
||||
parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath())
|
||||
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Str("parent filename", parentFilename).
|
||||
Msg("Propagation failed. Could not open metadata for parent with lock.")
|
||||
return err
|
||||
}
|
||||
// always log error if closing node fails
|
||||
defer func() {
|
||||
// ignore already closed error
|
||||
cerr := f.Close()
|
||||
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
err = cerr // only overwrite err with an error from close if the former was nil
|
||||
}
|
||||
}()
|
||||
|
||||
if n, err = n.Parent(ctx); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("Propagation failed. Could not read parent node.")
|
||||
return err
|
||||
}
|
||||
|
||||
if !n.HasPropagation(ctx) {
|
||||
sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
|
||||
// if the attribute is not set treat it as false / none / no propagation
|
||||
return nil
|
||||
}
|
||||
|
||||
sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
|
||||
|
||||
if p.treeTimeAccounting {
|
||||
// update the parent tree time if it is older than the nodes mtime
|
||||
updateSyncTime := false
|
||||
|
||||
var tmTime time.Time
|
||||
tmTime, err = n.GetTMTime(ctx)
|
||||
switch {
|
||||
case err != nil:
|
||||
// missing attribute, or invalid format, overwrite
|
||||
sublog.Debug().Err(err).
|
||||
Msg("could not read tmtime attribute, overwriting")
|
||||
updateSyncTime = true
|
||||
case tmTime.Before(sTime):
|
||||
sublog.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", sTime).
|
||||
Msg("parent tmtime is older than node mtime, updating")
|
||||
updateSyncTime = true
|
||||
default:
|
||||
sublog.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", sTime).
|
||||
Dur("delta", sTime.Sub(tmTime)).
|
||||
Msg("parent tmtime is younger than node mtime, not updating")
|
||||
}
|
||||
|
||||
if updateSyncTime {
|
||||
// update the tree time of the parent node
|
||||
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
|
||||
}
|
||||
|
||||
attrs.SetString(prefixes.TmpEtagAttr, "")
|
||||
}
|
||||
|
||||
// size accounting
|
||||
if p.treeSizeAccounting && sizeDiff != 0 {
|
||||
var newSize uint64
|
||||
|
||||
// read treesize
|
||||
treeSize, err := n.GetTreeSize(ctx)
|
||||
switch {
|
||||
case metadata.IsAttrUnset(err):
|
||||
// fallback to calculating the treesize
|
||||
sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
|
||||
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case err != nil:
|
||||
sublog.Error().Err(err).
|
||||
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
|
||||
return err
|
||||
case sizeDiff > 0:
|
||||
newSize = treeSize + uint64(sizeDiff)
|
||||
case uint64(-sizeDiff) > treeSize:
|
||||
// The sizeDiff is larger than the current treesize. Which would result in
|
||||
// a negative new treesize. Something must have gone wrong with the accounting.
|
||||
// Reset the current treesize to 0.
|
||||
sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
|
||||
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
|
||||
newSize = 0
|
||||
default:
|
||||
newSize = treeSize - uint64(-sizeDiff)
|
||||
}
|
||||
|
||||
// update the tree size of the node
|
||||
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
|
||||
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
|
||||
}
|
||||
|
||||
if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
|
||||
sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node")
|
||||
return err
|
||||
}
|
||||
|
||||
// Release node lock early, ignore already closed error
|
||||
_, subspan = tracer.Start(ctx, "f.Close")
|
||||
cerr := f.Close()
|
||||
subspan.End()
|
||||
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock")
|
||||
return cerr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).Msg("error propagating")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go (generated, vendored, 234 changes)
@@ -28,7 +28,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -36,14 +35,13 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/appctx"
|
||||
"github.com/cs3org/reva/v2/pkg/errtypes"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
|
||||
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rogpeppe/go-internal/lockedfile"
|
||||
"github.com/rs/zerolog/log"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel"
|
||||
@@ -66,24 +64,11 @@ type Blobstore interface {
	Delete(node *node.Node) error
}

-// PathLookup defines the interface for the lookup component
-type PathLookup interface {
-	NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error)
-	NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error)
-
-	InternalRoot() string
-	InternalPath(spaceID, nodeID string) string
-	Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error)
-	MetadataBackend() metadata.Backend
-	ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
-	ReadBlobIDAttr(ctx context.Context, path string) (string, error)
-	TypeFromPath(ctx context.Context, path string) provider.ResourceType
-}
-
// Tree manages a hierarchical tree
type Tree struct {
-	lookup    PathLookup
-	blobstore Blobstore
+	lookup     lookup.PathLookup
+	blobstore  Blobstore
+	propagator propagator.Propagator

	options *options.Options

@@ -94,12 +79,13 @@ type Tree struct {
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool

// New returns a new instance of Tree
-func New(lu PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
+func New(lu lookup.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
	return &Tree{
-		lookup:    lu,
-		blobstore: bs,
-		options:   o,
-		idCache:   cache,
+		lookup:     lu,
+		blobstore:  bs,
+		options:    o,
+		idCache:    cache,
+		propagator: propagator.New(lu, o),
	}
}
@@ -718,205 +704,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error
|
||||
|
||||
// Propagate propagates changes to the root of the tree
|
||||
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) {
|
||||
ctx, span := tracer.Start(ctx, "Propagate")
|
||||
defer span.End()
|
||||
sublog := appctx.GetLogger(ctx).With().
|
||||
Str("method", "tree.Propagate").
|
||||
Str("spaceid", n.SpaceID).
|
||||
Str("nodeid", n.ID).
|
||||
Int64("sizeDiff", sizeDiff).
|
||||
Logger()
|
||||
|
||||
if !t.options.TreeTimeAccounting && (!t.options.TreeSizeAccounting || sizeDiff == 0) {
|
||||
// no propagation enabled
|
||||
sublog.Debug().Msg("propagation disabled or nothing to propagate")
|
||||
return
|
||||
}
|
||||
|
||||
// is propagation enabled for the parent node?
|
||||
root := n.SpaceRoot
|
||||
|
||||
// use a sync time and don't rely on the mtime of the current node, as the stat might not change when a rename happened too quickly
|
||||
sTime := time.Now().UTC()
|
||||
|
||||
// we loop until we reach the root
|
||||
for err == nil && n.ID != root.ID {
|
||||
sublog.Debug().Msg("propagating")
|
||||
|
||||
attrs := node.Attributes{}
|
||||
|
||||
var f *lockedfile.File
|
||||
// lock parent before reading treesize or tree time
|
||||
|
||||
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
|
||||
parentFilename := t.lookup.MetadataBackend().LockfilePath(n.ParentPath())
|
||||
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Str("parent filename", parentFilename).
|
||||
Msg("Propagation failed. Could not open metadata for parent with lock.")
|
||||
return err
|
||||
}
|
||||
// always log error if closing node fails
|
||||
defer func() {
|
||||
// ignore already closed error
|
||||
cerr := f.Close()
|
||||
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
err = cerr // only overwrite err with en error from close if the former was nil
|
||||
}
|
||||
}()
|
||||
|
||||
if n, err = n.Parent(ctx); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("Propagation failed. Could not read parent node.")
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO none, sync and async?
|
||||
if !n.HasPropagation(ctx) {
|
||||
sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
|
||||
// if the attribute is not set treat it as false / none / no propagation
|
||||
return nil
|
||||
}
|
||||
|
||||
sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
|
||||
|
||||
if t.options.TreeTimeAccounting {
|
||||
// update the parent tree time if it is older than the nodes mtime
|
||||
updateSyncTime := false
|
||||
|
||||
var tmTime time.Time
|
||||
tmTime, err = n.GetTMTime(ctx)
|
||||
switch {
|
||||
case err != nil:
|
||||
// missing attribute, or invalid format, overwrite
|
||||
sublog.Debug().Err(err).
|
||||
Msg("could not read tmtime attribute, overwriting")
|
||||
updateSyncTime = true
|
||||
case tmTime.Before(sTime):
|
||||
sublog.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", sTime).
|
||||
Msg("parent tmtime is older than node mtime, updating")
|
||||
updateSyncTime = true
|
||||
default:
|
||||
sublog.Debug().
|
||||
Time("tmtime", tmTime).
|
||||
Time("stime", sTime).
|
||||
Dur("delta", sTime.Sub(tmTime)).
|
||||
Msg("parent tmtime is younger than node mtime, not updating")
|
||||
}
|
||||
|
||||
if updateSyncTime {
|
||||
// update the tree time of the parent node
|
||||
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
|
||||
}
|
||||
|
||||
attrs.SetString(prefixes.TmpEtagAttr, "")
|
||||
}
|
||||
|
||||
// size accounting
|
||||
if t.options.TreeSizeAccounting && sizeDiff != 0 {
|
||||
var newSize uint64
|
||||
|
||||
// read treesize
|
||||
treeSize, err := n.GetTreeSize(ctx)
|
||||
switch {
|
||||
case metadata.IsAttrUnset(err):
|
||||
// fallback to calculating the treesize
|
||||
sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
|
||||
newSize, err = t.calculateTreeSize(ctx, n.InternalPath())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case err != nil:
|
||||
sublog.Error().Err(err).
|
||||
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
|
||||
return err
|
||||
case sizeDiff > 0:
|
||||
newSize = treeSize + uint64(sizeDiff)
|
||||
case uint64(-sizeDiff) > treeSize:
|
||||
// The sizeDiff is larger than the current treesize. Which would result in
|
||||
// a negative new treesize. Something must have gone wrong with the accounting.
|
||||
// Reset the current treesize to 0.
|
||||
sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
|
||||
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
|
||||
newSize = 0
|
||||
default:
|
||||
newSize = treeSize - uint64(-sizeDiff)
|
||||
}
|
||||
|
||||
// update the tree size of the node
|
||||
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
|
||||
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
|
||||
}
|
||||
|
||||
if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
|
||||
sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node")
|
||||
return err
|
||||
}
|
||||
|
||||
// Release node lock early, ignore already closed error
|
||||
_, subspan = tracer.Start(ctx, "f.Close")
|
||||
cerr := f.Close()
|
||||
subspan.End()
|
||||
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
|
||||
sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock")
|
||||
return cerr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
sublog.Error().Err(err).Msg("error propagating")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *Tree) calculateTreeSize(ctx context.Context, childrenPath string) (uint64, error) {
|
||||
ctx, span := tracer.Start(ctx, "calculateTreeSize")
|
||||
defer span.End()
|
||||
var size uint64
|
||||
|
||||
f, err := os.Open(childrenPath)
|
||||
if err != nil {
|
||||
appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not open dir")
|
||||
return 0, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
names, err := f.Readdirnames(0)
|
||||
if err != nil {
|
||||
appctx.GetLogger(ctx).Error().Err(err).Str("childrenPath", childrenPath).Msg("could not read dirnames")
|
||||
return 0, err
|
||||
}
|
||||
for i := range names {
|
||||
cPath := filepath.Join(childrenPath, names[i])
|
||||
resolvedPath, err := filepath.EvalSymlinks(cPath)
|
||||
if err != nil {
|
||||
appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not resolve child entry symlink")
|
||||
continue // continue after an error
|
||||
}
|
||||
|
||||
// raw read of the attributes for performance reasons
|
||||
attribs, err := t.lookup.MetadataBackend().All(ctx, resolvedPath)
|
||||
if err != nil {
|
||||
appctx.GetLogger(ctx).Error().Err(err).Str("childpath", cPath).Msg("could not read attributes of child entry")
|
||||
continue // continue after an error
|
||||
}
|
||||
sizeAttr := ""
|
||||
if string(attribs[prefixes.TypeAttr]) == strconv.FormatUint(uint64(provider.ResourceType_RESOURCE_TYPE_FILE), 10) {
|
||||
sizeAttr = string(attribs[prefixes.BlobsizeAttr])
|
||||
} else {
|
||||
sizeAttr = string(attribs[prefixes.TreesizeAttr])
|
||||
}
|
||||
csize, err := strconv.ParseInt(sizeAttr, 10, 64)
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "invalid blobsize xattr format")
|
||||
}
|
||||
size += uint64(csize)
|
||||
}
|
||||
return size, err
|
||||
return t.propagator.Propagate(ctx, n, sizeDiff)
|
||||
}
|
||||
|
||||
// WriteBlob writes a blob to the blobstore
|
||||
|
||||
vendor/modules.txt (vendored, 3 changes)
@@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
-# github.com/cs3org/reva/v2 v2.15.1-0.20230731062052-2c2e370980e0
+# github.com/cs3org/reva/v2 v2.15.1-0.20230731073021-26b2f3432d11
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -662,6 +662,7 @@ github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree
+github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload
github.com/cs3org/reva/v2/pkg/storage/utils/downloader
github.com/cs3org/reva/v2/pkg/storage/utils/eosfs