Bump reva

This commit is contained in:
André Duffeck
2024-05-02 16:52:54 +02:00
parent 738a268ab3
commit 5b2a8f8cad
23 changed files with 410 additions and 269 deletions

2
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.10.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.19.2-0.20240502102837-7e48a5145133
github.com/cs3org/reva/v2 v2.19.2-0.20240503075142-9982214f5702
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

4
go.sum
View File

@@ -1025,8 +1025,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.19.2-0.20240502102837-7e48a5145133 h1:zuqfsPUYjJjIiq4wx2xTDxlJH+Sh3H3m8ynSjbQ/e8w=
github.com/cs3org/reva/v2 v2.19.2-0.20240502102837-7e48a5145133/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cs3org/reva/v2 v2.19.2-0.20240503075142-9982214f5702 h1:tQt2FTKHI+oe4dYwGrcqvBdFRTp1fvl3xwrVOLldPXA=
github.com/cs3org/reva/v2 v2.19.2-0.20240503075142-9982214f5702/go.mod h1:lBSLlSm6MtbtWutLhfHZiUXMI5/HKwREu5YbEOf3+0k=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -118,6 +118,17 @@ func (lu *Lookup) TypeFromPath(ctx context.Context, path string) provider.Resour
return t
}
// NodeIDFromParentAndName returns the id of the child node with the given
// name below the given parent node. The id is read from the IDAttr metadata
// attribute stored on the child's directory entry.
// Returns errtypes.NotFound when no such entry (or no id attribute) exists.
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
	id, err := lu.metadataBackend.Get(ctx, filepath.Join(parent.InternalPath(), name), prefixes.IDAttr)
	if err != nil {
		if metadata.IsNotExist(err) {
			// missing attribute -> the child does not exist
			return "", errtypes.NotFound(name)
		}
		return "", err
	}
	return string(id), nil
}
// NodeFromResource takes in a request path or request id and converts it to a Node
func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error) {
ctx, span := tracer.Start(ctx, "NodeFromResource")

View File

@@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -687,6 +688,43 @@ func (t *Tree) DeleteBlob(node *node.Node) error {
return t.blobstore.Delete(node)
}
// BuildSpaceIDIndexEntry returns the entry for the space id index.
// This implementation stores the node id directly (the ocis tree stores a
// relative symlink target instead; see the sibling implementation).
func (t *Tree) BuildSpaceIDIndexEntry(spaceID, nodeID string) string {
	return nodeID
}
// ResolveSpaceIDIndexEntry resolves a space id index entry back to a
// (spaceID, nodeID) pair. Here the entry already is the node id, so both
// values are passed through unchanged and no error can occur.
func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string, error) {
	return spaceid, entry, nil
}
// InitNewNode initializes a new node on disk: it creates the parent
// directory structure, write-locks the new node's metadata and creates the
// (empty) node file that carries the resource mtime. It also verifies that
// the space quota allows adding fsize bytes.
//
// The returned UnlockFunc releases the metadata lock and must be invoked by
// the caller; note it is returned non-nil even when a later step failed.
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
	// create folder structure (if needed)
	if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
		return nil, err
	}
	// create and write lock new node metadata
	unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
	if err != nil {
		return nil, err
	}
	// we also need to touch the actual node file here, as it stores the mtime of the resource
	h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		// O_EXCL: fails when the node file already exists
		return unlock, err
	}
	h.Close()
	if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
		return unlock, err
	}
	return unlock, nil
}
// TODO check if node exists?
func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
ctx, span := tracer.Start(ctx, "createDirNode")

View File

@@ -26,8 +26,9 @@ import (
// Aspects holds dependencies for handling aspects of the decomposedfs
type Aspects struct {
Lookup node.PathLookup
Tree node.Tree
Permissions permissions.Permissions
EventStream events.Stream
Lookup node.PathLookup
Tree node.Tree
Permissions permissions.Permissions
EventStream events.Stream
DisableVersioning bool
}

View File

@@ -211,7 +211,7 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
userSpaceIndex: userSpaceIndex,
groupSpaceIndex: groupSpaceIndex,
spaceTypeIndex: spaceTypeIndex,
sessionStore: upload.NewSessionStore(aspects.Lookup, aspects.Tree, o.Root, aspects.EventStream, o.AsyncFileUploads, o.Tokens),
sessionStore: upload.NewSessionStore(aspects.Lookup, aspects.Tree, o.Root, aspects.EventStream, o.AsyncFileUploads, o.Tokens, aspects.DisableVersioning),
}
if o.AsyncFileUploads {

View File

@@ -327,7 +327,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
}
// update the indexes only after successfully setting the grant
err := fs.updateIndexes(ctx, g.GetGrantee(), spaceType, n.ID)
err := fs.updateIndexes(ctx, g.GetGrantee(), spaceType, n.SpaceID, n.ID)
if err != nil {
return err
}

View File

