From 8aac5f6318bdbe0cf7211e9be9f0acbc45685d9d Mon Sep 17 00:00:00 2001 From: Viktor Scharf Date: Mon, 15 Dec 2025 17:03:58 +0100 Subject: [PATCH] reva-bump-2.41.0 (#2032) --- go.mod | 2 +- go.sum | 4 +- .../pkg/storage/fs/posix/options/options.go | 17 ++ .../pkg/storage/fs/posix/trashbin/trashbin.go | 5 + .../pkg/storage/fs/posix/tree/assimilation.go | 81 +++--- .../storage/fs/posix/tree/cephfswatcher.go | 11 +- .../posix/tree/gpfsfilauditloggingwatcher.go | 7 +- .../fs/posix/tree/gpfswatchfolderwatcher.go | 11 +- .../storage/fs/posix/tree/inotifywatcher.go | 11 +- .../reva/v2/pkg/storage/fs/posix/tree/tree.go | 34 ++- .../pkg/storage/fs/posix/watcher/actions.go | 11 + .../posix/watcher/natswatcher/natswatcher.go | 236 ++++++++++++++++++ .../decomposedfs/metadata/hybrid_backend.go | 5 +- vendor/modules.txt | 4 +- 14 files changed, 370 insertions(+), 69 deletions(-) create mode 100644 vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/actions.go create mode 100644 vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher/natswatcher.go diff --git a/go.mod b/go.mod index f4dd437f0a..8ef5795248 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/open-policy-agent/opa v1.10.1 github.com/opencloud-eu/icap-client v0.0.0-20250930132611-28a2afe62d89 github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 - github.com/opencloud-eu/reva/v2 v2.40.1 + github.com/opencloud-eu/reva/v2 v2.41.0 github.com/opensearch-project/opensearch-go/v4 v4.5.0 github.com/orcaman/concurrent-map v1.0.0 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 75e411b265..11bbc87f90 100644 --- a/go.sum +++ b/go.sum @@ -963,8 +963,8 @@ github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9 h1:dIft github.com/opencloud-eu/inotifywaitgo v0.0.0-20251111171128-a390bae3c5e9/go.mod h1:JWyDC6H+5oZRdUJUgKuaye+8Ph5hEs6HVzVoPKzWSGI= github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 h1:vD/EdfDUrv4omSFjrinT8Mvf+8D7f9g4vgQ2oiDrVUI= github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q= -github.com/opencloud-eu/reva/v2 v2.40.1 h1:QwMkbGMhwDSwfk2WxbnTpIig2BugPBaVFjWcy2DSU3U= -github.com/opencloud-eu/reva/v2 v2.40.1/go.mod h1:DGH08n2mvtsQLkt8o15FV6m51FwSJJGhjR8Ty+iIJww= +github.com/opencloud-eu/reva/v2 v2.41.0 h1:oie8+sxcA+drREXRTqm0LmfUdy/mmaa6pA6wkdF6tF4= +github.com/opencloud-eu/reva/v2 v2.41.0/go.mod h1:DGH08n2mvtsQLkt8o15FV6m51FwSJJGhjR8Ty+iIJww= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options/options.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options/options.go index 567f0cd3d1..22d3dfe94b 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options/options.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options/options.go @@ -46,10 +46,27 @@ type Options struct { WatchRoot string `mapstructure:"watch_root"` // base directory for the watch. events will be considered relative to this path WatchNotificationBrokers string `mapstructure:"watch_notification_brokers"` + NatsWatcher NatsWatcherConfig `mapstructure:"natswatcher"` + // InotifyWatcher specific options InotifyStatsFrequency time.Duration `mapstructure:"inotify_stats_frequency"` } +// NatsWatcherConfig is the configuration needed for a NATS watcher event stream. +type NatsWatcherConfig struct { + Endpoint string `mapstructure:"address"` + Cluster string `mapstructure:"clusterID"` + Stream string `mapstructure:"stream"` + Durable string `mapstructure:"durable-name"` + TLSInsecure bool `mapstructure:"tls-insecure"` + TLSRootCACertificate string `mapstructure:"tls-root-ca-cert"` + EnableTLS bool `mapstructure:"enable-tls"` + AuthUsername string `mapstructure:"username"` + AuthPassword string `mapstructure:"password"` + MaxAckPending int `mapstructure:"max-ack-pending"` + AckWait time.Duration `mapstructure:"ack-wait"` +} + // New returns a new Options instance for the given configuration func New(m map[string]interface{}) (*Options, error) { // default to hybrid metadatabackend for posixfs diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin/trashbin.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin/trashbin.go index 60f5b6be42..e9dd136269 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin/trashbin.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin/trashbin.go @@ -21,6 +21,7 @@ package trashbin import ( "context" "fmt" + "io" "io/fs" "os" "path/filepath" @@ -589,6 +590,10 @@ func (tb *Trashbin) IsEmpty(ctx context.Context, spaceID string) bool { } dirItems, err := trash.ReadDir(1) if err != nil { + if err == io.EOF { + // empty trash + return true + } // if we cannot read the trash, we assume there are no trashed items tb.log.Error().Err(err).Str("spaceID", spaceID).Msg("trashbin: error reading trash directory") return true diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/assimilation.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/assimilation.go index b21f2050e0..b660fb1350 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/assimilation.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/assimilation.go @@ -39,6 +39,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/opencloud-eu/reva/v2/pkg/events" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/prefixes" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/node" @@ -54,16 +55,6 @@ type ScanDebouncer struct { mutex sync.Mutex } -type EventAction int - -const ( - ActionCreate EventAction = iota - ActionUpdate - ActionMove - ActionDelete - ActionMoveFrom -) - type queueItem struct { item scanItem timer *time.Timer @@ -190,10 +181,10 @@ func (t *Tree) workScanQueue() { } // Scan scans the given path and updates the id chache -func (t *Tree) Scan(path string, action EventAction, isDir bool) error { +func (t *Tree) Scan(path string, action watcher.EventAction, isDir bool) error { // cases: switch action { - case ActionCreate: + case watcher.ActionCreate: t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionCreate)") if !isDir { // 1. New file (could be emitted as part of a new directory) @@ -225,7 +216,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { }) } - case ActionUpdate: + case watcher.ActionUpdate: t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionUpdate)") // 3. Updated file // -> update file unless parent directory is being rescanned @@ -241,7 +232,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { AssimilationCounter.WithLabelValues(_labelDir, _labelUpdated).Inc() } - case ActionMove: + case watcher.ActionMove: t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMove)") // 4. Moved file // -> update file @@ -258,7 +249,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { AssimilationCounter.WithLabelValues(_labelDir, _labelMoved).Inc() } - case ActionMoveFrom: + case watcher.ActionMoveFrom: t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)") // 6. file/directory moved out of the watched directory // -> remove from caches @@ -279,7 +270,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { // We do not do metrics here because this has been handled in `ActionMove` - case ActionDelete: + case watcher.ActionDelete: t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("handling deleted item") // 7. Deleted file or directory @@ -426,6 +417,15 @@ func (t *Tree) assimilate(item scanItem) error { } } + fi, err := os.Lstat(item.Path) + if err != nil { + return err + } + if !fi.IsDir() && !fi.Mode().IsRegular() { + t.log.Trace().Str("path", item.Path).Msg("skipping non-regular file") + return nil + } + if id != "" { // the file has an id set, we already know it from the past @@ -451,20 +451,10 @@ func (t *Tree) assimilate(item scanItem) error { // compare metadata mtime with actual mtime. if it matches AND the path hasn't changed (move operation) // we can skip the assimilation because the file was handled by us - fi, err := os.Lstat(item.Path) - if err != nil { - return err - } - if previousPath == item.Path && mtime.Equal(fi.ModTime()) { return nil } - if !fi.IsDir() && !fi.Mode().IsRegular() { - t.log.Trace().Str("path", item.Path).Msg("skipping non-regular file") - return nil - } - // was it moved or copied/restored with a clashing id? if ok && len(parentID) > 0 && previousPath != item.Path { _, err := os.Stat(previousPath) @@ -675,6 +665,7 @@ assimilate: } var n *node.Node + sizeDiff := int64(0) if fi.IsDir() { // The Space's name attribute might not match the directory name. Use the name as // it was set before. Also the space root doesn't have a 'type' attribute @@ -712,44 +703,46 @@ assimilate: n.SpaceRoot = &node.Node{BaseNode: node.BaseNode{SpaceID: spaceID, ID: spaceID}} prevBlobSize, err := previousAttribs.Int64(prefixes.BlobsizeAttr) - if err == nil && prevBlobSize != fi.Size() { - // file size changed, trigger propagation of tree size changes - err = t.Propagate(context.Background(), n, fi.Size()-prevBlobSize) - if err != nil { - t.log.Error().Err(err).Str("path", path).Msg("could not propagate tree size changes") - } + if err != nil || prevBlobSize < 0 { + prevBlobSize = 0 + } + if prevBlobSize != fi.Size() { + sizeDiff = fi.Size() - prevBlobSize } } attributes.SetTime(prefixes.MTimeAttr, fi.ModTime()) n.SpaceRoot = &node.Node{BaseNode: node.BaseNode{SpaceID: spaceID, ID: spaceID}} - if t.options.EnableFSRevisions { + if !fi.IsDir() && t.options.EnableFSRevisions { go func() { // Copy the previous current version to a revision currentNode := node.NewBaseNode(n.SpaceID, n.ID+node.CurrentIDDelimiter, t.lookup) currentPath := currentNode.InternalPath() stat, err := os.Stat(currentPath) - if err != nil { - t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not stat current path") - return - } - revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano)) + if err == nil { + revisionPath := t.lookup.VersionPath(n.SpaceID, n.ID, stat.ModTime().UTC().Format(time.RFC3339Nano)) - err = os.Rename(currentPath, revisionPath) - if err != nil { - t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision") - return + err = os.Rename(currentPath, revisionPath) + if err != nil { + t.log.Error().Err(err).Str("path", path).Str("revisionPath", revisionPath).Msg("could not create revision") + return + } } // Copy the new version to the current version + if err := os.MkdirAll(filepath.Dir(currentPath), 0700); err != nil { + t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not create base path for current file") + return + } + w, err := os.OpenFile(currentPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) if err != nil { t.log.Error().Err(err).Str("path", path).Str("currentPath", currentPath).Msg("could not open current path for writing") return } defer w.Close() - r, err := os.OpenFile(n.InternalPath(), os.O_RDONLY, 0600) + r, err := os.OpenFile(path, os.O_RDONLY, 0600) if err != nil { t.log.Error().Err(err).Str("path", path).Msg("could not open file for reading") return @@ -775,7 +768,7 @@ assimilate: }() } - err = t.Propagate(context.Background(), n, 0) + err = t.Propagate(context.Background(), n, sizeDiff) if err != nil { return nil, nil, errors.Wrap(err, "failed to propagate") } diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/cephfswatcher.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/cephfswatcher.go index 6c278dd10c..540907afda 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/cephfswatcher.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/cephfswatcher.go @@ -8,6 +8,7 @@ import ( "encoding/json" "path/filepath" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" "github.com/rs/zerolog" "github.com/rs/zerolog/log" kafka "github.com/segmentio/kafka-go" @@ -97,17 +98,17 @@ func (w *CephFSWatcher) Watch(topic string) { go func() { switch { case mask&CEPH_MDS_NOTIFY_DELETE > 0: - err = w.tree.Scan(path, ActionDelete, isDir) + err = w.tree.Scan(path, watcher.ActionDelete, isDir) case mask&CEPH_MDS_NOTIFY_MOVED_TO > 0: if ev.SrcMask > 0 { // This is a move, clean up the old path - err = w.tree.Scan(filepath.Join(w.tree.options.WatchRoot, ev.SrcPath), ActionMoveFrom, isDir) + err = w.tree.Scan(filepath.Join(w.tree.options.WatchRoot, ev.SrcPath), watcher.ActionMoveFrom, isDir) } - err = w.tree.Scan(path, ActionMove, isDir) + err = w.tree.Scan(path, watcher.ActionMove, isDir) case mask&CEPH_MDS_NOTIFY_CREATE > 0: - err = w.tree.Scan(path, ActionCreate, isDir) + err = w.tree.Scan(path, watcher.ActionCreate, isDir) case mask&CEPH_MDS_NOTIFY_CLOSE_WRITE > 0: - err = w.tree.Scan(path, ActionUpdate, isDir) + err = w.tree.Scan(path, watcher.ActionUpdate, isDir) case mask&CEPH_MDS_NOTIFY_CLOSE > 0: // ignore, already handled by CLOSE_WRITE default: diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go index c00f93474f..b43f23ea13 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go @@ -26,6 +26,7 @@ import ( "strconv" "time" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" "github.com/rs/zerolog" ) @@ -88,15 +89,15 @@ start: go func() { switch ev.Event { case "CREATE": - err = w.tree.Scan(ev.Path, ActionCreate, false) + err = w.tree.Scan(ev.Path, watcher.ActionCreate, false) case "CLOSE": var bytesWritten int bytesWritten, err = strconv.Atoi(ev.BytesWritten) if err == nil && bytesWritten > 0 { - err = w.tree.Scan(ev.Path, ActionUpdate, false) + err = w.tree.Scan(ev.Path, watcher.ActionUpdate, false) } case "RENAME": - err = w.tree.Scan(ev.Path, ActionMove, false) + err = w.tree.Scan(ev.Path, watcher.ActionMove, false) if warmupErr := w.tree.WarmupIDCache(ev.Path, false, false); warmupErr != nil { w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache") } diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go index c79440ddd3..93d1b6218e 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" "github.com/rs/zerolog" kafka "github.com/segmentio/kafka-go" ) @@ -77,21 +78,21 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) { var err error switch { case strings.Contains(lwev.Event, "IN_DELETE"): - err = w.tree.Scan(path, ActionDelete, isDir) + err = w.tree.Scan(path, watcher.ActionDelete, isDir) case strings.Contains(lwev.Event, "IN_MOVE_FROM"): - err = w.tree.Scan(path, ActionMoveFrom, isDir) + err = w.tree.Scan(path, watcher.ActionMoveFrom, isDir) case strings.Contains(lwev.Event, "IN_CREATE"): - err = w.tree.Scan(path, ActionCreate, isDir) + err = w.tree.Scan(path, watcher.ActionCreate, isDir) case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"): bytesWritten, convErr := strconv.Atoi(lwev.BytesWritten) if convErr == nil && bytesWritten > 0 { - err = w.tree.Scan(path, ActionUpdate, isDir) + err = w.tree.Scan(path, watcher.ActionUpdate, isDir) } case strings.Contains(lwev.Event, "IN_MOVED_TO"): - err = w.tree.Scan(path, ActionMove, isDir) + err = w.tree.Scan(path, watcher.ActionMove, isDir) } if err != nil { w.log.Error().Err(err).Str("path", path).Msg("error scanning path") diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/inotifywatcher.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/inotifywatcher.go index 14cf329779..d5a1b475a3 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -30,6 +30,7 @@ import ( "time" "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" "github.com/pablodz/inotifywaitgo/inotifywaitgo" "github.com/rs/zerolog" slogzerolog "github.com/samber/slog-zerolog/v2" @@ -96,15 +97,15 @@ func (iw *InotifyWatcher) Watch(path string) { var err error switch e { case inotifywaitgo.DELETE: - err = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir) + err = iw.tree.Scan(event.Filename, watcher.ActionDelete, event.IsDir) case inotifywaitgo.MOVED_FROM: - err = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir) + err = iw.tree.Scan(event.Filename, watcher.ActionMoveFrom, event.IsDir) case inotifywaitgo.MOVED_TO: - err = iw.tree.Scan(event.Filename, ActionMove, event.IsDir) + err = iw.tree.Scan(event.Filename, watcher.ActionMove, event.IsDir) case inotifywaitgo.CREATE: - err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir) + err = iw.tree.Scan(event.Filename, watcher.ActionCreate, event.IsDir) case inotifywaitgo.CLOSE_WRITE: - err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir) + err = iw.tree.Scan(event.Filename, watcher.ActionUpdate, event.IsDir) case inotifywaitgo.CLOSE: // ignore, already handled by CLOSE_WRITE default: diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/tree.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/tree.go index 24256a60bd..c4852f571a 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/tree.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree/tree.go @@ -47,6 +47,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/lookup" "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options" "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata" "github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/prefixes" @@ -147,6 +148,11 @@ func New(lu node.PathLookup, bs node.Blobstore, um usermapper.Mapper, trashbin * if err != nil { return nil, err } + case "natswatcher": + t.watcher, err = natswatcher.New(context.TODO(), t, o.NatsWatcher, o.WatchRoot, log) + if err != nil { + return nil, err + } default: t.watcher, err = NewInotifyWatcher(t, o, log) if err != nil { @@ -499,8 +505,18 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro _, nodeID, err := t.lookup.IDsForPath(ctx, path) if err != nil { - t.log.Error().Err(err).Str("path", path).Msg("failed to get ids for entry") - continue + // we don't know about this node yet for some reason, assimilate it on the fly + t.log.Info().Err(err).Str("path", path).Msg("encountered unknown entity while listing the directory. Assimilate.") + err = t.assimilate(scanItem{Path: path}) + if err != nil { + t.log.Error().Err(err).Str("path", path).Msg("failed to assimilate node") + continue + } + _, nodeID, err = t.lookup.IDsForPath(ctx, path) + if err != nil || nodeID == "" { + t.log.Error().Err(err).Str("path", path).Msg("still could not resolve node after assimilation") + continue + } } child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true) @@ -708,9 +724,23 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { t.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", path).Msg("could not cache id") } + // Write mtime from filesystem to metadata to preven re-assimilation + d, err := os.Open(path) + if err != nil { + + return err + } + fi, err := d.Stat() + if err != nil { + return err + } + mtime := fi.ModTime() + attributes := n.NodeMetadata(ctx) + attributes[prefixes.MTimeAttr] = []byte(mtime.UTC().Format(time.RFC3339Nano)) attributes[prefixes.IDAttr] = []byte(n.ID) attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0? + if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting { attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation } diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/actions.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/actions.go new file mode 100644 index 0000000000..145c9203ad --- /dev/null +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/actions.go @@ -0,0 +1,11 @@ +package watcher + +type EventAction int + +const ( + ActionCreate EventAction = iota + ActionUpdate + ActionMove + ActionDelete + ActionMoveFrom +) diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher/natswatcher.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher/natswatcher.go new file mode 100644 index 0000000000..a5d97440bf --- /dev/null +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher/natswatcher.go @@ -0,0 +1,236 @@ +package natswatcher + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/cenkalti/backoff" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options" + "github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher" + "github.com/rs/zerolog" + "github.com/vmihailenco/msgpack/v5" +) + +// natsEvent represents the event encoded in MessagePack. +// we abbreviate the the properties to save some space +type natsEvent struct { + Event string `msgpack:"e"` + Path string `msgpack:"p,omitempty"` + ToPath string `msgpack:"t,omitempty"` + IsDir bool `msgpack:"d,omitempty"` +} + +// NatsWatcher consumes filesystem-style events from NATS JetStream. +type NatsWatcher struct { + ctx context.Context + tree Scannable + log *zerolog.Logger + watchRoot string + config options.NatsWatcherConfig +} + +type Scannable interface { + Scan(path string, action watcher.EventAction, isDir bool) error +} + +// NewNatsWatcher creates a new NATS watcher. +func New(ctx context.Context, tree Scannable, cfg options.NatsWatcherConfig, watchRoot string, log *zerolog.Logger) (*NatsWatcher, error) { + return &NatsWatcher{ + ctx: ctx, + tree: tree, + log: log, + watchRoot: watchRoot, + config: cfg, + }, nil +} + +// Watch starts consuming events from a NATS JetStream subject +func (w *NatsWatcher) Watch(path string) { + w.log.Info().Str("stream", w.config.Stream).Msg("starting NATS watcher with auto-reconnect") + + for { + select { + case <-w.ctx.Done(): + w.log.Debug().Msg("context cancelled, stopping NATS watcher") + return + default: + } + + // Try to connect with exponential backoff + nc, js, err := w.connectWithBackoff() + if err != nil { + w.log.Error().Err(err).Msg("failed to establish NATS connection after retries") + time.Sleep(5 * time.Second) + continue + } + + if err := w.consume(js); err != nil { + w.log.Error().Err(err).Msg("NATS consumer exited with error, reconnecting") + } + + _ = nc.Drain() + nc.Close() + time.Sleep(2 * time.Second) + } +} + +// connectWithBackoff repeatedly attempts to connect to NATS JetStream with exponential backoff. +func (w *NatsWatcher) connectWithBackoff() (*nats.Conn, jetstream.JetStream, error) { + var nc *nats.Conn + var js jetstream.JetStream + + b := backoff.NewExponentialBackOff() + b.InitialInterval = 1 * time.Second + b.MaxInterval = 30 * time.Second + b.MaxElapsedTime = 0 // never stop + + connect := func() error { + select { + case <-w.ctx.Done(): + return backoff.Permanent(w.ctx.Err()) + default: + } + + var err error + nc, err = w.connect() + if err != nil { + w.log.Warn().Err(err).Msg("failed to connect to NATS, retrying") + return err + } + + js, err = jetstream.New(nc) + if err != nil { + nc.Close() + w.log.Warn().Err(err).Msg("failed to create jetstream context, retrying") + return err + } + + w.log.Info().Str("endpoint", w.config.Endpoint).Msg("connected to NATS JetStream") + return nil + } + + if err := backoff.Retry(connect, backoff.WithContext(b, w.ctx)); err != nil { + return nil, nil, err + } + return nc, js, nil +} + +// consume subscribes to JetStream and handles messages. +func (w *NatsWatcher) consume(js jetstream.JetStream) error { + stream, err := js.Stream(w.ctx, w.config.Stream) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + + consumer, err := stream.CreateOrUpdateConsumer(w.ctx, jetstream.ConsumerConfig{ + Durable: w.config.Durable, + AckPolicy: jetstream.AckExplicitPolicy, + MaxAckPending: w.config.MaxAckPending, + AckWait: w.config.AckWait, + }) + if err != nil { + return fmt.Errorf("failed to create consumer: %w", err) + } + w.log.Info(). + Str("stream", w.config.Stream). + Msg("started consuming from JetStream") + + _, err = consumer.Consume(func(msg jetstream.Msg) { + defer func() { + if ackErr := msg.Ack(); ackErr != nil { + w.log.Warn().Err(ackErr).Msg("failed to ack message") + } + }() + + var ev natsEvent + if err := msgpack.Unmarshal(msg.Data(), &ev); err != nil { + w.log.Error().Err(err).Msg("failed to decode MessagePack event") + return + } + + w.handleEvent(ev) + }) + + if err != nil { + return fmt.Errorf("consumer error: %w", err) + } + + <-w.ctx.Done() + return w.ctx.Err() +} + +// connect establishes a single NATS connection with optional TLS and auth. +func (w *NatsWatcher) connect() (*nats.Conn, error) { + var tlsConf *tls.Config + if w.config.EnableTLS { + var rootCAPool *x509.CertPool + if w.config.TLSRootCACertificate != "" { + rootCrtFile, err := os.ReadFile(w.config.TLSRootCACertificate) + if err != nil { + return nil, fmt.Errorf("failed to read root CA: %w", err) + } + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(rootCrtFile) + w.config.TLSInsecure = false + } + tlsConf = &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: w.config.TLSInsecure, + RootCAs: rootCAPool, + } + } + + opts := []nats.Option{nats.Name("opencloud-posixfs-natswatcher")} + if tlsConf != nil { + opts = append(opts, nats.Secure(tlsConf)) + } + if w.config.AuthUsername != "" && w.config.AuthPassword != "" { + opts = append(opts, nats.UserInfo(w.config.AuthUsername, w.config.AuthPassword)) + } + return nats.Connect(w.config.Endpoint, opts...) +} + +// handleEvent applies the event to the local tree. +func (w *NatsWatcher) handleEvent(ev natsEvent) { + var err error + + // Determine the relevant path + path := filepath.Join(w.watchRoot, ev.Path) + + switch ev.Event { + case "CREATE": + err = w.tree.Scan(path, watcher.ActionCreate, ev.IsDir) + case "MOVED_TO": + err = w.tree.Scan(path, watcher.ActionMove, ev.IsDir) + case "MOVE_FROM": + err = w.tree.Scan(path, watcher.ActionMoveFrom, ev.IsDir) + case "MOVE": // support event with source and target path + err = w.tree.Scan(path, watcher.ActionMoveFrom, ev.IsDir) + if err == nil { + w.log.Error().Err(err).Interface("event", ev).Msg("error processing event") + } + tgt := filepath.Join(w.watchRoot, ev.ToPath) + if tgt == "" { + w.log.Warn().Interface("event", ev).Msg("MOVE event missing target path") + } else { + err = w.tree.Scan(tgt, watcher.ActionMove, ev.IsDir) + } + case "CLOSE_WRITE": + err = w.tree.Scan(path, watcher.ActionUpdate, ev.IsDir) + case "DELETE": + err = w.tree.Scan(path, watcher.ActionDelete, ev.IsDir) + default: + w.log.Warn().Str("event", ev.Event).Msg("unhandled event type") + } + + if err != nil { + w.log.Error().Err(err).Interface("event", ev).Msg("error processing event") + } +} diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/hybrid_backend.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/hybrid_backend.go index 368f567f71..d84851c113 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/hybrid_backend.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/metadata/hybrid_backend.go @@ -2,6 +2,7 @@ package metadata import ( "context" + "fmt" "io" "io/fs" "os" @@ -292,17 +293,19 @@ func (b HybridBackend) SetMultiple(ctx context.Context, n MetadataNode, attribs } } xerrs := 0 + total := 0 var xerr error // error handling: Count if there are errors while setting the attribs. // if there were any, return an error. for key, val := range attribs { + total++ if xerr = xattr.Set(path, key, val); xerr != nil { // log xerrs++ } } if xerrs > 0 { - return errors.Wrap(xerr, "Failed to set all xattrs") + return fmt.Errorf("failed to set %d/%d xattrs: %w", xerrs, total, xerr) } attribs, err = b.getAll(ctx, n, true, false, false) diff --git a/vendor/modules.txt b/vendor/modules.txt index 9c4068de15..767d0da1e4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1368,7 +1368,7 @@ github.com/opencloud-eu/icap-client # github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250724122329-41ba6b191e76 ## explicit; go 1.18 github.com/opencloud-eu/libre-graph-api-go -# github.com/opencloud-eu/reva/v2 v2.40.1 +# github.com/opencloud-eu/reva/v2 v2.41.0 ## explicit; go 1.24.1 github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace github.com/opencloud-eu/reva/v2/cmd/revad/runtime @@ -1682,6 +1682,8 @@ github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/options github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/timemanager github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/trashbin github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/tree +github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher +github.com/opencloud-eu/reva/v2/pkg/storage/fs/posix/watcher/natswatcher github.com/opencloud-eu/reva/v2/pkg/storage/fs/registry github.com/opencloud-eu/reva/v2/pkg/storage/fs/s3ng github.com/opencloud-eu/reva/v2/pkg/storage/fs/s3ng/blobstore