mirror of
https://github.com/rclone/rclone.git
synced 2026-05-12 01:57:56 -04:00
vfs: replace context.TODO/Background with stored VFS context
Add a ctx field to the VFS struct, initialized in New() from the existing cancellable context. Propagate this through the cache subsystem hierarchy. This ensures proper context cancellation when a VFS shuts down, rather than using disconnected context.TODO() or context.Background() calls throughout and paves the way for VFS to have its own config.
This commit is contained in:
33
vfs/dir.go
33
vfs/dir.go
@@ -1,7 +1,6 @@
|
||||
package vfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -64,7 +63,7 @@ func newDir(vfs *VFS, f fs.Fs, parent *Dir, fsDir fs.Directory) *Dir {
|
||||
parent: parent,
|
||||
entry: fsDir,
|
||||
path: fsDir.Remote(),
|
||||
modTime: fsDir.ModTime(context.TODO()),
|
||||
modTime: fsDir.ModTime(vfs.ctx),
|
||||
inode: newInode(),
|
||||
items: make(map[string]Node),
|
||||
}
|
||||
@@ -380,7 +379,7 @@ func (d *Dir) renameTree(dirPath string) {
|
||||
delete(d.parent.items, name(d.path))
|
||||
d.path = dirPath
|
||||
d.parent.items[name(d.path)] = d
|
||||
d.entry = fs.NewDirCopy(context.TODO(), d.entry).SetRemote(dirPath)
|
||||
d.entry = fs.NewDirCopy(d.vfs.ctx, d.entry).SetRemote(dirPath)
|
||||
}
|
||||
|
||||
// Do the same to any child directories and files
|
||||
@@ -404,7 +403,7 @@ func (d *Dir) rename(newParent *Dir, fsDir fs.Directory) {
|
||||
d.ForgetAll()
|
||||
|
||||
d.modTimeMu.Lock()
|
||||
d.modTime = fsDir.ModTime(context.TODO())
|
||||
d.modTime = fsDir.ModTime(d.vfs.ctx)
|
||||
d.modTimeMu.Unlock()
|
||||
d.mu.Lock()
|
||||
oldPath := d.path
|
||||
@@ -539,7 +538,7 @@ func (d *Dir) _readDir() error {
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
entries, err := list.DirSorted(context.TODO(), d.f, false, d.path)
|
||||
entries, err := list.DirSorted(d.vfs.ctx, d.f, false, d.path)
|
||||
if err == fs.ErrorDirNotFound {
|
||||
// We treat directory not found as empty because we
|
||||
// create directories on the fly
|
||||
@@ -548,7 +547,7 @@ func (d *Dir) _readDir() error {
|
||||
}
|
||||
|
||||
if d.vfs.Opt.BlockNormDupes { // do this only if requested, as it will have a performance hit
|
||||
ci := fs.GetConfig(context.TODO())
|
||||
ci := fs.GetConfig(d.vfs.ctx)
|
||||
|
||||
// sort entries such that NFD comes before NFC of same name
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
@@ -761,7 +760,7 @@ func (d *Dir) _readDirFromEntries(entries fs.DirEntries, dirTree dirtree.DirTree
|
||||
}
|
||||
dir := node.(*Dir)
|
||||
dir.mu.Lock()
|
||||
dir.modTime = item.ModTime(context.TODO())
|
||||
dir.modTime = item.ModTime(d.vfs.ctx)
|
||||
dir.entry = item
|
||||
if dirTree != nil {
|
||||
err = dir._readDirFromDirTree(dirTree, when)
|
||||
@@ -794,7 +793,7 @@ func (d *Dir) readDirTree() error {
|
||||
d.mu.RUnlock()
|
||||
when := time.Now()
|
||||
fs.Debugf(path, "Reading directory tree")
|
||||
dt, err := walk.NewDirTree(context.TODO(), f, path, false, -1)
|
||||
dt, err := walk.NewDirTree(d.vfs.ctx, f, path, false, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -840,7 +839,7 @@ func (d *Dir) statMetadata(leaf, baseLeaf string) (metaNode Node, err error) {
|
||||
entry := node.DirEntry()
|
||||
var metadataDump []byte
|
||||
if entry != nil {
|
||||
metadata, err := fs.GetMetadata(context.TODO(), entry)
|
||||
metadata, err := fs.GetMetadata(d.vfs.ctx, entry)
|
||||
if err != nil {
|
||||
metadataDump = jsonErrorf("failed to read metadata: %v", err)
|
||||
} else if metadata == nil {
|
||||
@@ -856,7 +855,7 @@ func (d *Dir) statMetadata(leaf, baseLeaf string) (metaNode Node, err error) {
|
||||
}
|
||||
// Make a memory based file with metadataDump in
|
||||
remote := path.Join(d.path, leaf)
|
||||
o := object.NewMemoryObject(remote, entry.ModTime(context.TODO()), metadataDump)
|
||||
o := object.NewMemoryObject(remote, entry.ModTime(d.vfs.ctx), metadataDump)
|
||||
f := newFile(d, d.path, o, leaf)
|
||||
// Base the metadata inode number off the real file inode number
|
||||
// to keep it constant
|
||||
@@ -892,7 +891,7 @@ func (d *Dir) stat(leaf string) (Node, error) {
|
||||
}
|
||||
}
|
||||
|
||||
ci := fs.GetConfig(context.TODO())
|
||||
ci := fs.GetConfig(d.vfs.ctx)
|
||||
normUnicode := !ci.NoUnicodeNormalization
|
||||
normCase := ci.IgnoreCaseSync || d.vfs.Opt.CaseInsensitive
|
||||
if !ok && (normUnicode || normCase) {
|
||||
@@ -1085,7 +1084,7 @@ func (d *Dir) Mkdir(name string) (*Dir, error) {
|
||||
return nil, err
|
||||
}
|
||||
// fs.Debugf(path, "Dir.Mkdir")
|
||||
err = d.f.Mkdir(context.TODO(), path)
|
||||
err = d.f.Mkdir(d.vfs.ctx, path)
|
||||
if err != nil {
|
||||
fs.Errorf(d, "Dir.Mkdir failed to create directory: %v", err)
|
||||
return nil, err
|
||||
@@ -1117,7 +1116,7 @@ func (d *Dir) Remove() error {
|
||||
return ENOTEMPTY
|
||||
}
|
||||
// remove directory
|
||||
err = d.f.Rmdir(context.TODO(), d.path)
|
||||
err = d.f.Rmdir(d.vfs.ctx, d.path)
|
||||
if err != nil {
|
||||
fs.Errorf(d, "Dir.Remove failed to remove directory: %v", err)
|
||||
return err
|
||||
@@ -1192,7 +1191,7 @@ func (d *Dir) Rename(oldName, newName string, destDir *Dir) error {
|
||||
switch x := oldNode.DirEntry().(type) {
|
||||
case nil:
|
||||
if oldFile, ok := oldNode.(*File); ok {
|
||||
if err = oldFile.rename(context.TODO(), destDir, newName); err != nil {
|
||||
if err = oldFile.rename(d.vfs.ctx, destDir, newName); err != nil {
|
||||
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
|
||||
return err
|
||||
}
|
||||
@@ -1202,7 +1201,7 @@ func (d *Dir) Rename(oldName, newName string, destDir *Dir) error {
|
||||
}
|
||||
case fs.Object:
|
||||
if oldFile, ok := oldNode.(*File); ok {
|
||||
if err = oldFile.rename(context.TODO(), destDir, newName); err != nil {
|
||||
if err = oldFile.rename(d.vfs.ctx, destDir, newName); err != nil {
|
||||
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
|
||||
return err
|
||||
}
|
||||
@@ -1220,12 +1219,12 @@ func (d *Dir) Rename(oldName, newName string, destDir *Dir) error {
|
||||
}
|
||||
srcRemote := x.Remote()
|
||||
dstRemote := newPath
|
||||
err = operations.DirMove(context.TODO(), d.f, srcRemote, dstRemote)
|
||||
err = operations.DirMove(d.vfs.ctx, d.f, srcRemote, dstRemote)
|
||||
if err != nil {
|
||||
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
|
||||
return err
|
||||
}
|
||||
newDir := fs.NewDirCopy(context.TODO(), x).SetRemote(newPath)
|
||||
newDir := fs.NewDirCopy(d.vfs.ctx, x).SetRemote(newPath)
|
||||
// Update the node with the new details
|
||||
if oldNode != nil {
|
||||
if oldDir, ok := oldNode.(*Dir); ok {
|
||||
|
||||
16
vfs/file.go
16
vfs/file.go
@@ -40,8 +40,9 @@ import (
|
||||
|
||||
// File represents a file or a symlink
|
||||
type File struct {
|
||||
inode uint64 // inode number - read only
|
||||
size atomic.Int64 // size of file
|
||||
inode uint64 // inode number - read only
|
||||
size atomic.Int64 // size of file
|
||||
ctx context.Context // context for VFS operations - read only
|
||||
|
||||
muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove
|
||||
|
||||
@@ -70,6 +71,7 @@ func newFile(d *Dir, dPath string, o fs.Object, leaf string) *File {
|
||||
o: o,
|
||||
leaf: leaf,
|
||||
inode: newInode(),
|
||||
ctx: d.vfs.ctx,
|
||||
}
|
||||
if o != nil {
|
||||
f.size.Store(o.Size())
|
||||
@@ -220,7 +222,7 @@ func (f *File) applyPendingRename() {
|
||||
return
|
||||
}
|
||||
fs.Debugf(f.Path(), "Running delayed rename now")
|
||||
if err := fun(context.TODO()); err != nil {
|
||||
if err := fun(f.ctx); err != nil {
|
||||
fs.Errorf(f.Path(), "delayed File.Rename error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -415,7 +417,7 @@ func (f *File) ModTime() (modTime time.Time) {
|
||||
if o == nil {
|
||||
return time.Now()
|
||||
}
|
||||
return o.ModTime(context.TODO())
|
||||
return o.ModTime(f.ctx)
|
||||
}
|
||||
|
||||
// nonNegative returns 0 if i is -ve, i otherwise
|
||||
@@ -491,7 +493,7 @@ func (f *File) _applyPendingModTime() error {
|
||||
return errors.New("cannot apply ModTime, file object is not available")
|
||||
}
|
||||
|
||||
dt := f.pendingModTime.Sub(f.o.ModTime(context.Background()))
|
||||
dt := f.pendingModTime.Sub(f.o.ModTime(f.ctx))
|
||||
modifyWindow := f.o.Fs().Precision()
|
||||
if dt < modifyWindow && dt > -modifyWindow {
|
||||
fs.Debugf(f.o, "Not setting pending mod time %v as it is already set", f.pendingModTime)
|
||||
@@ -499,7 +501,7 @@ func (f *File) _applyPendingModTime() error {
|
||||
}
|
||||
|
||||
// set the time of the object
|
||||
err := f.o.SetModTime(context.TODO(), f.pendingModTime)
|
||||
err := f.o.SetModTime(f.ctx, f.pendingModTime)
|
||||
switch err {
|
||||
case nil:
|
||||
fs.Debugf(f.o, "Applied pending mod time %v OK", f.pendingModTime)
|
||||
@@ -682,7 +684,7 @@ func (f *File) Remove() (err error) {
|
||||
f.muRW.Lock() // muRW must be locked before mu to avoid
|
||||
f.mu.Lock() // deadlock in RWFileHandle.openPending and .close
|
||||
if f.o != nil {
|
||||
err = f.o.Remove(context.TODO())
|
||||
err = f.o.Remove(f.ctx)
|
||||
}
|
||||
f.mu.Unlock()
|
||||
f.muRW.Unlock()
|
||||
|
||||
16
vfs/read.go
16
vfs/read.go
@@ -76,13 +76,13 @@ func (fh *ReadFileHandle) openPending() (err error) {
|
||||
}
|
||||
o := fh.file.getObject()
|
||||
opt := &fh.file.VFS().Opt
|
||||
r, err := chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open()
|
||||
r, err := chunkedreader.New(fh.file.ctx, o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr := accounting.GlobalStats().NewTransfer(o, nil)
|
||||
fh.done = tr.Done
|
||||
fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer
|
||||
fh.r = tr.Account(fh.file.ctx, r).WithBuffer() // account the transfer
|
||||
fh.opened = true
|
||||
|
||||
return nil
|
||||
@@ -135,7 +135,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
|
||||
}
|
||||
if !reopen {
|
||||
fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset)
|
||||
_, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1)
|
||||
_, err = r.RangeSeek(fh.file.ctx, offset, io.SeekStart, -1)
|
||||
if err != nil {
|
||||
fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err)
|
||||
return err
|
||||
@@ -150,7 +150,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
|
||||
// re-open with a seek
|
||||
o := fh.file.getObject()
|
||||
opt := &fh.file.VFS().Opt
|
||||
r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
|
||||
r = chunkedreader.New(fh.file.ctx, o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
|
||||
_, err := r.Seek(offset, 0)
|
||||
if err != nil {
|
||||
fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
|
||||
@@ -162,7 +162,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
fh.r.UpdateReader(context.TODO(), r)
|
||||
fh.r.UpdateReader(fh.file.ctx, r)
|
||||
fh.offset = offset
|
||||
return nil
|
||||
}
|
||||
@@ -277,7 +277,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
|
||||
retries := 0
|
||||
reqSize := len(p)
|
||||
doReopen := false
|
||||
lowLevelRetries := fs.GetConfig(context.TODO()).LowLevelRetries
|
||||
lowLevelRetries := fs.GetConfig(fh.file.ctx).LowLevelRetries
|
||||
for {
|
||||
if doSeek {
|
||||
// Are we attempting to seek beyond the end of the
|
||||
@@ -352,7 +352,7 @@ func (fh *ReadFileHandle) checkHash() error {
|
||||
|
||||
o := fh.file.getObject()
|
||||
for hashType, dstSum := range fh.hash.Sums() {
|
||||
srcSum, err := o.Hash(context.TODO(), hashType)
|
||||
srcSum, err := o.Hash(fh.file.ctx, hashType)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
// if it was file not found then at
|
||||
@@ -416,7 +416,7 @@ func (fh *ReadFileHandle) close() error {
|
||||
if fh.opened {
|
||||
var err error
|
||||
defer func() {
|
||||
fh.done(context.TODO(), err)
|
||||
fh.done(fh.file.ctx, err)
|
||||
}()
|
||||
// Close first so that we have hashes
|
||||
err = fh.r.Close()
|
||||
|
||||
13
vfs/vfs.go
13
vfs/vfs.go
@@ -176,6 +176,7 @@ var (
|
||||
// VFS represents the top level filing system
|
||||
type VFS struct {
|
||||
f fs.Fs
|
||||
ctx context.Context
|
||||
root *Dir
|
||||
Opt vfscommon.Options
|
||||
cache *vfscache.Cache
|
||||
@@ -201,6 +202,7 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
vfs := &VFS{
|
||||
f: f,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
vfs.inUse.Store(1)
|
||||
@@ -213,7 +215,7 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
||||
}
|
||||
|
||||
// Fill out anything else
|
||||
vfs.Opt.Init()
|
||||
vfs.Opt.Init(ctx)
|
||||
|
||||
// Find a VFS with the same name and options and return it if possible
|
||||
activeMu.Lock()
|
||||
@@ -223,6 +225,7 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
||||
if vfs.Opt == activeVFS.Opt {
|
||||
fs.Debugf(f, "Reusing VFS from active cache")
|
||||
activeVFS.inUse.Add(1)
|
||||
cancel()
|
||||
return activeVFS
|
||||
}
|
||||
}
|
||||
@@ -236,7 +239,7 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
||||
features := vfs.f.Features()
|
||||
if do := features.ChangeNotify; do != nil {
|
||||
vfs.pollChan = make(chan time.Duration)
|
||||
do(context.TODO(), vfs.root.changeNotify, vfs.pollChan)
|
||||
do(vfs.ctx, vfs.root.changeNotify, vfs.pollChan)
|
||||
vfs.pollChan <- time.Duration(vfs.Opt.PollInterval)
|
||||
} else if vfs.Opt.PollInterval > 0 {
|
||||
fs.Infof(f, "poll-interval is not supported by this remote")
|
||||
@@ -351,8 +354,8 @@ func (vfs *VFS) SetCacheMode(cacheMode vfscommon.CacheMode) {
|
||||
vfs.shutdownCache()
|
||||
vfs.cache = nil
|
||||
if cacheMode > vfscommon.CacheModeOff {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cache, err := vfscache.New(ctx, vfs.f, &vfs.Opt, vfs.AddVirtual) // FIXME pass on context or get from Opt?
|
||||
ctx, cancel := context.WithCancel(vfs.ctx)
|
||||
cache, err := vfscache.New(ctx, vfs.f, &vfs.Opt, vfs.AddVirtual)
|
||||
if err != nil {
|
||||
fs.Errorf(nil, "Failed to create vfs cache - disabling: %v", err)
|
||||
vfs.Opt.CacheMode = vfscommon.CacheModeOff
|
||||
@@ -652,7 +655,7 @@ func (vfs *VFS) Statfs() (total, used, free int64) {
|
||||
doAbout := vfs.f.Features().About
|
||||
if (doAbout != nil || vfs.Opt.UsedIsSize) && (vfs.usageTime.IsZero() || time.Since(vfs.usageTime) >= time.Duration(vfs.Opt.DirCacheTime)) {
|
||||
var err error
|
||||
ctx := context.TODO()
|
||||
ctx := vfs.ctx
|
||||
if doAbout == nil {
|
||||
vfs.usage = &fs.Usage{}
|
||||
} else {
|
||||
|
||||
@@ -138,7 +138,7 @@ func TestVFSNew(t *testing.T) {
|
||||
|
||||
// Check making a VFS with nil options
|
||||
var defaultOpt = vfscommon.Opt
|
||||
defaultOpt.Init()
|
||||
defaultOpt.Init(context.Background())
|
||||
|
||||
checkActiveCacheEntries(1)
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ import (
|
||||
// Cache opened files
|
||||
type Cache struct {
|
||||
// read only - no locking needed to read these
|
||||
ctx context.Context // context for cache lifetime
|
||||
fremote fs.Fs // fs for the remote we are caching
|
||||
fcache fs.Fs // fs for the cache directory
|
||||
fcacheMeta fs.Fs // fs for the cache metadata directory
|
||||
@@ -115,6 +116,7 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVir
|
||||
|
||||
// Create the cache object
|
||||
c := &Cache{
|
||||
ctx: ctx,
|
||||
fremote: fremote,
|
||||
fcache: fdata,
|
||||
fcacheMeta: fmeta,
|
||||
@@ -689,7 +691,7 @@ func (c *Cache) purgeOld(maxAge time.Duration) {
|
||||
|
||||
// Purge any empty directories
|
||||
func (c *Cache) purgeEmptyDirs(dir string, leaveRoot bool) {
|
||||
ctx := context.Background()
|
||||
ctx := c.ctx
|
||||
err := operations.Rmdirs(ctx, c.fcache, dir, leaveRoot)
|
||||
if err != nil {
|
||||
fs.Errorf(c.fcache, "vfs cache: failed to remove empty directories from cache path %q: %v", dir, err)
|
||||
|
||||
@@ -102,11 +102,11 @@ type downloader struct {
|
||||
}
|
||||
|
||||
// New makes a downloader for item
|
||||
func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
|
||||
func New(ctx context.Context, item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
|
||||
if src == nil {
|
||||
panic("internal error: newDownloaders called with nil src object")
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
dls = &Downloaders{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
@@ -284,7 +284,7 @@ func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
|
||||
// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
|
||||
|
||||
// The window includes potentially unread data in the buffer
|
||||
window := int64(fs.GetConfig(context.TODO()).BufferSize)
|
||||
window := int64(fs.GetConfig(dls.ctx).BufferSize)
|
||||
|
||||
// Increase the read range by the read ahead if set
|
||||
if dls.opt.ReadAhead > 0 {
|
||||
@@ -536,7 +536,7 @@ func (dl *downloader) open(offset int64) (err error) {
|
||||
// }
|
||||
// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
|
||||
|
||||
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit), dl.dls.opt.ChunkStreams)
|
||||
in0 := chunkedreader.New(dl.dls.ctx, dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit), dl.dls.opt.ChunkStreams)
|
||||
_, err = in0.Seek(offset, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("vfs reader: failed to open source file: %w", err)
|
||||
|
||||
@@ -94,7 +94,7 @@ func TestDownloaders(t *testing.T) {
|
||||
size: size,
|
||||
}
|
||||
opt := vfscommon.Opt
|
||||
dls := New(item, &opt, remote, src)
|
||||
dls := New(ctx, item, &opt, remote, src)
|
||||
return item, dls
|
||||
}
|
||||
cancel := func(dls *Downloaders) {
|
||||
|
||||
@@ -496,7 +496,7 @@ func (item *Item) _createFile(osPath string) (err error) {
|
||||
// Open the local file from the object passed in. Wraps open()
|
||||
// to provide recovery from out of space error.
|
||||
func (item *Item) Open(o fs.Object) (err error) {
|
||||
for range fs.GetConfig(context.TODO()).LowLevelRetries {
|
||||
for range fs.GetConfig(item.c.ctx).LowLevelRetries {
|
||||
item.preAccess()
|
||||
err = item.open(o)
|
||||
item.postAccess()
|
||||
@@ -595,7 +595,7 @@ func (item *Item) open(o fs.Object) (err error) {
|
||||
|
||||
// Create the downloaders
|
||||
if item.o != nil {
|
||||
item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
|
||||
item.downloaders = downloaders.New(item.c.ctx, item, item.c.opt, item.name, item.o)
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -783,7 +783,7 @@ func (item *Item) _actualClose(storeFn StoreFn, syncWriteBack bool) (err error)
|
||||
// set the modtime from the object otherwise set it from the info
|
||||
if item._exists() {
|
||||
if !item.info.Dirty && item.o != nil {
|
||||
item._setModTime(item.o.ModTime(context.Background()))
|
||||
item._setModTime(item.o.ModTime(item.c.ctx))
|
||||
} else {
|
||||
item._setModTime(item.info.ModTime)
|
||||
}
|
||||
@@ -794,7 +794,7 @@ func (item *Item) _actualClose(storeFn StoreFn, syncWriteBack bool) (err error)
|
||||
fs.Infof(item.name, "vfs cache: queuing for upload in %v", item.c.opt.WriteBack)
|
||||
if syncWriteBack {
|
||||
// do synchronous writeback
|
||||
checkErr(item._store(context.Background(), storeFn))
|
||||
checkErr(item._store(item.c.ctx, storeFn))
|
||||
} else {
|
||||
// asynchronous writeback
|
||||
item.c.writeback.SetID(&item.writeBackID)
|
||||
@@ -874,7 +874,7 @@ func (item *Item) _checkObject(o fs.Object) error {
|
||||
// OK
|
||||
}
|
||||
} else {
|
||||
remoteFingerprint := fs.Fingerprint(context.TODO(), o, item.c.opt.FastFingerprint)
|
||||
remoteFingerprint := fs.Fingerprint(item.c.ctx, o, item.c.opt.FastFingerprint)
|
||||
fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint)
|
||||
if item.info.Fingerprint != "" {
|
||||
// remote object && local object
|
||||
@@ -1129,7 +1129,7 @@ func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
|
||||
|
||||
// Create the downloaders
|
||||
if item.o != nil {
|
||||
item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
|
||||
item.downloaders = downloaders.New(item.c.ctx, item, item.c.opt, item.name, item.o)
|
||||
}
|
||||
|
||||
/* The item will stay in the beingReset state if we get an error that prevents us from
|
||||
@@ -1235,13 +1235,13 @@ func (item *Item) _ensure(offset, size int64) (err error) {
|
||||
// See: https://github.com/rclone/rclone/issues/6190
|
||||
// See: https://github.com/rclone/rclone/issues/6235
|
||||
if item.o == nil {
|
||||
o, err := item.c.fremote.NewObject(context.Background(), item.name)
|
||||
o, err := item.c.fremote.NewObject(item.c.ctx, item.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
item.o = o
|
||||
}
|
||||
item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
|
||||
item.downloaders = downloaders.New(item.c.ctx, item, item.c.opt, item.name, item.o)
|
||||
}
|
||||
return item.downloaders.Download(r)
|
||||
}
|
||||
@@ -1269,7 +1269,7 @@ func (item *Item) _updateFingerprint() {
|
||||
return
|
||||
}
|
||||
oldFingerprint := item.info.Fingerprint
|
||||
item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, item.c.opt.FastFingerprint)
|
||||
item.info.Fingerprint = fs.Fingerprint(item.c.ctx, item.o, item.c.opt.FastFingerprint)
|
||||
if oldFingerprint != item.info.Fingerprint {
|
||||
fs.Debugf(item.o, "vfs cache: fingerprint now %q", item.info.Fingerprint)
|
||||
}
|
||||
@@ -1317,7 +1317,7 @@ func (item *Item) GetModTime() (modTime time.Time, err error) {
|
||||
func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
n = 0
|
||||
var expBackOff int
|
||||
for retries := range fs.GetConfig(context.TODO()).LowLevelRetries {
|
||||
for retries := range fs.GetConfig(item.c.ctx).LowLevelRetries {
|
||||
item.preAccess()
|
||||
n, err = item.readAt(b, off)
|
||||
item.postAccess()
|
||||
|
||||
@@ -435,7 +435,7 @@ func (wb *WriteBack) processItems(ctx context.Context) {
|
||||
resetTimer := true
|
||||
for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() {
|
||||
// If reached transfer limit don't restart the timer
|
||||
if wb.uploads >= fs.GetConfig(context.TODO()).Transfers {
|
||||
if wb.uploads >= fs.GetConfig(wb.ctx).Transfers {
|
||||
fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded")
|
||||
resetTimer = false
|
||||
break
|
||||
|
||||
@@ -222,8 +222,8 @@ type Options struct {
|
||||
var Opt Options
|
||||
|
||||
// Init the options, making sure everything is within range
|
||||
func (opt *Options) Init() {
|
||||
ci := fs.GetConfig(context.Background())
|
||||
func (opt *Options) Init(ctx context.Context) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
|
||||
// Override --vfs-links with --links if set
|
||||
if ci.Links {
|
||||
|
||||
@@ -147,7 +147,7 @@ func newRun(useVFS bool, vfsOpt *vfscommon.Options, mountFn mountlib.MountFn) *R
|
||||
useVFS: useVFS,
|
||||
vfsOpt: vfsOpt,
|
||||
}
|
||||
r.vfsOpt.Init()
|
||||
r.vfsOpt.Init(context.Background())
|
||||
fstest.Initialise()
|
||||
|
||||
var err error
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package vfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
@@ -71,7 +70,7 @@ func (fh *WriteFileHandle) openPending() (err error) {
|
||||
pipeReader, fh.pipeWriter = io.Pipe()
|
||||
go func() {
|
||||
// NB Rcat deals with Stats.Transferring, etc.
|
||||
o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now(), nil)
|
||||
o, err := operations.Rcat(fh.file.ctx, fh.file.Fs(), fh.remote, pipeReader, time.Now(), nil)
|
||||
if err != nil {
|
||||
fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user