@@ -87,6 +87,23 @@ func (lu *Lookup) ReadBlobIDAttr(ctx context.Context, path string) (string, erro
}
return string(attr), nil
}
func readChildNodeFromLink(path string) (string, error) {
link, err := os.Readlink(path)
if err != nil {
return "", err
}
nodeID := strings.TrimLeft(link, "/.")
nodeID = strings.ReplaceAll(nodeID, "/", "")
return nodeID, nil
}
// NodeIDFromParentAndName returns the id of the child node with the given
// name below the given parent node. It resolves the id by reading the
// symlink that maps the child name to its node id inside the parent's
// internal directory.
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
	nodeID, err := readChildNodeFromLink(filepath.Join(parent.InternalPath(), name))
	if err != nil {
		return "", errors.Wrap(err, "decomposedfs: Wrap: readlink error")
	}
	return nodeID, nil
}
// TypeFromPath returns the type of the node at the given path
func (lu *Lookup) TypeFromPath(ctx context.Context, path string) provider.ResourceType {

View File

@@ -19,6 +19,7 @@
package metadata
import (
"io/fs"
"os"
"syscall"
@@ -50,3 +51,13 @@ func IsAttrUnset(err error) bool {
}
return false
}
// IsNotDir checks whether the given error signals ENOTDIR, i.e. that a path
// component expected to be a directory is not one.
// The os error is buried inside the fs.PathError error, so the chain is
// unwrapped via errors.Cause before the type assertions.
// NOTE(review): errors.Cause only follows pkg/errors-style wrapping; errors
// wrapped with fmt.Errorf("%w") would not be detected — confirm all callers
// wrap with pkg/errors.
func IsNotDir(err error) bool {
	if perr, ok := errors.Cause(err).(*fs.PathError); ok {
		if serr, ok2 := perr.Err.(syscall.Errno); ok2 {
			return serr == syscall.ENOTDIR
		}
	}
	return false
}

View File

@@ -304,6 +304,22 @@ func (MessagePackBackend) MetadataPath(path string) string { return path + ".mpk
// LockfilePath returns the path of the lock file
func (MessagePackBackend) LockfilePath(path string) string { return path + ".mlock" }
// Lock locks the metadata for the given path by creating and locking the
// corresponding lock file. The returned UnlockFunc closes (and thereby
// unlocks) the lock file and then removes it.
// NOTE(review): removing the lock file after closing it can race with a
// concurrent locker that already opened the same file — confirm callers
// serialize at a higher level.
func (b MessagePackBackend) Lock(path string) (UnlockFunc, error) {
	metaLockPath := b.LockfilePath(path)
	mlock, err := lockedfile.OpenFile(metaLockPath, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}
	return func() error {
		err := mlock.Close()
		if err != nil {
			return err
		}
		return os.Remove(metaLockPath)
	}, nil
}
func (b MessagePackBackend) cacheKey(path string) string {
// rootPath is guaranteed to have no trailing slash
// the cache key shouldn't begin with a slash as some stores drop it which can cause

View File

@@ -35,6 +35,8 @@ func init() {
var errUnconfiguredError = errors.New("no metadata backend configured. Bailing out")
type UnlockFunc func() error
// Backend defines the interface for file attribute backends
type Backend interface {
Name() string
@@ -48,6 +50,7 @@ type Backend interface {
SetMultiple(ctx context.Context, path string, attribs map[string][]byte, acquireLock bool) error
Remove(ctx context.Context, path, key string, acquireLock bool) error
Lock(path string) (UnlockFunc, error)
Purge(path string) error
Rename(oldPath, newPath string) error
IsMetaFile(path string) bool
@@ -99,6 +102,11 @@ func (NullBackend) Remove(ctx context.Context, path string, key string, acquireL
return errUnconfiguredError
}
// Lock locks the metadata for the given path. The NullBackend holds no
// metadata, so there is nothing to lock; unlike the other NullBackend
// methods this deliberately does not return errUnconfiguredError so that
// locking is a harmless no-op.
// It returns a no-op UnlockFunc rather than nil: callers routinely defer
// the returned function (cf. Tree.InitNewNode), and a nil func would panic
// when invoked.
func (NullBackend) Lock(path string) (UnlockFunc, error) {
	return func() error { return nil }, nil
}
// IsMetaFile returns whether the given path represents a meta file
func (NullBackend) IsMetaFile(path string) bool { return false }

View File

@@ -28,6 +28,7 @@ package prefixes
// "user.ocis." in the xattrs_prefix*.go files.
const (
TypeAttr string = OcisPrefix + "type"
IDAttr string = OcisPrefix + "id"
ParentidAttr string = OcisPrefix + "parentid"
OwnerIDAttr string = OcisPrefix + "owner.id"
OwnerIDPAttr string = OcisPrefix + "owner.idp"
@@ -89,6 +90,7 @@ const (
QuotaAttr string = OcisPrefix + "quota"
// the name given to a storage space. It should not contain any semantics as its only purpose is to be read.
SpaceIDAttr string = OcisPrefix + "space.id"
SpaceNameAttr string = OcisPrefix + "space.name"
SpaceTypeAttr string = OcisPrefix + "space.type"
SpaceDescriptionAttr string = OcisPrefix + "space.description"

View File

@@ -113,13 +113,13 @@ func (b XattrsBackend) Set(ctx context.Context, path string, key string, val []b
}
// SetMultiple sets a set of attribute for the given path
func (XattrsBackend) SetMultiple(ctx context.Context, path string, attribs map[string][]byte, acquireLock bool) (err error) {
func (b XattrsBackend) SetMultiple(ctx context.Context, path string, attribs map[string][]byte, acquireLock bool) (err error) {
if acquireLock {
err := os.MkdirAll(filepath.Dir(path), 0600)
if err != nil {
return err
}
lockedFile, err := lockedfile.OpenFile(path+filelocks.LockFileSuffix, os.O_CREATE|os.O_WRONLY, 0600)
lockedFile, err := lockedfile.OpenFile(b.LockfilePath(path), os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
@@ -173,6 +173,22 @@ func (XattrsBackend) MetadataPath(path string) string { return path }
// LockfilePath returns the path of the lock file
func (XattrsBackend) LockfilePath(path string) string { return path + ".mlock" }
// Lock locks the metadata for the given path by creating and locking the
// corresponding ".mlock" file. The returned UnlockFunc closes (and thereby
// unlocks) the lock file and then removes it.
// NOTE(review): identical to MessagePackBackend.Lock — consider sharing one
// implementation; also confirm the close-then-remove sequence cannot race
// with a concurrent locker.
func (b XattrsBackend) Lock(path string) (UnlockFunc, error) {
	metaLockPath := b.LockfilePath(path)
	mlock, err := lockedfile.OpenFile(metaLockPath, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}
	return func() error {
		err := mlock.Close()
		if err != nil {
			return err
		}
		return os.Remove(metaLockPath)
	}, nil
}
func cleanupLockfile(f *lockedfile.File) {
_ = f.Close()
_ = os.Remove(f.Name())

View File

@@ -21,16 +21,16 @@ package node
import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"fmt"
"hash"
"hash/adler32"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -99,10 +99,15 @@ type Tree interface {
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error)
InitNewNode(ctx context.Context, n *Node, fsize uint64) (metadata.UnlockFunc, error)
WriteBlob(node *Node, source string) error
ReadBlob(node *Node) (io.ReadCloser, error)
DeleteBlob(node *Node) error
BuildSpaceIDIndexEntry(spaceID, nodeID string) string
ResolveSpaceIDIndexEntry(spaceID, entry string) (string, string, error)
Propagate(ctx context.Context, node *Node, sizeDiff int64) (err error)
}
@@ -112,7 +117,10 @@ type PathLookup interface {
NodeFromResource(ctx context.Context, ref *provider.Reference) (*Node, error)
NodeFromID(ctx context.Context, id *provider.ResourceId) (n *Node, err error)
NodeIDFromParentAndName(ctx context.Context, n *Node, name string) (string, error)
GenerateSpaceID(spaceType string, owner *userpb.User) (string, error)
InternalRoot() string
InternalPath(spaceID, nodeID string) string
Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error)
@@ -124,6 +132,11 @@ type PathLookup interface {
CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), acquireTargetLock bool) (err error)
}
type IDCacher interface {
CacheID(ctx context.Context, spaceID, nodeID, val string) error
GetCachedID(ctx context.Context, spaceID, nodeID string) (string, bool)
}
// Node represents a node in the tree and provides methods to get a Parent or Child instance
type Node struct {
SpaceID string
@@ -362,26 +375,6 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
return n, nil
}
// The os error is buried inside the fs.PathError error
func isNotDir(err error) bool {
if perr, ok := err.(*fs.PathError); ok {
if serr, ok2 := perr.Err.(syscall.Errno); ok2 {
return serr == syscall.ENOTDIR
}
}
return false
}
func readChildNodeFromLink(path string) (string, error) {
link, err := os.Readlink(path)
if err != nil {
return "", err
}
nodeID := strings.TrimLeft(link, "/.")
nodeID = strings.ReplaceAll(nodeID, "/", "")
return nodeID, nil
}
// Child returns the child node with the given name
func (n *Node) Child(ctx context.Context, name string) (*Node, error) {
ctx, span := tracer.Start(ctx, "Child")
@@ -393,24 +386,22 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) {
} else if n.SpaceRoot != nil {
spaceID = n.SpaceRoot.ID
}
nodeID, err := readChildNodeFromLink(filepath.Join(n.InternalPath(), name))
if err != nil {
if errors.Is(err, fs.ErrNotExist) || isNotDir(err) {
c := &Node{
SpaceID: spaceID,
lu: n.lu,
ParentID: n.ID,
Name: name,
SpaceRoot: n.SpaceRoot,
}
return c, nil // if the file does not exist we return a node that has Exists = false
}
return nil, errors.Wrap(err, "decomposedfs: Wrap: readlink error")
c := &Node{
SpaceID: spaceID,
lu: n.lu,
ParentID: n.ID,
Name: name,
SpaceRoot: n.SpaceRoot,
}
nodeID, err := n.lu.NodeIDFromParentAndName(ctx, n, name)
switch {
case metadata.IsNotExist(err) || metadata.IsNotDir(err):
return c, nil // if the file does not exist we return a node that has Exists = false
case err != nil:
return nil, err
}
var c *Node
c, err = ReadNode(ctx, n.lu, spaceID, nodeID, false, n.SpaceRoot, true)
if err != nil {
return nil, errors.Wrap(err, "could not read child node")
@@ -1360,3 +1351,30 @@ func enoughDiskSpace(path string, fileSize uint64) bool {
}
return avalB > fileSize
}
// CalculateChecksums calculates the sha1, md5 and adler32 checksums of a file.
// The file is read exactly once: the stream is tee'd through the sha1 and
// md5 hashes while being copied into the adler32 hash. The open and copy
// steps are traced via the package-level tracer.
// It returns the three hash states; callers obtain digests via Sum(nil) /
// Sum32. On error all three results are nil.
func CalculateChecksums(ctx context.Context, path string) (hash.Hash, hash.Hash, hash.Hash32, error) {
	sha1h := sha1.New()
	md5h := md5.New()
	adler32h := adler32.New()
	_, subspan := tracer.Start(ctx, "os.Open")
	f, err := os.Open(path)
	subspan.End()
	if err != nil {
		return nil, nil, nil, err
	}
	defer f.Close()
	// chain: f -> sha1 -> md5 -> adler32; io.Copy drives the whole pipeline
	r1 := io.TeeReader(f, sha1h)
	r2 := io.TeeReader(r1, md5h)
	_, subspan = tracer.Start(ctx, "io.Copy")
	_, err = io.Copy(adler32h, r2)
	subspan.End()
	if err != nil {
		return nil, nil, nil, err
	}
	return sha1h, md5h, adler32h, nil
}

View File

@@ -50,9 +50,15 @@ type Options struct {
// ocis fs works on top of a dir of uuid nodes
Root string `mapstructure:"root"`
// the upload directory where uploads in progress are stored
UploadDirectory string `mapstructure:"upload_directory"`
// UserLayout describes the relative path from the storage's root node to the users home node.
UserLayout string `mapstructure:"user_layout"`
// ProjectLayout describes the relative path from the storage's root node to the project spaces root directory.
ProjectLayout string `mapstructure:"project_layout"`
// propagate mtime changes as tmtime (tree modification time) to the parent directory when user.ocis.propagation=1 is set on a node
TreeTimeAccounting bool `mapstructure:"treetime_accounting"`
@@ -65,7 +71,9 @@ type Options struct {
PermTLSMode pool.TLSMode
PersonalSpaceAliasTemplate string `mapstructure:"personalspacealias_template"`
PersonalSpacePathTemplate string `mapstructure:"personalspacepath_template"`
GeneralSpaceAliasTemplate string `mapstructure:"generalspacealias_template"`
GeneralSpacePathTemplate string `mapstructure:"generalspacepath_template"`
AsyncFileUploads bool `mapstructure:"asyncfileuploads"`
@@ -73,10 +81,9 @@ type Options struct {
Tokens TokenOptions `mapstructure:"tokens"`
// FileMetadataCache for file metadata
StatCache cache.Config `mapstructure:"statcache"`
FileMetadataCache cache.Config `mapstructure:"filemetadatacache"`
// IDCache for symlink lookups of direntry to node id
IDCache cache.Config `mapstructure:"idcache"`
IDCache cache.Config `mapstructure:"idcache"`
MaxAcquireLockCycles int `mapstructure:"max_acquire_lock_cycles"`
LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"`
@@ -117,9 +124,6 @@ func New(m map[string]interface{}) (*Options, error) {
o.MetadataBackend = "xattrs"
}
if o.UserLayout == "" {
o.UserLayout = "{{.Id.OpaqueId}}"
}
// ensure user layout has no starting or trailing /
o.UserLayout = strings.Trim(o.UserLayout, "/")
@@ -160,5 +164,9 @@ func New(m map[string]interface{}) (*Options, error) {
o.AsyncPropagatorOptions.PropagationDelay = 5 * time.Second
}
if o.UploadDirectory == "" {
o.UploadDirectory = filepath.Join(o.Root, "uploads")
}
return o, nil
}

View File

@@ -213,7 +213,10 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
if err != nil {
return err
}
defer f.Close()
defer func() {
_ = f.Close()
_ = os.Remove(fs.lu.MetadataBackend().LockfilePath(n.InternalPath()))
}()
// move current version to new revision
nodePath := fs.lu.InternalPath(spaceID, kp[0])

View File

@@ -45,7 +45,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
@@ -73,21 +72,17 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
if err != nil {
return nil, err
}
// allow sending a space id
if reqSpaceID := utils.ReadPlainFromOpaque(req.Opaque, "spaceid"); reqSpaceID != "" {
spaceID = reqSpaceID
}
// allow sending a space description
description := utils.ReadPlainFromOpaque(req.Opaque, "description")
// allow sending a spaceAlias
alias := utils.ReadPlainFromOpaque(req.Opaque, "spaceAlias")
if alias == "" {
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, fs.o.GeneralSpaceAliasTemplate)
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, spaceID, fs.o.GeneralSpaceAliasTemplate)
}
// TODO enforce a uuid?
// TODO clarify if we want to enforce a single personal storage space or if we want to allow sending the spaceid
if req.Type == _spaceTypePersonal {
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, fs.o.PersonalSpaceAliasTemplate)
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, spaceID, fs.o.PersonalSpaceAliasTemplate)
}
root, err := node.ReadNode(ctx, fs.lu, spaceID, spaceID, true, nil, false) // will fall into `Exists` case below
@@ -103,11 +98,28 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
// create a directory node
root.SetType(provider.ResourceType_RESOURCE_TYPE_CONTAINER)
rootPath := root.InternalPath()
switch req.Type {
case _spaceTypePersonal:
if fs.o.PersonalSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithUser(u, fs.o.PersonalSpacePathTemplate))
}
default:
if fs.o.GeneralSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, spaceID, fs.o.GeneralSpacePathTemplate))
}
}
if err := os.MkdirAll(rootPath, 0700); err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error creating node")
}
// Store id in cache
if c, ok := fs.lu.(node.IDCacher); ok {
if err := c.CacheID(ctx, spaceID, spaceID, rootPath); err != nil {
return nil, err
}
}
if req.GetOwner() != nil && req.GetOwner().GetId() != nil {
root.SetOwner(req.GetOwner().GetId())
} else {
@@ -115,6 +127,8 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
}
metadata := node.Attributes{}
metadata.SetString(prefixes.IDAttr, spaceID)
metadata.SetString(prefixes.SpaceIDAttr, spaceID)
metadata.SetString(prefixes.OwnerIDAttr, root.Owner().GetOpaqueId())
metadata.SetString(prefixes.OwnerIDPAttr, root.Owner().GetIdp())
metadata.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(root.Owner().GetType()))
@@ -159,7 +173,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
err = fs.updateIndexes(ctx, &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_USER,
Id: &provider.Grantee_UserId{UserId: req.GetOwner().GetId()},
}, req.Type, root.ID)
}, req.Type, root.ID, root.ID)
if err != nil {
return nil, err
}
@@ -199,19 +213,6 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
return resp, nil
}
// ReadSpaceAndNodeFromIndexLink reads a symlink and parses space and node id if the link has the correct format, eg:
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51.T.2022-02-24T12:35:18.196484592Z
func ReadSpaceAndNodeFromIndexLink(link string) (string, string, error) {
// ../../../spaces/sp/ace-id/nodes/sh/or/tn/od/eid
// 0 1 2 3 4 5 6 7 8 9 10 11
parts := strings.Split(link, string(filepath.Separator))
if len(parts) != 12 || parts[0] != ".." || parts[1] != ".." || parts[2] != ".." || parts[3] != "spaces" || parts[6] != "nodes" {
return "", "", errtypes.InternalError("malformed link")
}
return strings.Join(parts[4:6], ""), strings.Join(parts[7:12], ""), nil
}
// ListStorageSpaces returns a list of StorageSpaces.
// The list can be filtered by space type or space id.
// Spaces are persisted with symlinks in /spaces/<type>/<spaceid> pointing to ../../nodes/<nodeid>, the root node of the space
@@ -302,7 +303,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return spaces, nil
}
matches := map[string]struct{}{}
matches := map[string]string{}
var allMatches map[string]string
var err error
@@ -313,11 +314,11 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
if nodeID == spaceIDAny {
for _, match := range allMatches {
matches[match] = struct{}{}
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
} else {
matches[allMatches[nodeID]] = struct{}{}
matches[allMatches[nodeID]] = allMatches[nodeID]
}
// get Groups for userid
@@ -340,11 +341,11 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
if nodeID == spaceIDAny {
for _, match := range allMatches {
matches[match] = struct{}{}
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
} else {
matches[allMatches[nodeID]] = struct{}{}
matches[allMatches[nodeID]] = allMatches[nodeID]
}
}
@@ -370,11 +371,11 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
if nodeID == spaceIDAny {
for _, match := range allMatches {
matches[match] = struct{}{}
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
} else {
matches[allMatches[nodeID]] = struct{}{}
matches[allMatches[nodeID]] = allMatches[nodeID]
}
}
}
@@ -391,15 +392,15 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// the personal spaces must also use the nodeid and not the name
numShares := atomic.Int64{}
errg, ctx := errgroup.WithContext(ctx)
work := make(chan string, len(matches))
work := make(chan []string, len(matches))
results := make(chan *provider.StorageSpace, len(matches))
// Distribute work
errg.Go(func() error {
defer close(work)
for match := range matches {
for spaceID, nodeID := range matches {
select {
case work <- match:
case work <- []string{spaceID, nodeID}:
case <-ctx.Done():
return ctx.Err()
}
@@ -415,26 +416,15 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for match := range work {
var err error
// TODO introduce metadata.IsLockFile(path)
// do not investigate flock files any further. They indicate file locks but are not relevant here.
if strings.HasSuffix(match, filelocks.LockFileSuffix) {
continue
}
// skip metadata files
if fs.lu.MetadataBackend().IsMetaFile(match) {
continue
}
// always read link in case storage space id != node id
linkSpaceID, linkNodeID, err := ReadSpaceAndNodeFromIndexLink(match)
spaceID, nodeID, err := fs.tp.ResolveSpaceIDIndexEntry(match[0], match[1])
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("match", match).Msg("could not read link, skipping")
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("resolve space id index entry, skipping")
continue
}
n, err := node.ReadNode(ctx, fs.lu, linkSpaceID, linkNodeID, true, nil, true)
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, true)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", linkNodeID).Msg("could not read node, skipping")
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not read node, skipping")
continue
}
@@ -450,7 +440,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
case errtypes.NotFound:
// ok
default:
appctx.GetLogger(ctx).Error().Err(err).Str("id", linkNodeID).Msg("could not convert to storage space")
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not convert to storage space")
}
continue
}
@@ -504,7 +494,6 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
return spaces, nil
}
// UserIDToUserAndGroups converts a user ID to a user with groups
@@ -752,8 +741,12 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
return n.SetDTime(ctx, &dtime)
}
func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Grantee, spaceType, spaceID string) error {
err := fs.linkStorageSpaceType(ctx, spaceType, spaceID)
// the value of `target` depends on the implementation:
// - for ocis/s3ng it is the relative link to the space root
// - for the posixfs it is the node id
func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Grantee, spaceType, spaceID, nodeID string) error {
target := fs.tp.BuildSpaceIDIndexEntry(spaceID, nodeID)
err := fs.linkStorageSpaceType(ctx, spaceType, spaceID, target)
if err != nil {
return err
}
@@ -766,26 +759,23 @@ func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Gra
// create space grant index
switch {
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
return fs.linkSpaceByUser(ctx, grantee.GetUserId().GetOpaqueId(), spaceID)
return fs.linkSpaceByUser(ctx, grantee.GetUserId().GetOpaqueId(), spaceID, target)
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
return fs.linkSpaceByGroup(ctx, grantee.GetGroupId().GetOpaqueId(), spaceID)
return fs.linkSpaceByGroup(ctx, grantee.GetGroupId().GetOpaqueId(), spaceID, target)
default:
return errtypes.BadRequest("invalid grantee type: " + grantee.GetType().String())
}
}
func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID string) error {
target := "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID, target string) error {
return fs.userSpaceIndex.Add(userID, spaceID, target)
}
func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID string) error {
target := "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID, target string) error {
return fs.groupSpaceIndex.Add(groupID, spaceID, target)
}
func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType string, spaceID string) error {
target := "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType, spaceID, target string) error {
return fs.spaceTypeIndex.Add(spaceType, spaceID, target)
}

