chore: reva bump v2.32 (#737)

This commit is contained in:
Viktor Scharf
2025-04-28 15:32:22 +02:00
committed by GitHub
parent 4dea7ed870
commit ecedf7dc6d
25 changed files with 794 additions and 128 deletions

8
go.mod
View File

@@ -63,12 +63,12 @@ require (
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.37.0
github.com/open-policy-agent/opa v1.3.0
github.com/opencloud-eu/reva/v2 v2.31.0
github.com/opencloud-eu/reva/v2 v2.32.0
github.com/orcaman/concurrent-map v1.0.0
github.com/owncloud/libre-graph-api-go v1.0.5-0.20240829135935-80dc00d6f5ea
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.10
github.com/prometheus/client_golang v1.21.1
github.com/prometheus/client_golang v1.22.0
github.com/r3labs/sse/v2 v2.10.0
github.com/riandyrn/otelchi v0.12.1
github.com/rogpeppe/go-internal v1.14.1
@@ -152,7 +152,7 @@ require (
github.com/bluele/gcache v0.0.2 // indirect
github.com/bombsimon/logrusr/v3 v3.1.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/ceph/go-ceph v0.32.0 // indirect
github.com/ceph/go-ceph v0.33.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
@@ -197,7 +197,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/go-sql-driver/mysql v1.9.1 // indirect
github.com/go-sql-driver/mysql v1.9.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/go-test/deep v1.1.0 // indirect

22
go.sum
View File

@@ -202,8 +202,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/ceph/go-ceph v0.32.0 h1:iXRUGdPmH7h9Vf/WA1Dg3Wo1tgL7gcUbylfpbxrlGLs=
github.com/ceph/go-ceph v0.32.0/go.mod h1:42eoJzyLS3VREzqrg2ot44NtuluQZi55hFRSoLF36GQ=
github.com/ceph/go-ceph v0.33.0 h1:xT9v/MAa+DIBmflyITyFkGRgWngATghGegKJguEOInQ=
github.com/ceph/go-ceph v0.33.0/go.mod h1:6ef0lIyDHnwArykqfWZDWCfbbJAVTXL1tOYrM1M4bAE=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -277,8 +277,6 @@ github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E
github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E=
github.com/dnsimple/dnsimple-go v0.63.0/go.mod h1:O5TJ0/U6r7AfT8niYNlmohpLbCSG+c71tQlGr9SeGrg=
github.com/dragonchaser/lico v0.0.0-20250416141658-4d60b0ff2e7d h1:eMHIpPhZenDGOBgAHToKBy6gssUuDhyva+pQPlNDe/8=
github.com/dragonchaser/lico v0.0.0-20250416141658-4d60b0ff2e7d/go.mod h1:GLIhLiUD3QUvbdZ+d7tKdkTwaotVQ3qhC8t1biWzFf8=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e h1:rcHHSQqzCgvlwP0I/fQ8rQMn/MpHE5gWSLdtpxtP6KQ=
@@ -418,8 +416,8 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
github.com/go-resty/resty/v2 v2.1.1-0.20191201195748-d7b97669fe48/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sql-driver/mysql v1.9.1 h1:FrjNGn/BsJQjVRuSa8CBrM5BWA9BWoXXat3KrtSb/iI=
github.com/go-sql-driver/mysql v1.9.1/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/go-sql-driver/mysql v1.9.2 h1:4cNKDYQ1I84SXslGddlsrMhc8k4LeDVj6Ad6WRjiHuU=
github.com/go-sql-driver/mysql v1.9.2/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
@@ -447,8 +445,8 @@ github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeH
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid/v5 v5.3.0 h1:m0mUMr+oVYUdxpMLgSYCZiXe7PuVPnI94+OMeVBNedk=
github.com/gofrs/uuid/v5 v5.3.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8=
github.com/gofrs/uuid/v5 v5.3.2 h1:2jfO8j3XgSwlz/wHqemAEugfnTlikAYHhnqQ8Xh4fE0=
github.com/gofrs/uuid/v5 v5.3.2/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -861,8 +859,8 @@ github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/open-policy-agent/opa v1.3.0 h1:zVvQvQg+9+FuSRBt4LgKNzJwsWl/c85kD5jPozJTydY=
github.com/open-policy-agent/opa v1.3.0/go.mod h1:t9iPNhaplD2qpiBqeudzJtEX3fKHK8zdA29oFvofAHo=
github.com/opencloud-eu/reva/v2 v2.31.0 h1:UVgeb0hSPoaDdqcKSJ7XZAhXCtHaVK9qm/JtFtJM/7U=
github.com/opencloud-eu/reva/v2 v2.31.0/go.mod h1:8MT1a/WJASZZhlSMC0oeE3ECQdjqFw3BUiiAIZ/JR8I=
github.com/opencloud-eu/reva/v2 v2.32.0 h1:JRWPleHiEl0film95Gkh1iBEhc6eikEsx5FKLfVx6l8=
github.com/opencloud-eu/reva/v2 v2.32.0/go.mod h1:FDhGVC+ZsRRWdC3am4EbuILBtviTbCDVrTUjFECOqvg=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
@@ -921,8 +919,8 @@ github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqr
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk=
github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg=
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=

View File

@@ -47,7 +47,14 @@ func (mount *MountInfo) OpenDir(path string) (*Directory, error) {
//
// int ceph_closedir(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp);
func (dir *Directory) Close() error {
return getError(C.ceph_closedir(dir.mount.mount, dir.dir))
if dir.dir == nil {
return nil
}
if err := getError(C.ceph_closedir(dir.mount.mount, dir.dir)); err != nil {
return err
}
dir.dir = nil
return nil
}
// Inode represents an inode number in the file system.
@@ -141,6 +148,9 @@ func toDirEntryPlus(de *C.struct_dirent, s C.struct_ceph_statx) *DirEntryPlus {
//
// int ceph_readdir_r(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp, struct dirent *de);
func (dir *Directory) ReadDir() (*DirEntry, error) {
if dir.dir == nil {
return nil, errBadFile
}
var de C.struct_dirent
ret := C.ceph_readdir_r(dir.mount.mount, dir.dir, &de)
if ret < 0 {
@@ -165,6 +175,9 @@ func (dir *Directory) ReadDir() (*DirEntry, error) {
func (dir *Directory) ReadDirPlus(
want StatxMask, flags AtFlags) (*DirEntryPlus, error) {
if dir.dir == nil {
return nil, errBadFile
}
var (
de C.struct_dirent
s C.struct_ceph_statx
@@ -193,6 +206,9 @@ func (dir *Directory) ReadDirPlus(
//
// void ceph_rewinddir(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp);
func (dir *Directory) RewindDir() {
if dir.dir == nil {
return
}
C.ceph_rewinddir(dir.mount.mount, dir.dir)
}

View File

@@ -46,4 +46,6 @@ var (
errInvalid = getError(-C.EINVAL)
errNameTooLong = getError(-C.ENAMETOOLONG)
errRange = getError(-C.ERANGE)
errBadFile = getError(-C.EBADF)
errNotDir = getError(-C.ENOTDIR)
)

View File

@@ -101,6 +101,9 @@ func (f *File) read(buf []byte, offset int64) (int, error) {
if err := f.validate(); err != nil {
return 0, err
}
if len(buf) == 0 {
return 0, nil
}
bufptr := (*C.char)(unsafe.Pointer(&buf[0]))
ret := C.ceph_read(
f.mount.mount, f.fd, bufptr, C.int64_t(len(buf)), C.int64_t(offset))
@@ -178,6 +181,9 @@ func (f *File) write(buf []byte, offset int64) (int, error) {
if err := f.validate(); err != nil {
return 0, err
}
if len(buf) == 0 {
return 0, nil
}
bufptr := (*C.char)(unsafe.Pointer(&buf[0]))
ret := C.ceph_write(
f.mount.mount, f.fd, bufptr, C.int64_t(len(buf)), C.int64_t(offset))

408
vendor/github.com/ceph/go-ceph/cephfs/fscompat.go generated vendored Normal file
View File

@@ -0,0 +1,408 @@
//go:build ceph_preview
package cephfs
import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
"strings"
"time"
"github.com/ceph/go-ceph/internal/log"
)
// errIsDir is returned when a file-style operation (e.g. Read) is
// attempted on a wrapped directory handle.
var (
	errIsDir = errors.New("is a directory")
)
// MountWrapper provides a wrapper type that adapts a CephFS Mount into a
// io.FS compatible type.
type MountWrapper struct {
	mount       *MountInfo
	enableTrace bool // when true, debugf emits trace output for this wrapper and its children
}

// fileWrapper adapts a cephfs File to the fs.File interface.
type fileWrapper struct {
	parent *MountWrapper
	file   *File
	name   string // name the file was opened with
}

// dirWrapper adapts a cephfs Directory to the fs.ReadDirFile interface.
type dirWrapper struct {
	parent    *MountWrapper
	directory *Directory
	name      string // name the directory was opened with
}

// dentryWrapper adapts a cephfs DirEntryPlus to the fs.DirEntry interface.
type dentryWrapper struct {
	parent *MountWrapper
	de     *DirEntryPlus
}

// infoWrapper adapts a cephfs CephStatx to the fs.FileInfo interface.
type infoWrapper struct {
	parent *MountWrapper
	sx     *CephStatx
	name   string // base name of the item the statx data belongs to
}
// Wrap adapts a CephFS Mount into a type compatible with Go's io.FS
// interface. A wrapper is required because CephFS Mounts are not io.FS
// compatible directly: go-ceph predates the addition of io.FS to Go, and
// go-ceph primarily mirrors the cephfs C libraries rather than Go's
// file-system interfaces.
func Wrap(mount *MountInfo) *MountWrapper {
	wrapped := &MountWrapper{mount: mount}
	debugf(wrapped, "Wrap", "created")
	return wrapped
}
/* MountWrapper:
** Implements https://pkg.go.dev/io/fs#FS
** Wraps cephfs.MountInfo
*/

// SetTracing configures the MountWrapper and objects connected to it for debug
// tracing. True enables tracing and false disables it. A debug logging
// function must also be set using go-ceph's common.log.SetDebugf function.
func (mw *MountWrapper) SetTracing(enable bool) {
	mw.enableTrace = enable
}

// identify the MountWrapper object for logging purposes. The pointer value
// distinguishes separate wrapper instances in trace output.
func (mw *MountWrapper) identify() string {
	return fmt.Sprintf("MountWrapper<%p>", mw)
}

// trace returns true if debug tracing is enabled.
func (mw *MountWrapper) trace() bool {
	return mw.enableTrace
}
// Open opens the named file. This may be either a regular file or a
// directory. Directories opened with this function will return an object
// compatible with the io.ReadDirFile interface.
//
// The io/fs contract (probed by fsTester/testfs) expects Open to reject
// names that are not "clean", slash-separated relative paths. The ceph
// libraries won't reject them on their own, so screen the name here.
// In addition to the patterns previously checked (leading "/", "//",
// "/./", "/../", trailing "/."), also reject the empty name, trailing "/",
// trailing "/.." and names rooted at "..", which are equally invalid per
// fs.ValidPath.
func (mw *MountWrapper) Open(name string) (fs.File, error) {
	debugf(mw, "Open", "(%v)", name)
	if name == "" ||
		name == ".." ||
		strings.HasPrefix(name, "/") ||
		strings.HasPrefix(name, "../") ||
		strings.HasSuffix(name, "/") ||
		strings.HasSuffix(name, "/.") ||
		strings.HasSuffix(name, "/..") ||
		strings.Contains(name, "//") ||
		strings.Contains(name, "/./") ||
		strings.Contains(name, "/../") {
		return nil, &fs.PathError{Op: "open", Path: name, Err: errInvalid}
	}
	// Try the name as a directory first; fall back to a regular file open
	// only when cephfs reports "not a directory".
	d, err := mw.mount.OpenDir(name)
	if err == nil {
		debugf(mw, "Open", "(%v): dir ok", name)
		dw := &dirWrapper{parent: mw, directory: d, name: name}
		return dw, nil
	}
	if !errors.Is(err, errNotDir) {
		debugf(mw, "Open", "(%v): dir error: %v", name, err)
		return nil, &fs.PathError{Op: "open", Path: name, Err: err}
	}
	f, err := mw.mount.Open(name, os.O_RDONLY, 0)
	if err == nil {
		debugf(mw, "Open", "(%v): file ok", name)
		fw := &fileWrapper{parent: mw, file: f, name: name}
		return fw, nil
	}
	debugf(mw, "Open", "(%v): file error: %v", name, err)
	return nil, &fs.PathError{Op: "open", Path: name, Err: err}
}
/* fileWrapper:
** Implements https://pkg.go.dev/io/fs#FS
** Wraps cephfs.File
*/

// Stat returns file information obtained by an fstatx call on the open file.
func (fw *fileWrapper) Stat() (fs.FileInfo, error) {
	debugf(fw, "Stat", "()")
	sx, err := fw.file.Fstatx(StatxBasicStats, AtSymlinkNofollow)
	if err != nil {
		debugf(fw, "Stat", "() -> err:%v", err)
		return nil, &fs.PathError{Op: "stat", Path: fw.name, Err: err}
	}
	debugf(fw, "Stat", "() ok")
	return &infoWrapper{fw.parent, sx, path.Base(fw.name)}, nil
}

// Read reads from the wrapped cephfs file into b.
func (fw *fileWrapper) Read(b []byte) (int, error) {
	debugf(fw, "Read", "(...)")
	return fw.file.Read(b)
}

// Close closes the wrapped cephfs file.
func (fw *fileWrapper) Close() error {
	debugf(fw, "Close", "()")
	return fw.file.Close()
}

// identify the fileWrapper object for logging purposes.
func (fw *fileWrapper) identify() string {
	return fmt.Sprintf("fileWrapper<%p>[%v]", fw, fw.name)
}

// trace returns true if debug tracing is enabled on the parent wrapper.
func (fw *fileWrapper) trace() bool {
	return fw.parent.trace()
}
/* dirWrapper:
** Implements https://pkg.go.dev/io/fs#ReadDirFile
** Wraps cephfs.Directory
*/

// Stat returns information for the wrapped directory by stat'ing its path
// via the parent mount.
func (dw *dirWrapper) Stat() (fs.FileInfo, error) {
	debugf(dw, "Stat", "()")
	sx, err := dw.parent.mount.Statx(dw.name, StatxBasicStats, AtSymlinkNofollow)
	if err != nil {
		debugf(dw, "Stat", "() -> err:%v", err)
		return nil, &fs.PathError{Op: "stat", Path: dw.name, Err: err}
	}
	debugf(dw, "Stat", "() ok")
	return &infoWrapper{dw.parent, sx, path.Base(dw.name)}, nil
}

// Read always fails with an "is a directory" PathError; it exists only to
// satisfy the fs.File interface for directory handles.
func (dw *dirWrapper) Read(_ []byte) (int, error) {
	debugf(dw, "Read", "(...)")
	return 0, &fs.PathError{Op: "read", Path: dw.name, Err: errIsDir}
}

// ReadDir implements fs.ReadDirFile: for n > 0 it returns up to n of the
// remaining entries, for n <= 0 it returns all remaining entries.
func (dw *dirWrapper) ReadDir(n int) ([]fs.DirEntry, error) {
	debugf(dw, "ReadDir", "(%v)", n)
	if n > 0 {
		return dw.readDirSome(n)
	}
	return dw.readDirAll()
}
// defaultDirReadCount is how many entries readDirAll requests from
// readDirSome per pass.
const defaultDirReadCount = 256

// readDirAll reads the remaining directory entries in batches until the
// directory is exhausted. io.EOF from the batch reader signals normal
// completion and is not passed on to the caller.
func (dw *dirWrapper) readDirAll() ([]fs.DirEntry, error) {
	debugf(dw, "readDirAll", "()")
	entries := make([]fs.DirEntry, 0)
	var err error
	for err == nil {
		var batch []fs.DirEntry
		batch, err = dw.readDirSome(defaultDirReadCount)
		entries = append(entries, batch...)
	}
	if err == io.EOF {
		err = nil
	}
	debugf(dw, "readDirAll", "() -> len:%v, err:%v", len(entries), err)
	return entries, err
}
// readDirSome reads up to n directory entries, skipping the "." and ".."
// entries which fs.ReadDirFile never reports. It returns io.EOF when no
// entries at all could be read; otherwise it returns the entries collected
// so far together with any error the underlying readdir reported.
//
// Compared to the previous version this appends into a capacity-presized
// slice instead of writing through a manual index into a length-n slice,
// which removes the index bookkeeping and avoids an out-of-range write if a
// caller ever passed n <= 0 (for n <= 0 the function now simply reports EOF).
func (dw *dirWrapper) readDirSome(n int) ([]fs.DirEntry, error) {
	debugf(dw, "readDirSome", "(%v)", n)
	var err error
	entries := make([]fs.DirEntry, 0, n)
	for len(entries) < n {
		var entry *DirEntryPlus
		entry, err = dw.directory.ReadDirPlus(StatxBasicStats, AtSymlinkNofollow)
		debugf(dw, "readDirSome", "(%v): got entry:%v, err:%v", n, entry, err)
		if err != nil || entry == nil {
			break
		}
		// "." and ".." are filtered out per the fs.ReadDirFile contract.
		switch entry.Name() {
		case ".", "..":
			continue
		}
		entries = append(entries, &dentryWrapper{dw.parent, entry})
	}
	if len(entries) == 0 {
		debugf(dw, "readDirSome", "(%v): EOF", n)
		return nil, io.EOF
	}
	debugf(dw, "readDirSome", "(%v): got entry:%v, err:%v", n, entries, err)
	return entries, err
}
// Close closes the wrapped cephfs directory handle.
func (dw *dirWrapper) Close() error {
	debugf(dw, "Close", "()")
	return dw.directory.Close()
}

// identify the dirWrapper object for logging purposes.
func (dw *dirWrapper) identify() string {
	return fmt.Sprintf("dirWrapper<%p>[%v]", dw, dw.name)
}

// trace returns true if debug tracing is enabled on the parent wrapper.
func (dw *dirWrapper) trace() bool {
	return dw.parent.trace()
}
/* dentryWrapper:
** Implements https://pkg.go.dev/io/fs#DirEntry
** Wraps cephfs.DirEntryPlus
*/

// Name returns the entry's name as reported by cephfs.
func (dew *dentryWrapper) Name() string {
	debugf(dew, "Name", "()")
	return dew.de.Name()
}

// IsDir reports whether the entry's dirent type is DTypeDir.
func (dew *dentryWrapper) IsDir() bool {
	v := dew.de.DType() == DTypeDir
	debugf(dew, "IsDir", "() -> %v", v)
	return v
}

// Type returns only the file-type bits derived from the entry's statx mode.
func (dew *dentryWrapper) Type() fs.FileMode {
	m := dew.de.Statx().Mode
	v := cephModeToFileMode(m).Type()
	debugf(dew, "Type", "() -> %v", v)
	return v
}

// Info returns an fs.FileInfo built from the statx data already carried by
// the entry; no additional stat call is made.
func (dew *dentryWrapper) Info() (fs.FileInfo, error) {
	debugf(dew, "Info", "()")
	sx := dew.de.Statx()
	name := dew.de.Name()
	return &infoWrapper{dew.parent, sx, name}, nil
}

// identify the dentryWrapper object for logging purposes.
func (dew *dentryWrapper) identify() string {
	return fmt.Sprintf("dentryWrapper<%p>[%v]", dew, dew.de.Name())
}

// trace returns true if debug tracing is enabled on the parent wrapper.
func (dew *dentryWrapper) trace() bool {
	return dew.parent.trace()
}
/* infoWrapper:
** Implements https://pkg.go.dev/io/fs#FileInfo
** Wraps cephfs.CephStatx
*/

// Name returns the base name recorded when the wrapper was created.
func (iw *infoWrapper) Name() string {
	debugf(iw, "Name", "()")
	return iw.name
}

// Size returns the size from the wrapped statx result.
func (iw *infoWrapper) Size() int64 {
	debugf(iw, "Size", "() -> %v", iw.sx.Size)
	return int64(iw.sx.Size)
}

// Sys returns the underlying *CephStatx for callers needing the raw
// cephfs stat data.
func (iw *infoWrapper) Sys() any {
	debugf(iw, "Sys", "()")
	return iw.sx
}

// Mode converts the cephfs (linux) mode bits into an fs.FileMode.
func (iw *infoWrapper) Mode() fs.FileMode {
	v := cephModeToFileMode(iw.sx.Mode)
	debugf(iw, "Mode", "() -> %#o -> %#o/%v", iw.sx.Mode, uint32(v), v.Type())
	return v
}

// IsDir reports whether the mode's file-type bits mark a directory.
func (iw *infoWrapper) IsDir() bool {
	v := iw.sx.Mode&modeIFMT == modeIFDIR
	debugf(iw, "IsDir", "() -> %v", v)
	return v
}

// ModTime returns the modification time built from the statx Mtime fields.
func (iw *infoWrapper) ModTime() time.Time {
	v := time.Unix(iw.sx.Mtime.Sec, iw.sx.Mtime.Nsec)
	debugf(iw, "ModTime", "() -> %v", v)
	return v
}

// identify the infoWrapper object for logging purposes.
func (iw *infoWrapper) identify() string {
	return fmt.Sprintf("infoWrapper<%p>[%v]", iw, iw.name)
}

// trace returns true if debug tracing is enabled on the parent wrapper.
func (iw *infoWrapper) trace() bool {
	return iw.parent.trace()
}
/* copy and paste values from the linux headers. We always need to use
** the linux header values, regardless of the platform go-ceph is built
** for. Rather than jumping through header hoops, copy and paste is
** more consistent and reliable.
*/
const (
	/* file type mask */
	modeIFMT = uint16(0170000)
	/* file types */
	modeIFDIR  = uint16(0040000)
	modeIFCHR  = uint16(0020000)
	modeIFBLK  = uint16(0060000)
	modeIFREG  = uint16(0100000)
	modeIFIFO  = uint16(0010000)
	modeIFLNK  = uint16(0120000)
	modeIFSOCK = uint16(0140000)
	/* protection bits */
	modeISUID = uint16(0004000)
	modeISGID = uint16(0002000)
	modeISVTX = uint16(0001000)
)

// cephModeToFileMode converts a linux-compatible cephfs st_mode value into
// Go's portable, os-agnostic fs.FileMode representation.
func cephModeToFileMode(m uint16) fs.FileMode {
	// The rwx permission bits translate one-to-one.
	result := fs.FileMode(m & 0777)

	// Translate the file-type bits; the mapping mirrors the one in go's
	// src/os/stat_linux.go.
	switch m & modeIFMT {
	case modeIFREG:
		// regular file: fs.FileMode has no type bit for it
	case modeIFDIR:
		result |= fs.ModeDir
	case modeIFLNK:
		result |= fs.ModeSymlink
	case modeIFSOCK:
		result |= fs.ModeSocket
	case modeIFIFO:
		result |= fs.ModeNamedPipe
	case modeIFCHR:
		result |= fs.ModeDevice | fs.ModeCharDevice
	case modeIFBLK:
		result |= fs.ModeDevice
	}

	// Translate the setuid/setgid/sticky protection bits.
	for _, pb := range []struct {
		bit  uint16
		mode fs.FileMode
	}{
		{modeISUID, fs.ModeSetuid},
		{modeISGID, fs.ModeSetgid},
		{modeISVTX, fs.ModeSticky},
	} {
		if m&pb.bit != 0 {
			result |= pb.mode
		}
	}
	return result
}
// wrapperObject helps identify an object to be logged.
type wrapperObject interface {
	identify() string
	trace() bool
}

// debugf logs a trace line for the named function of o, but only when the
// object has tracing enabled.
func debugf(o wrapperObject, fname, format string, args ...any) {
	if !o.trace() {
		return
	}
	log.Debugf(fmt.Sprintf("%v.%v: %s", o.identify(), fname, format), args...)
}

View File

@@ -1,5 +1,3 @@
//go:build ceph_preview
package rados
// #cgo LDFLAGS: -lrados

View File

@@ -1,5 +1,10 @@
# Changelog
## v1.9.2 (2025-04-07)
v1.9.2 is a re-release of v1.9.1 due to a release process issue; no changes were made to the content.
## v1.9.1 (2025-03-21)
### Major Changes

View File

@@ -56,7 +56,7 @@ func (s *svc) handlePathCopy(w http.ResponseWriter, r *http.Request, ns string)
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(r.Context(), "copy")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
w.WriteHeader(http.StatusUnsupportedMediaType)
b, err := errors.Marshal(http.StatusUnsupportedMediaType, "body must be empty", "", "")
errors.HandleWebdavError(appctx.GetLogger(ctx), w, b, err)
@@ -331,7 +331,7 @@ func (s *svc) handleSpacesCopy(w http.ResponseWriter, r *http.Request, spaceID s
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(r.Context(), "spaces_copy")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
w.WriteHeader(http.StatusUnsupportedMediaType)
b, err := errors.Marshal(http.StatusUnsupportedMediaType, "body must be empty", "", "")
errors.HandleWebdavError(appctx.GetLogger(ctx), w, b, err)

View File

@@ -39,7 +39,7 @@ func (s *svc) handlePathDelete(w http.ResponseWriter, r *http.Request, ns string
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(ctx, "path_delete")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
return http.StatusUnsupportedMediaType, errors.New("body must be empty")
}
@@ -126,7 +126,7 @@ func (s *svc) handleSpacesDelete(w http.ResponseWriter, r *http.Request, spaceID
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(ctx, "spaces_delete")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
return http.StatusUnsupportedMediaType, errors.New("body must be empty")
}

View File

@@ -107,7 +107,7 @@ func (s *svc) handleSpacesMkCol(w http.ResponseWriter, r *http.Request, spaceID
}
func (s *svc) handleMkcol(ctx context.Context, w http.ResponseWriter, r *http.Request, parentRef, childRef *provider.Reference, log zerolog.Logger) (status int, err error) {
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
// We currently do not support extended mkcol https://datatracker.ietf.org/doc/rfc5689/
// TODO let clients send a body with properties to set on the new resource
return http.StatusUnsupportedMediaType, fmt.Errorf("extended-mkcol not supported")

View File

@@ -42,7 +42,7 @@ func (s *svc) handlePathMove(w http.ResponseWriter, r *http.Request, ns string)
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(r.Context(), "move")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
w.WriteHeader(http.StatusUnsupportedMediaType)
b, err := errors.Marshal(http.StatusUnsupportedMediaType, "body must be empty", "", "")
errors.HandleWebdavError(appctx.GetLogger(ctx), w, b, err)
@@ -106,7 +106,7 @@ func (s *svc) handleSpacesMove(w http.ResponseWriter, r *http.Request, srcSpaceI
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(r.Context(), "spaces_move")
defer span.End()
if r.Body != http.NoBody {
if !isBodyEmpty(r) {
w.WriteHeader(http.StatusUnsupportedMediaType)
b, err := errors.Marshal(http.StatusUnsupportedMediaType, "body must be empty", "", "")
errors.HandleWebdavError(appctx.GetLogger(ctx), w, b, err)

View File

@@ -20,6 +20,7 @@ package ocdav
import (
"context"
"io"
"net/http"
"path"
"strings"
@@ -399,3 +400,17 @@ func (s *svc) referenceIsChildOf(ctx context.Context, selector pool.Selectable[g
// filename extracts the last path element of p with any surrounding
// slashes removed, so e.g. filename("/") yields "".
func filename(p string) string {
	base := path.Base(p)
	return strings.Trim(base, "/")
}
// isBodyEmpty returns true when the Body of the request carries no content.
// It is used by handlers (COPY, MOVE, DELETE, MKCOL) that must reject
// requests with a body.
//
// It probes with a one-byte read: the io.Reader contract explicitly allows
// a (0, nil) return for zero-length reads, so the previous zero-length probe
// could misreport an empty body as non-empty depending on the reader
// implementation. Reading one byte consumes it from a non-empty body, which
// is fine for the current callers because they abort the request in that
// case.
func isBodyEmpty(r *http.Request) bool {
	if r.Body == nil || r.Body == http.NoBody {
		return true
	}
	var buf [1]byte
	n, err := r.Body.Read(buf[:])
	return n == 0 && err == io.EOF
}

View File

@@ -399,11 +399,14 @@ func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, e
case _spaceTypeProject:
return uuid.New().String(), nil
case _spaceTypePersonal:
path := templates.WithUser(owner, lu.Options.PersonalSpacePathTemplate)
relPath := templates.WithUser(owner, lu.Options.PersonalSpacePathTemplate)
path := filepath.Join(lu.Options.Root, relPath)
spaceID, _, err := lu.IDsForPath(context.TODO(), filepath.Join(lu.Options.Root, path))
// do we already know about this space?
spaceID, _, err := lu.IDsForPath(context.TODO(), path)
if err != nil {
_, err := os.Stat(filepath.Join(lu.Options.Root, path))
// check if the space exists on disk incl. attributes
spaceID, _, _, _, err := lu.metadataBackend.IdentifyPath(context.TODO(), path)
if err != nil {
if metadata.IsNotExist(err) || metadata.IsAttrUnset(err) {
return uuid.New().String(), nil
@@ -411,6 +414,10 @@ func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, e
return "", err
}
}
if len(spaceID) == 0 {
return "", errtypes.InternalError("encountered empty space id on disk")
}
return spaceID, nil
}
return spaceID, nil
default:

View File

@@ -39,11 +39,12 @@ type Options struct {
// a revision when the file is changed.
EnableFSRevisions bool `mapstructure:"enable_fs_revisions"`
ScanFS bool `mapstructure:"scan_fs"`
WatchFS bool `mapstructure:"watch_fs"`
WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`
WatchFolderKafkaBrokers string `mapstructure:"watch_folder_kafka_brokers"`
ScanFS bool `mapstructure:"scan_fs"`
WatchFS bool `mapstructure:"watch_fs"`
WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`
WatchRoot string `mapstructure:"watch_root"` // base directory for the watch. events will be considered relative to this path
WatchNotificationBrokers string `mapstructure:"watch_notification_brokers"`
// InotifyWatcher specific options
InotifyStatsFrequency time.Duration `mapstructure:"inotify_stats_frequency"`

View File

@@ -36,7 +36,6 @@ import (
"github.com/pkg/xattr"
"github.com/rs/zerolog/log"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata"
@@ -342,10 +341,9 @@ func (t *Tree) getNodeForPath(path string) (*node.Node, error) {
return node.ReadNode(context.Background(), t.lookup, spaceID, nodeID, false, nil, false)
}
func (t *Tree) findSpaceId(path string) (string, node.Attributes, error) {
func (t *Tree) findSpaceId(path string) (string, error) {
// find the space id, scope by the according user
spaceCandidate := path
spaceAttrs := node.Attributes{}
for strings.HasPrefix(spaceCandidate, t.options.Root) {
spaceID, _, err := t.lookup.IDsForPath(context.Background(), spaceCandidate)
if err == nil && len(spaceID) > 0 {
@@ -353,67 +351,62 @@ func (t *Tree) findSpaceId(path string) (string, node.Attributes, error) {
// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return "", spaceAttrs, err
return "", err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = t.userMapper.ScopeUserByIds(-1, gid)
if err != nil {
return "", spaceAttrs, err
return "", err
}
}
return spaceID, spaceAttrs, nil
return spaceID, nil
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
return "", spaceAttrs, fmt.Errorf("could not find space for path %s", path)
return "", fmt.Errorf("could not find space for path %s", path)
}
func (t *Tree) assimilate(item scanItem) error {
t.log.Debug().Str("path", item.Path).Bool("rescan", item.ForceRescan).Bool("recurse", item.Recurse).Msg("assimilate")
var err error
// First find the space id
spaceID, spaceAttrs, err := t.findSpaceId(item.Path)
spaceID, id, parentID, mtime, err := t.lookup.MetadataBackend().IdentifyPath(context.Background(), item.Path)
if err != nil {
return err
}
assimilationNode := &assimilationNode{
spaceID: spaceID,
path: item.Path,
}
// lock the file for assimilation
unlock, err := t.lookup.MetadataBackend().Lock(assimilationNode)
if err != nil {
return errors.Wrap(err, "failed to lock item for assimilation")
}
defer func() {
_ = unlock()
}()
user := &userv1beta1.UserId{
Idp: string(spaceAttrs[prefixes.OwnerIDPAttr]),
OpaqueId: string(spaceAttrs[prefixes.OwnerIDAttr]),
}
// check for the id attribute again after grabbing the lock, maybe the file was assimilated/created by us in the meantime
_, id, parentID, mtime, err := t.lookup.MetadataBackend().IdentifyPath(context.Background(), item.Path)
if err != nil {
return err
if spaceID == "" {
// node didn't have a space ID attached. try to find it by walking up the path on disk
spaceID, err = t.findSpaceId(item.Path)
if err != nil {
return err
}
}
if id != "" {
// the file has an id set, we already know it from the past
// n := node.NewBaseNode(spaceID, id, t.lookup)
// lock the file for re-assimilation
assimilationNode := &assimilationNode{
spaceID: spaceID,
nodeId: id,
path: item.Path,
}
unlock, err := t.lookup.MetadataBackend().Lock(assimilationNode)
if err != nil {
return errors.Wrap(err, "failed to lock item for assimilation")
}
defer func() {
_ = unlock()
}()
previousPath, ok := t.lookup.GetCachedID(context.Background(), spaceID, id)
if previousPath == "" || !ok {
previousPath, ok = t.lookup.IDHistoryCache.Get(context.Background(), spaceID, id)
}
// previousParentID, _ := t.lookup.MetadataBackend().Get(context.Background(), n, prefixes.ParentidAttr)
// compare metadata mtime with actual mtime. if it matches AND the path hasn't changed (move operation)
// we can skip the assimilation because the file was handled by us
@@ -446,7 +439,7 @@ func (t *Tree) assimilate(item scanItem) error {
if err := t.lookup.CacheID(context.Background(), spaceID, id, item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", item.Path).Msg("could not cache id")
}
_, attrs, err := t.updateFile(item.Path, id, spaceID)
_, attrs, err := t.updateFile(item.Path, id, spaceID, fi)
if err != nil {
return err
}
@@ -484,9 +477,6 @@ func (t *Tree) assimilate(item scanItem) error {
Path: filepath.Base(previousPath),
}
t.PublishEvent(events.ItemMoved{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
OldReference: oldRef,
Timestamp: utils.TSNow(),
@@ -500,16 +490,38 @@ func (t *Tree) assimilate(item scanItem) error {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", item.Path).Msg("could not cache id")
}
_, _, err := t.updateFile(item.Path, id, spaceID)
_, _, err := t.updateFile(item.Path, id, spaceID, fi)
if err != nil {
return err
}
}
} else {
t.log.Debug().Str("path", item.Path).Msg("new item detected")
assimilationNode := &assimilationNode{
spaceID: spaceID,
// Use the path as the node ID (which is used for calculating the lock file path) since we do not have an ID yet
nodeId: strings.ReplaceAll(strings.TrimPrefix(item.Path, "/"), "/", "-"),
}
unlock, err := t.lookup.MetadataBackend().Lock(assimilationNode)
if err != nil {
return err
}
defer func() { _ = unlock() }()
// check if the file got an ID while we were waiting for the lock
_, id, _, _, err = t.lookup.MetadataBackend().IdentifyPath(context.Background(), item.Path)
if err != nil {
return err
}
if id != "" {
// file was assimilated by another thread while we were waiting for the lock
t.log.Debug().Str("path", item.Path).Msg("file was assimilated by another thread")
return nil
}
// assimilate new file
newId := uuid.New().String()
fi, _, err := t.updateFile(item.Path, newId, spaceID)
fi, _, err := t.updateFile(item.Path, newId, spaceID, nil)
if err != nil {
return err
}
@@ -523,25 +535,19 @@ func (t *Tree) assimilate(item scanItem) error {
}
if fi.IsDir() {
t.PublishEvent(events.ContainerCreated{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
Timestamp: utils.TSNow(),
Ref: ref,
Timestamp: utils.TSNow(),
})
} else {
if fi.Size() == 0 {
t.PublishEvent(events.FileTouched{
SpaceOwner: user,
Executant: user,
Ref: ref,
Timestamp: utils.TSNow(),
Ref: ref,
Timestamp: utils.TSNow(),
})
} else {
t.PublishEvent(events.UploadReady{
SpaceOwner: user,
FileRef: ref,
Timestamp: utils.TSNow(),
FileRef: ref,
Timestamp: utils.TSNow(),
})
}
}
@@ -549,7 +555,7 @@ func (t *Tree) assimilate(item scanItem) error {
return nil
}
func (t *Tree) updateFile(path, id, spaceID string) (fs.FileInfo, node.Attributes, error) {
func (t *Tree) updateFile(path, id, spaceID string, fi fs.FileInfo) (fs.FileInfo, node.Attributes, error) {
retries := 1
parentID := ""
bn := assimilationNode{spaceID: spaceID, nodeId: id, path: path}
@@ -585,9 +591,12 @@ assimilate:
}
// assimilate file
fi, err := os.Stat(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to stat item")
if fi == nil {
var err error
fi, err = os.Stat(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to stat item")
}
}
attrs, err := t.lookup.MetadataBackend().All(context.Background(), bn)
@@ -604,13 +613,6 @@ assimilate:
attributes[prefixes.ParentidAttr] = []byte(parentID)
}
sha1h, md5h, adler32h, err := node.CalculateChecksums(context.Background(), path)
if err == nil {
attributes[prefixes.ChecksumPrefix+"sha1"] = sha1h.Sum(nil)
attributes[prefixes.ChecksumPrefix+"md5"] = md5h.Sum(nil)
attributes[prefixes.ChecksumPrefix+"adler32"] = adler32h.Sum(nil)
}
var n *node.Node
if fi.IsDir() {
attributes.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_CONTAINER))
@@ -625,6 +627,13 @@ assimilate:
}
n = node.New(spaceID, id, parentID, filepath.Base(path), treeSize, "", provider.ResourceType_RESOURCE_TYPE_CONTAINER, nil, t.lookup)
} else {
sha1h, md5h, adler32h, err := node.CalculateChecksums(context.Background(), path)
if err == nil {
attributes[prefixes.ChecksumPrefix+"sha1"] = sha1h.Sum(nil)
attributes[prefixes.ChecksumPrefix+"md5"] = md5h.Sum(nil)
attributes[prefixes.ChecksumPrefix+"adler32"] = adler32h.Sum(nil)
}
blobID := uuid.NewString()
attributes.SetString(prefixes.BlobIDAttr, blobID)
attributes.SetInt64(prefixes.BlobsizeAttr, fi.Size())

View File

@@ -0,0 +1,120 @@
// Copyright 2025 OpenCloud GmbH <mail@opencloud.eu>
// SPDX-License-Identifier: Apache-2.0
package tree
import (
"context"
"encoding/json"
"path/filepath"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
kafka "github.com/segmentio/kafka-go"
)
// Event mask bits carried by cephfs MDS change notifications
// (CEPH_MDS_NOTIFY_*, named after inotify-style flags). An incoming
// event's mask is a bitwise OR of these values.
const (
	CEPH_MDS_NOTIFY_ACCESS        = 0x0000000000000001 // 1
	CEPH_MDS_NOTIFY_ATTRIB        = 0x0000000000000002 // 2
	CEPH_MDS_NOTIFY_CLOSE_WRITE   = 0x0000000000000004 // 4
	CEPH_MDS_NOTIFY_CLOSE_NOWRITE = 0x0000000000000008 // 8
	CEPH_MDS_NOTIFY_CREATE        = 0x0000000000000010 // 16
	CEPH_MDS_NOTIFY_DELETE        = 0x0000000000000020 // 32
	CEPH_MDS_NOTIFY_DELETE_SELF   = 0x0000000000000040 // 64
	CEPH_MDS_NOTIFY_MODIFY        = 0x0000000000000080 // 128
	CEPH_MDS_NOTIFY_MOVE_SELF     = 0x0000000000000100 // 256
	CEPH_MDS_NOTIFY_MOVED_FROM    = 0x0000000000000200 // 512
	CEPH_MDS_NOTIFY_MOVED_TO      = 0x0000000000000400 // 1024
	CEPH_MDS_NOTIFY_OPEN          = 0x0000000000000800 // 2048
	CEPH_MDS_NOTIFY_CLOSE         = 0x0000000000001000 // 4096
	CEPH_MDS_NOTIFY_MOVE          = 0x0000000000002000 // 8192
	CEPH_MDS_NOTIFY_ONESHOT       = 0x0000000000004000 // 16384
	CEPH_MDS_NOTIFY_IGNORED       = 0x0000000000008000 // 32768
	CEPH_MDS_NOTIFY_ONLYDIR       = 0x0000000000010000 // 65536, Watch treats this bit as "the affected entity is a directory"
)
// CephFSWatcher consumes cephfs change-notification events from Kafka and
// feeds the affected paths into the tree's scan queue.
type CephFSWatcher struct {
	tree    *Tree            // tree whose Scan method is invoked for each event
	root    string           // watch root; taken from tree.options.WatchRoot at construction
	brokers []string         // kafka broker addresses to consume notifications from
	log     *zerolog.Logger  // logger for watcher diagnostics
}
// NewCephfsWatcher creates a watcher that reads cephfs notification events
// from the given kafka brokers and applies them to the given tree. The watch
// root is derived from the tree's options. The error return is currently
// always nil; it exists for signature symmetry with the other watcher
// constructors.
func NewCephfsWatcher(tree *Tree, brokers []string, log *zerolog.Logger) (*CephFSWatcher, error) {
	w := &CephFSWatcher{
		tree:    tree,
		brokers: brokers,
		root:    tree.options.WatchRoot,
		log:     log,
	}
	return w, nil
}
// cephfsEvent is the JSON payload of a single cephfs notification message
// as read from the kafka topic.
type cephfsEvent struct {
	// Mask/Path are the event mask and path of the according entity.
	// Mask is a bitwise OR of the CEPH_MDS_NOTIFY_* bits; Path is relative
	// to the watch root.
	Mask int    `json:"mask"`
	Path string `json:"path"`

	// Src*/Dest* are emitted for the source and destination of move events;
	// they are zero for non-move events.
	SrcMask  int    `json:"src_mask"`
	SrcPath  string `json:"src_path"`
	DestMask int    `json:"dest_mask"`
	DestPath string `json:"dest_path"`
}
// Watch consumes cephfs change notifications from the given kafka topic and
// translates them into tree scans. It blocks forever, so it is meant to be
// run in its own goroutine. Each event is processed asynchronously in a
// goroutine of its own.
func (w *CephFSWatcher) Watch(topic string) {
	w.log.Info().Str("topic", topic).Msg("cephfs watcher watching topic")

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: w.brokers,
		GroupID: "opencloud-posixfs",
		Topic:   topic,
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			// NOTE(review): this uses the global zerolog logger while the rest of
			// the watcher logs via w.log. Switching it over would orphan the
			// rs/zerolog/log import, so it is left as-is here — confirm and clean
			// up together with the import block.
			log.Error().Err(err).Msg("error reading message")
			continue
		}

		ev := &cephfsEvent{}
		if err := json.Unmarshal(m.Value, ev); err != nil {
			w.log.Error().Err(err).Msg("error unmarshalling message")
			continue
		}

		if w.tree.isIgnored(ev.Path) {
			continue
		}

		// For move events the Dest* fields describe the resulting entity, so
		// they take precedence over the generic mask/path pair.
		mask := ev.Mask
		path := filepath.Join(w.tree.options.WatchRoot, ev.Path)
		if ev.DestMask > 0 {
			mask = ev.DestMask
			path = filepath.Join(w.tree.options.WatchRoot, ev.DestPath)
		}
		isDir := mask&CEPH_MDS_NOTIFY_ONLYDIR > 0

		go func() {
			// Use a goroutine-local error instead of sharing the consumer
			// loop's err variable with the spawned goroutine.
			var err error
			switch {
			case mask&CEPH_MDS_NOTIFY_DELETE > 0:
				err = w.tree.Scan(path, ActionDelete, isDir)
			case mask&CEPH_MDS_NOTIFY_CREATE > 0 || mask&CEPH_MDS_NOTIFY_MOVED_TO > 0:
				if ev.SrcMask > 0 {
					// This is a move: clean up the old path first. Report this
					// error separately instead of letting the create scan below
					// silently overwrite it.
					srcPath := filepath.Join(w.tree.options.WatchRoot, ev.SrcPath)
					if err := w.tree.Scan(srcPath, ActionMoveFrom, isDir); err != nil {
						w.log.Error().Err(err).Str("path", srcPath).Msg("error scanning move source")
					}
				}
				err = w.tree.Scan(path, ActionCreate, isDir)
			case mask&CEPH_MDS_NOTIFY_CLOSE_WRITE > 0:
				err = w.tree.Scan(path, ActionUpdate, isDir)
			case mask&CEPH_MDS_NOTIFY_CLOSE > 0:
				// ignore, already handled by CLOSE_WRITE
			default:
				w.log.Trace().Interface("event", ev).Msg("unhandled event")
				return
			}
			if err != nil {
				w.log.Error().Err(err).Str("path", path).Msg("error scanning file")
			}
		}()
	}
}

View File

@@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"log"
"path/filepath"
"strconv"
"strings"
@@ -30,16 +31,18 @@ import (
)
type GpfsWatchFolderWatcher struct {
tree *Tree
brokers []string
log *zerolog.Logger
tree *Tree
brokers []string
log *zerolog.Logger
watch_root string
}
func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string, log *zerolog.Logger) (*GpfsWatchFolderWatcher, error) {
return &GpfsWatchFolderWatcher{
tree: tree,
brokers: kafkaBrokers,
log: log,
tree: tree,
brokers: kafkaBrokers,
watch_root: tree.options.WatchRoot,
log: log,
}, nil
}
@@ -66,30 +69,32 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
continue
}
path := filepath.Join(w.watch_root, lwev.Path)
go func() {
isDir := strings.Contains(lwev.Event, "IN_ISDIR")
var err error
switch {
case strings.Contains(lwev.Event, "IN_DELETE"):
err = w.tree.Scan(lwev.Path, ActionDelete, isDir)
err = w.tree.Scan(path, ActionDelete, isDir)
case strings.Contains(lwev.Event, "IN_MOVE_FROM"):
err = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir)
err = w.tree.Scan(path, ActionMoveFrom, isDir)
case strings.Contains(lwev.Event, "IN_CREATE"):
err = w.tree.Scan(lwev.Path, ActionCreate, isDir)
err = w.tree.Scan(path, ActionCreate, isDir)
case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, convErr := strconv.Atoi(lwev.BytesWritten)
if convErr == nil && bytesWritten > 0 {
err = w.tree.Scan(lwev.Path, ActionUpdate, isDir)
err = w.tree.Scan(path, ActionUpdate, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
err = w.tree.Scan(lwev.Path, ActionMove, isDir)
err = w.tree.Scan(path, ActionMove, isDir)
}
if err != nil {
w.log.Error().Err(err).Str("path", lwev.Path).Msg("error scanning path")
w.log.Error().Err(err).Str("path", path).Msg("error scanning path")
}
}()
}

View File

@@ -117,9 +117,12 @@ func New(lu node.PathLookup, bs node.Blobstore, um usermapper.Mapper, trashbin *
if o.WatchFS {
watchPath := o.WatchPath
var err error
t.log.Info().Str("watch type", o.WatchType).Str("path", o.WatchPath).Str("root", o.WatchRoot).
Str("brokers", o.WatchNotificationBrokers).Msg("Watching fs")
switch o.WatchType {
case "gpfswatchfolder":
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","), log)
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchNotificationBrokers, ","), log)
if err != nil {
return nil, err
}
@@ -128,6 +131,11 @@ func New(lu node.PathLookup, bs node.Blobstore, um usermapper.Mapper, trashbin *
if err != nil {
return nil, err
}
case "cephfs":
t.watcher, err = NewCephfsWatcher(t, strings.Split(o.WatchNotificationBrokers, ","), log)
if err != nil {
return nil, err
}
default:
t.watcher, err = NewInotifyWatcher(t, o, log)
if err != nil {

View File

@@ -46,7 +46,12 @@ func (HybridBackend) Name() string { return "hybrid" }
// IdentifyPath returns the space id, node id and mtime of a file
func (b HybridBackend) IdentifyPath(_ context.Context, path string) (string, string, string, time.Time, error) {
spaceID, _ := xattr.Get(path, prefixes.SpaceIDAttr)
spaceID, err := xattr.Get(path, prefixes.SpaceIDAttr)
if err != nil {
if IsNotExist(err) {
return "", "", "", time.Time{}, err
}
}
id, _ := xattr.Get(path, prefixes.IDAttr)
parentID, _ := xattr.Get(path, prefixes.ParentidAttr)

View File

@@ -292,6 +292,13 @@ func (session *DecomposedFsSession) Finalize(ctx context.Context) (err error) {
revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(),
provider.ResourceType_RESOURCE_TYPE_FILE, session.SpaceOwner(), session.store.lu)
// lock the node before writing the blob
unlock, err := session.store.lu.MetadataBackend().Lock(revisionNode)
if err != nil {
return err
}
defer func() { _ = unlock() }()
// upload the data to the blobstore
_, subspan := tracer.Start(ctx, "WriteBlob")
err = session.store.tp.WriteBlob(revisionNode, session.binPath())

View File

@@ -0,0 +1,30 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
// CollectorFunc adapts a plain function to the Collector interface without
// interface boilerplate, in the spirit of http.HandlerFunc. Descriptors are
// derived from the collected metrics via DescribeByCollect; read that
// method's documentation before using this adapter.
type CollectorFunc func(chan<- Metric)

// Collect implements Collector by invoking the wrapped function with the
// provided metrics channel.
func (fn CollectorFunc) Collect(metrics chan<- Metric) {
	fn(metrics)
}

// Describe implements Collector by deriving descriptor information from the
// metrics produced by Collect, using DescribeByCollect.
func (fn CollectorFunc) Describe(descs chan<- *Desc) {
	DescribeByCollect(fn, descs)
}

View File

@@ -41,11 +41,11 @@ import (
"sync"
"time"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp/internal"
)
const (
@@ -65,7 +65,13 @@ const (
Zstd Compression = "zstd"
)
var defaultCompressionFormats = []Compression{Identity, Gzip, Zstd}
func defaultCompressionFormats() []Compression {
if internal.NewZstdWriter != nil {
return []Compression{Identity, Gzip, Zstd}
} else {
return []Compression{Identity, Gzip}
}
}
var gzipPool = sync.Pool{
New: func() interface{} {
@@ -138,7 +144,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
// Select compression formats to offer based on default or user choice.
var compressions []string
if !opts.DisableCompression {
offers := defaultCompressionFormats
offers := defaultCompressionFormats()
if len(opts.OfferedCompressions) > 0 {
offers = opts.OfferedCompressions
}
@@ -466,14 +472,12 @@ func negotiateEncodingWriter(r *http.Request, rw io.Writer, compressions []strin
switch selected {
case "zstd":
// TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented.
z, err := zstd.NewWriter(rw, zstd.WithEncoderLevel(zstd.SpeedFastest))
if err != nil {
return nil, "", func() {}, err
if internal.NewZstdWriter == nil {
// The content encoding was not implemented yet.
return nil, "", func() {}, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", selected, defaultCompressionFormats())
}
z.Reset(rw)
return z, selected, func() { _ = z.Close() }, nil
writer, closeWriter, err := internal.NewZstdWriter(rw)
return writer, selected, closeWriter, err
case "gzip":
gz := gzipPool.Get().(*gzip.Writer)
gz.Reset(rw)
@@ -483,6 +487,6 @@ func negotiateEncodingWriter(r *http.Request, rw io.Writer, compressions []strin
return rw, selected, func() {}, nil
default:
// The content encoding was not implemented yet.
return nil, "", func() {}, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", selected, defaultCompressionFormats)
return nil, "", func() {}, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", selected, defaultCompressionFormats())
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"io"
)
// NewZstdWriter enables zstd write support if non-nil. promhttp checks this
// hook at runtime: when it is nil, zstd is omitted from the offered
// compression formats and zstd-encoded responses are refused.
var NewZstdWriter func(rw io.Writer) (_ io.Writer, closeWriter func(), _ error)

13
vendor/modules.txt vendored
View File

@@ -271,8 +271,8 @@ github.com/cenkalti/backoff
# github.com/cenkalti/backoff/v4 v4.3.0
## explicit; go 1.18
github.com/cenkalti/backoff/v4
# github.com/ceph/go-ceph v0.32.0
## explicit; go 1.21
# github.com/ceph/go-ceph v0.33.0
## explicit; go 1.23.0
github.com/ceph/go-ceph/cephfs
github.com/ceph/go-ceph/cephfs/admin
github.com/ceph/go-ceph/common/admin/manager
@@ -597,7 +597,7 @@ github.com/go-redis/redis/v8/internal/util
# github.com/go-resty/resty/v2 v2.7.0
## explicit; go 1.11
github.com/go-resty/resty/v2
# github.com/go-sql-driver/mysql v1.9.1
# github.com/go-sql-driver/mysql v1.9.2
## explicit; go 1.21.0
github.com/go-sql-driver/mysql
# github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572
@@ -1197,7 +1197,7 @@ github.com/open-policy-agent/opa/v1/types
github.com/open-policy-agent/opa/v1/util
github.com/open-policy-agent/opa/v1/util/decoding
github.com/open-policy-agent/opa/v1/version
# github.com/opencloud-eu/reva/v2 v2.31.0
# github.com/opencloud-eu/reva/v2 v2.32.0
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -1642,14 +1642,15 @@ github.com/prometheus/alertmanager/matcher/parse
github.com/prometheus/alertmanager/pkg/labels
github.com/prometheus/alertmanager/template
github.com/prometheus/alertmanager/types
# github.com/prometheus/client_golang v1.21.1
## explicit; go 1.21
# github.com/prometheus/client_golang v1.22.0
## explicit; go 1.22
github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil
github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil/header
github.com/prometheus/client_golang/prometheus
github.com/prometheus/client_golang/prometheus/internal
github.com/prometheus/client_golang/prometheus/promauto
github.com/prometheus/client_golang/prometheus/promhttp
github.com/prometheus/client_golang/prometheus/promhttp/internal
# github.com/prometheus/client_model v0.6.1
## explicit; go 1.19
github.com/prometheus/client_model/go