mirror of
https://github.com/rclone/rclone.git
synced 2026-03-26 03:12:36 -04:00
vfs: add --vfs-handle-caching to keep file handles alive after close
Add a configurable grace period (default 5s) that delays closing file handles and downloaders when the last handle closes. If a new handle opens within the grace period, it reuses the existing resources. This fixes 40x performance degradation with serve nfs vs serve sftp caused by go-nfs opening/reading/closing on every NFS READ RPC, which destroyed read-ahead prefetch before it could accumulate. The grace period only applies to non-dirty files so that writeback proceeds immediately on close. Fixes #9251
This commit is contained in:
@@ -113,6 +113,9 @@ func newTestCache(t *testing.T) (r *fstest.Run, c *Cache) {
|
||||
// Disable synchronous write
|
||||
opt.WriteBack = 0
|
||||
|
||||
// Disable handle caching so existing tests get immediate close behavior
|
||||
opt.HandleCaching = 0
|
||||
|
||||
return newTestCacheOpt(t, opt)
|
||||
}
|
||||
|
||||
|
||||
@@ -68,6 +68,8 @@ type Item struct {
|
||||
pendingAccesses int // number of threads - cache reset not allowed if not zero
|
||||
modified bool // set if the file has been modified since the last Open
|
||||
beingReset bool // cache cleaner is resetting the cache file, access not allowed
|
||||
graceTimer *time.Timer // timer for delayed close after grace period
|
||||
graceStoreFn StoreFn // saved StoreFn for use when grace timer fires
|
||||
}
|
||||
|
||||
// Info is persisted to backing store
|
||||
@@ -535,6 +537,20 @@ func (item *Item) open(o fs.Object) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if recovering from grace period (fd and downloaders still alive)
|
||||
if item.graceTimer != nil {
|
||||
item.graceTimer.Stop()
|
||||
item.graceTimer = nil
|
||||
item.graceStoreFn = nil
|
||||
// fd and downloaders still alive - reuse them
|
||||
// Still check object for changes
|
||||
err = item._checkObject(o)
|
||||
if err != nil {
|
||||
return fmt.Errorf("vfs cache item: check object failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err = item._createFile(osPath)
|
||||
if err != nil {
|
||||
item._remove("item.open failed on _createFile, remove cache data/metadata files")
|
||||
@@ -647,10 +663,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
||||
// defer log.Trace(item.o, "Item.Close")("err=%v", &err)
|
||||
item.preAccess()
|
||||
defer item.postAccess()
|
||||
var (
|
||||
downloaders *downloaders.Downloaders
|
||||
syncWriteBack = item.c.opt.WriteBack <= 0
|
||||
)
|
||||
syncWriteBack := item.c.opt.WriteBack <= 0
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
|
||||
@@ -663,6 +676,46 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// opens == 0: check for grace period (only for non-dirty files,
|
||||
// dirty files need immediate close so writeback can proceed)
|
||||
gracePeriod := time.Duration(item.c.opt.HandleCaching)
|
||||
if gracePeriod > 0 && !item.info.Dirty {
|
||||
item.graceStoreFn = storeFn
|
||||
item.graceTimer = time.AfterFunc(gracePeriod, item.closeAfterGrace)
|
||||
return nil
|
||||
}
|
||||
|
||||
return item._actualClose(storeFn, syncWriteBack)
|
||||
}
|
||||
|
||||
// closeAfterGrace is called by the grace timer to perform the actual close
|
||||
func (item *Item) closeAfterGrace() {
|
||||
item.preAccess()
|
||||
defer item.postAccess()
|
||||
syncWriteBack := item.c.opt.WriteBack <= 0
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
|
||||
// Someone re-opened or another timer took over
|
||||
if item.opens > 0 || item.graceTimer == nil {
|
||||
return
|
||||
}
|
||||
item.graceTimer = nil
|
||||
storeFn := item.graceStoreFn
|
||||
item.graceStoreFn = nil
|
||||
|
||||
err := item._actualClose(storeFn, syncWriteBack)
|
||||
if err != nil {
|
||||
fs.Errorf(item.name, "vfs cache: close after grace period failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// _actualClose performs the actual close operations on the item.
|
||||
//
|
||||
// Call with item.mu held. May temporarily unlock item.mu.
|
||||
func (item *Item) _actualClose(storeFn StoreFn, syncWriteBack bool) (err error) {
|
||||
var dls *downloaders.Downloaders
|
||||
|
||||
// Update the size on close
|
||||
_, _ = item._getSize()
|
||||
|
||||
@@ -690,7 +743,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
||||
}
|
||||
|
||||
// Close the downloaders
|
||||
if downloaders = item.downloaders; downloaders != nil {
|
||||
if dls = item.downloaders; dls != nil {
|
||||
item.downloaders = nil
|
||||
// FIXME need to unlock to kill downloader - should we
|
||||
// re-arrange locking so this isn't necessary? maybe
|
||||
@@ -700,7 +753,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
||||
// downloader.Write calls ensure which needs the lock
|
||||
// close downloader with mutex unlocked
|
||||
item.mu.Unlock()
|
||||
checkErr(downloaders.Close(nil))
|
||||
checkErr(dls.Close(nil))
|
||||
item.mu.Lock()
|
||||
}
|
||||
|
||||
@@ -918,6 +971,23 @@ func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed
|
||||
return
|
||||
}
|
||||
|
||||
// Cancel any grace period timer and close the resources it was keeping alive
|
||||
if item.graceTimer != nil {
|
||||
item.graceTimer.Stop()
|
||||
item.graceTimer = nil
|
||||
item.graceStoreFn = nil
|
||||
if dls := item.downloaders; dls != nil {
|
||||
item.downloaders = nil
|
||||
item.mu.Unlock()
|
||||
_ = dls.Close(nil)
|
||||
item.mu.Lock()
|
||||
}
|
||||
if item.fd != nil {
|
||||
_ = item.fd.Close()
|
||||
item.fd = nil
|
||||
}
|
||||
}
|
||||
|
||||
removeIt := false
|
||||
if maxAge == 0 {
|
||||
removeIt = true // quota-driven removal
|
||||
@@ -949,6 +1019,25 @@ func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
|
||||
// Cancel any grace period timer and close the resources it was keeping alive
|
||||
if item.graceTimer != nil {
|
||||
item.graceTimer.Stop()
|
||||
item.graceTimer = nil
|
||||
item.graceStoreFn = nil
|
||||
// Close the downloaders that the grace period was keeping alive
|
||||
if dls := item.downloaders; dls != nil {
|
||||
item.downloaders = nil
|
||||
item.mu.Unlock()
|
||||
_ = dls.Close(nil)
|
||||
item.mu.Lock()
|
||||
}
|
||||
// Close the file handle that the grace period was keeping alive
|
||||
if item.fd != nil {
|
||||
_ = item.fd.Close()
|
||||
item.fd = nil
|
||||
}
|
||||
}
|
||||
|
||||
// The item is not being used now. Just remove it instead of resetting it.
|
||||
if item.opens == 0 && !item.info.Dirty {
|
||||
spaceFreed = item.info.Rs.Size()
|
||||
|
||||
@@ -32,6 +32,9 @@ func newItemTestCache(t *testing.T) (r *fstest.Run, c *Cache) {
|
||||
// Disable synchronous write
|
||||
opt.WriteBack = 0
|
||||
|
||||
// Disable handle caching so existing tests get immediate close behavior
|
||||
opt.HandleCaching = 0
|
||||
|
||||
return newTestCacheOpt(t, opt)
|
||||
}
|
||||
|
||||
@@ -580,3 +583,113 @@ func TestItemReadWrite(t *testing.T) {
|
||||
assert.False(t, item.remove(fileName))
|
||||
})
|
||||
}
|
||||
|
||||
func newItemTestCacheHandleCaching(t *testing.T, handleCaching time.Duration) (r *fstest.Run, c *Cache) {
|
||||
opt := vfscommon.Opt
|
||||
|
||||
// Disable the cache cleaner as it interferes with these tests
|
||||
opt.CachePollInterval = 0
|
||||
|
||||
// Disable synchronous write
|
||||
opt.WriteBack = 0
|
||||
|
||||
// Set handle caching grace period
|
||||
opt.HandleCaching = fs.Duration(handleCaching)
|
||||
|
||||
return newTestCacheOpt(t, opt)
|
||||
}
|
||||
|
||||
func TestItemHandleCaching(t *testing.T) {
|
||||
r, c := newItemTestCacheHandleCaching(t, 1*time.Second)
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
|
||||
// Open, read, and close the item
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
buf := make([]byte, 10)
|
||||
n, err := item.ReadAt(buf, 0)
|
||||
assert.Equal(t, 10, n)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, contents[:10], string(buf[:n]))
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// After close, grace period should keep fd and downloaders alive
|
||||
item.mu.Lock()
|
||||
assert.NotNil(t, item.fd, "fd should still be open during grace period")
|
||||
assert.NotNil(t, item.downloaders, "downloaders should still be alive during grace period")
|
||||
assert.NotNil(t, item.graceTimer, "grace timer should be set")
|
||||
item.mu.Unlock()
|
||||
|
||||
// Re-open the item - should reuse existing fd and downloaders
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
// Read data to verify it works
|
||||
n, err = item.ReadAt(buf, 10)
|
||||
assert.Equal(t, 10, n)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, contents[10:20], string(buf[:n]))
|
||||
|
||||
// Close again
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Wait for grace period to expire
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
// After grace period, fd and downloaders should be cleaned up
|
||||
item.mu.Lock()
|
||||
assert.Nil(t, item.fd, "fd should be closed after grace period")
|
||||
assert.Nil(t, item.downloaders, "downloaders should be closed after grace period")
|
||||
assert.Nil(t, item.graceTimer, "grace timer should be nil after expiry")
|
||||
item.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestItemHandleCachingDisabled(t *testing.T) {
|
||||
r, c := newItemTestCacheHandleCaching(t, 0)
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
_ = contents
|
||||
|
||||
// Open and close the item
|
||||
require.NoError(t, item.Open(obj))
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// With handle caching disabled, fd and downloaders should be immediately closed
|
||||
item.mu.Lock()
|
||||
assert.Nil(t, item.fd, "fd should be closed immediately when handle caching disabled")
|
||||
assert.Nil(t, item.downloaders, "downloaders should be closed immediately when handle caching disabled")
|
||||
assert.Nil(t, item.graceTimer, "grace timer should not be set when handle caching disabled")
|
||||
item.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestItemHandleCachingReset(t *testing.T) {
|
||||
r, c := newItemTestCacheHandleCaching(t, 1*time.Second)
|
||||
|
||||
_, obj, item := newFile(t, r, c, "existing")
|
||||
|
||||
// Open, read (to instantiate cache), and close the item
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
buf := make([]byte, 10)
|
||||
_, err := item.ReadAt(buf, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Grace timer should be active
|
||||
item.mu.Lock()
|
||||
assert.NotNil(t, item.graceTimer, "grace timer should be set")
|
||||
item.mu.Unlock()
|
||||
|
||||
// Reset should cancel the grace timer and clean up
|
||||
rr, _, err := item.Reset()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, RemovedNotInUse, rr)
|
||||
|
||||
// After reset, grace timer should be cancelled
|
||||
item.mu.Lock()
|
||||
assert.Nil(t, item.graceTimer, "grace timer should be cancelled after reset")
|
||||
assert.Nil(t, item.fd, "fd should be closed after reset")
|
||||
item.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -165,6 +165,11 @@ var OptionsInfo = fs.Options{{
|
||||
Default: getGID(),
|
||||
Help: "Override the gid field set by the filesystem (not supported on Windows)",
|
||||
Groups: "VFS",
|
||||
}, {
|
||||
Name: "vfs_handle_caching",
|
||||
Default: fs.Duration(5 * time.Second),
|
||||
Help: "Time to keep file handle and downloaders alive after last close",
|
||||
Groups: "VFS",
|
||||
}, {
|
||||
Name: "vfs_metadata_extension",
|
||||
Default: "",
|
||||
@@ -209,6 +214,7 @@ type Options struct {
|
||||
UsedIsSize bool `config:"vfs_used_is_size"` // if true, use the `rclone size` algorithm for Used size
|
||||
FastFingerprint bool `config:"vfs_fast_fingerprint"` // if set use fast fingerprints
|
||||
DiskSpaceTotalSize fs.SizeSuffix `config:"vfs_disk_space_total_size"`
|
||||
HandleCaching fs.Duration `config:"vfs_handle_caching"` // time to keep handle alive after last close
|
||||
MetadataExtension string `config:"vfs_metadata_extension"` // if set respond to files with this extension with metadata
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user