View File

@@ -98,6 +98,7 @@ func (p SyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff in
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
_ = os.Remove(f.Name())
}()
if n, err = n.Parent(ctx); err != nil {

View File

@@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -68,7 +69,6 @@ type Tree struct {
options *options.Options
// used to cache symlink lookups for child names to node ids
idCache store.Store
}
@@ -662,6 +662,49 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa
return rn, fn, nil
}
// InitNewNode initializes a new node on disk: it creates the parent
// directory structure, write-locks the new node's metadata, creates the
// (empty) node file carrying the resource mtime, checks the space quota and
// finally symlinks the child name inside the parent directory to the
// pathified node id.
//
// The returned UnlockFunc releases the metadata lock and must be invoked by
// the caller; note it is returned non-nil even when a later step failed.
// Returns errtypes.AlreadyExists when the child name is already taken.
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
	// create folder structure (if needed)
	if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
		return nil, err
	}
	// create and write lock new node metadata
	unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
	if err != nil {
		return nil, err
	}
	// we also need to touch the actual node file here, as it stores the mtime of the resource
	h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		// O_EXCL: fails when the node file already exists
		return unlock, err
	}
	h.Close()
	if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
		return unlock, err
	}
	// link child name to parent if it is new
	childNameLink := filepath.Join(n.ParentPath(), n.Name)
	relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
	log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
	log.Info().Msg("initNewNode: creating symlink")
	if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
		log.Info().Err(err).Msg("initNewNode: symlink failed")
		if errors.Is(err, fs.ErrExist) {
			// a sibling with the same name won the race
			log.Info().Err(err).Msg("initNewNode: symlink already exists")
			return unlock, errtypes.AlreadyExists(n.Name)
		}
		return unlock, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
	}
	log.Info().Msg("initNewNode: symlink created")
	return unlock, nil
}
func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node.Node) error {
logger := appctx.GetLogger(ctx)
@@ -766,6 +809,29 @@ func (t *Tree) DeleteBlob(node *node.Node) error {
return t.blobstore.Delete(node)
}
// BuildSpaceIDIndexEntry returns the entry for the space id index: a
// relative symlink target pointing at the space root node.
// NOTE(review): the nodes part of the path is derived from spaceID, not
// nodeID — correct only while a space root's node id always equals its
// space id; confirm that invariant before reusing this elsewhere.
func (t *Tree) BuildSpaceIDIndexEntry(spaceID, nodeID string) string {
	return "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
}
// ResolveSpaceIDIndexEntry returns the space and node id for the space id
// index entry. The entry is a relative symlink target parsed by
// ReadSpaceAndNodeFromIndexLink; the first argument is not needed here and
// is ignored.
func (t *Tree) ResolveSpaceIDIndexEntry(_, entry string) (string, string, error) {
	return ReadSpaceAndNodeFromIndexLink(entry)
}
// ReadSpaceAndNodeFromIndexLink reads a symlink and parses space and node id if the link has the correct format, eg:
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51.T.2022-02-24T12:35:18.196484592Z
// It returns an InternalError for anything that does not match the expected
// 12-segment layout below.
// NOTE(review): the examples above show two leading "../" but the parser
// requires three — confirm which depth the index actually writes.
func ReadSpaceAndNodeFromIndexLink(link string) (string, string, error) {
	// ../../../spaces/sp/ace-id/nodes/sh/or/tn/od/eid
	// 0 1 2 3 4 5 6 7 8 9 10 11
	parts := strings.Split(link, string(filepath.Separator))
	if len(parts) != 12 || parts[0] != ".." || parts[1] != ".." || parts[2] != ".." || parts[3] != "spaces" || parts[6] != "nodes" {
		return "", "", errtypes.InternalError("malformed link")
	}
	// parts[4:6] re-join to the space id, parts[7:12] to the node id
	return strings.Join(parts[4:6], ""), strings.Join(parts[7:12], ""), nil
}
// TODO check if node exists?
func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
ctx, span := tracer.Start(ctx, "createDirNode")

View File

@@ -34,7 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -53,23 +53,25 @@ type PermissionsChecker interface {
// OcisStore manages upload sessions
type OcisStore struct {
lu node.PathLookup
tp Tree
root string
pub events.Publisher
async bool
tknopts options.TokenOptions
lu node.PathLookup
tp node.Tree
root string
pub events.Publisher
async bool
tknopts options.TokenOptions
disableVersioning bool
}
// NewSessionStore returns a new OcisStore
func NewSessionStore(lu node.PathLookup, tp Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions) *OcisStore {
func NewSessionStore(lu node.PathLookup, tp node.Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore {
return &OcisStore{
lu: lu,
tp: tp,
root: root,
pub: pub,
async: async,
tknopts: tknopts,
lu: lu,
tp: tp,
root: root,
pub: pub,
async: async,
tknopts: tknopts,
disableVersioning: disableVersioning,
}
}
@@ -204,7 +206,7 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
return nil, err
}
var f *lockedfile.File
var unlock metadata.UnlockFunc
if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node
// we do not need to propagate a change when a node is created, only when the upload is ready.
// that still creates problems for desktop clients because if another change causes propagation it will detects an empty file
@@ -214,21 +216,31 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
// so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists())
// FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter!
f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size()))
if f != nil {
appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode")
unlock, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size()))
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to update existing node")
}
} else {
f, err = store.initNewNode(ctx, session, n, uint64(session.Size()))
if f != nil {
appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode")
if c, ok := store.lu.(node.IDCacher); ok {
err := c.CacheID(ctx, n.SpaceID, n.ID, filepath.Join(n.ParentPath(), n.Name))
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to cache id")
}
}
unlock, err = store.tp.InitNewNode(ctx, n, uint64(session.Size()))
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to init new node")
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt(session.Size(), 10)
}
defer func() {
if f == nil {
if unlock == nil {
appctx.GetLogger(ctx).Info().Msg("did not get a unlockfunc, not unlocking")
return
}
if err := f.Close(); err != nil {
if err := unlock(); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock")
}
}()
@@ -243,6 +255,7 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
}
// overwrite technical information
initAttrs.SetString(prefixes.IDAttr, n.ID)
initAttrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
initAttrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
initAttrs.SetString(prefixes.ParentidAttr, n.ParentID)
@@ -264,51 +277,7 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
return n, nil
}
// initNewNode prepares the on-disk structures for a node that does not exist
// yet: it write-locks the node metadata, creates the (empty) node file that
// carries the resource mtime, checks quota and links the child name into the
// parent directory. The returned lock file must be closed by the caller; it is
// also returned alongside most errors so the caller can release the lock.
func (store OcisStore) initNewNode(ctx context.Context, session *OcisSession, n *node.Node, fsize uint64) (*lockedfile.File, error) {
	// make sure the parent directories of the node path exist
	if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
		return nil, err
	}

	// take the metadata write lock for the new node before touching anything else
	lockFile, err := lockedfile.OpenFile(store.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}

	// the node file itself stores the mtime of the resource, so create it now;
	// O_EXCL makes this fail instead of clobbering a concurrently created node
	fh, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		return lockFile, err
	}
	fh.Close()

	if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
		return lockFile, err
	}

	// make the node reachable: symlink the child name in the parent to the id-based node path
	childNameLink := filepath.Join(n.ParentPath(), n.Name)
	relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
	log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
	log.Info().Msg("initNewNode: creating symlink")

	switch err = os.Symlink(relativeNodePath, childNameLink); {
	case err == nil:
		log.Info().Msg("initNewNode: symlink created")
	case errors.Is(err, iofs.ErrExist):
		// another upload created the child in the meantime
		log.Info().Err(err).Msg("initNewNode: symlink failed")
		log.Info().Err(err).Msg("initNewNode: symlink already exists")
		return lockFile, errtypes.AlreadyExists(n.Name)
	default:
		log.Info().Err(err).Msg("initNewNode: symlink failed")
		return lockFile, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
	}

	// on a new file the sizeDiff is the fileSize
	session.info.MetaData["sizeDiff"] = strconv.FormatInt(int64(fsize), 10)
	return lockFile, nil
}
func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (*lockedfile.File, error) {
func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) {
targetPath := n.InternalPath()
// write lock existing node before reading any metadata
@@ -317,35 +286,43 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
return nil, err
}
unlock := func() error {
err := f.Close()
if err != nil {
return err
}
return os.Remove(store.lu.MetadataBackend().LockfilePath(targetPath))
}
old, _ := node.ReadNode(ctx, store.lu, spaceID, n.ID, false, nil, false)
if _, err := node.CheckQuota(ctx, n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil {
return f, err
return unlock, err
}
oldNodeMtime, err := old.GetMTime(ctx)
if err != nil {
return f, err
return unlock, err
}
oldNodeEtag, err := node.CalculateEtag(old.ID, oldNodeMtime)
if err != nil {
return f, err
return unlock, err
}
// When the if-match header was set we need to check if the
// etag still matches before finishing the upload.
if session.HeaderIfMatch() != "" && session.HeaderIfMatch() != oldNodeEtag {
return f, errtypes.Aborted("etag mismatch")
return unlock, errtypes.Aborted("etag mismatch")
}
// When the if-none-match header was set we need to check if any of the
// etags matches before finishing the upload.
if session.HeaderIfNoneMatch() != "" {
if session.HeaderIfNoneMatch() == "*" {
return f, errtypes.Aborted("etag mismatch, resource exists")
return unlock, errtypes.Aborted("etag mismatch, resource exists")
}
for _, ifNoneMatchTag := range strings.Split(session.HeaderIfNoneMatch(), ",") {
if ifNoneMatchTag == oldNodeEtag {
return f, errtypes.Aborted("etag mismatch")
return unlock, errtypes.Aborted("etag mismatch")
}
}
}
@@ -355,37 +332,41 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
if session.HeaderIfUnmodifiedSince() != "" {
ifUnmodifiedSince, err := time.Parse(time.RFC3339Nano, session.HeaderIfUnmodifiedSince())
if err != nil {
return f, errtypes.InternalError(fmt.Sprintf("failed to parse if-unmodified-since time: %s", err))
return unlock, errtypes.InternalError(fmt.Sprintf("failed to parse if-unmodified-since time: %s", err))
}
if oldNodeMtime.After(ifUnmodifiedSince) {
return f, errtypes.Aborted("if-unmodified-since mismatch")
return unlock, errtypes.Aborted("if-unmodified-since mismatch")
}
}
session.info.MetaData["versionsPath"] = session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))
versionPath := n.InternalPath()
if !store.disableVersioning {
versionPath = session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))
// create version node
if _, err := os.Create(versionPath); err != nil {
return unlock, err
}
// copy blob metadata to version node
if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, versionPath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, f, true); err != nil {
return unlock, err
}
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt((int64(fsize) - old.Blobsize), 10)
// create version node
if _, err := os.Create(session.info.MetaData["versionsPath"]); err != nil {
return f, err
}
// copy blob metadata to version node
if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, session.info.MetaData["versionsPath"], func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, f, true); err != nil {
return f, err
}
session.info.MetaData["versionsPath"] = versionPath
// keep mtime from previous version
if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil {
return f, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}
return f, nil
return unlock, nil
}

View File

@@ -20,12 +20,9 @@ package upload
import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"fmt"
"hash"
"hash/adler32"
"io"
"io/fs"
"os"
@@ -53,27 +50,6 @@ func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
}
// Tree is used to manage a tree hierarchy
type Tree interface {
	// Setup initializes the tree, e.g. creates required directories on disk.
	Setup() error

	// GetMD returns the file info for the given node.
	GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error)
	// ListFolder returns the child nodes of the given folder node.
	ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error)
	// CreateHome(owner *userpb.UserId) (n *node.Node, err error)
	// CreateDir creates a directory for the given node.
	CreateDir(ctx context.Context, node *node.Node) (err error)
	// CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error
	// Move moves oldNode to the location described by newNode.
	Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error)
	// Delete removes the given node from the tree (moves it to the trash).
	Delete(ctx context.Context, node *node.Node) (err error)
	// RestoreRecycleItemFunc returns the trashed node, its restore target and a
	// function that performs the actual restore when called.
	RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
	// PurgeRecycleItemFunc returns the trashed node and a function that
	// permanently removes it when called.
	PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)

	// WriteBlob stores the file at binPath as the blob for the given node.
	WriteBlob(node *node.Node, binPath string) error
	// ReadBlob returns a reader for the blob of the given node.
	ReadBlob(node *node.Node) (io.ReadCloser, error)
	// DeleteBlob removes the blob of the given node.
	DeleteBlob(node *node.Node) error

	// Propagate propagates a size difference (and change) up the tree.
	Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}
var defaultFilePerm = os.FileMode(0664)
// WriteChunk writes the stream from the reader to the given offset of the upload
@@ -131,37 +107,13 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
ctx = ctxpkg.ContextSetInitiator(ctx, session.InitiatorID())
// calculate the checksum of the written bytes
// they will all be written to the metadata later, so we cannot omit any of them
// TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present
// TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ...
sha1h := sha1.New()
md5h := md5.New()
adler32h := adler32.New()
{
_, subspan := tracer.Start(ctx, "os.Open")
f, err := os.Open(session.binPath())
subspan.End()
if err != nil {
// we can continue if no oc checksum header is set
log.Info().Err(err).Str("binPath", session.binPath()).Msg("error opening binPath")
}
defer f.Close()
r1 := io.TeeReader(f, sha1h)
r2 := io.TeeReader(r1, md5h)
_, subspan = tracer.Start(ctx, "io.Copy")
_, err = io.Copy(adler32h, r2)
subspan.End()
if err != nil {
log.Info().Err(err).Msg("error copying checksums")
}
sha1h, md5h, adler32h, err := node.CalculateChecksums(ctx, session.binPath())
if err != nil {
log.Info().Err(err).Msg("error copying checksums")
}
// compare if they match the sent checksum
// TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads
var err error
if session.info.MetaData["checksum"] != "" {
var err error
parts := strings.SplitN(session.info.MetaData["checksum"], " ", 2)

View File

@@ -52,6 +52,7 @@ type (
*UserData
SpaceType string
SpaceName string
SpaceId string
}
// EmailData contains mail data
@@ -89,9 +90,9 @@ func WithUser(u *userpb.User, tpl string) string {
}
// WithSpacePropertiesAndUser generates a layout based on user data and a space type.
func WithSpacePropertiesAndUser(u *userpb.User, spaceType string, spaceName string, tpl string) string {
func WithSpacePropertiesAndUser(u *userpb.User, spaceType string, spaceName string, spaceID string, tpl string) string {
tpl = clean(tpl)
sd := newSpaceData(u, spaceType, spaceName)
sd := newSpaceData(u, spaceType, spaceName, spaceID)
// compile given template tpl
t, err := template.New("tpl").Funcs(sprig.TxtFuncMap()).Parse(tpl)
if err != nil {
@@ -147,12 +148,13 @@ func newUserData(u *userpb.User) *UserData {
return ut
}
func newSpaceData(u *userpb.User, st string, n string) *SpaceData {
func newSpaceData(u *userpb.User, st, n, id string) *SpaceData {
userData := newUserData(u)
sd := &SpaceData{
userData,
st,
n,
id,
}
return sd
}

2
vendor/modules.txt vendored
View File

@@ -366,7 +366,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.19.2-0.20240502102837-7e48a5145133
# github.com/cs3org/reva/v2 v2.19.2-0.20240503075142-9982214f5702
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime