mirror of
https://github.com/rclone/rclone.git
synced 2026-03-30 21:33:15 -04:00
fs: add --resume-listings global flag and CanResumeListing feature flag
Add ResumeListings option to ConfigInfo. When set to a local directory path, rclone saves listing checkpoints so interrupted listings can be resumed from where they left off.
This commit is contained in:
@@ -2546,6 +2546,50 @@ This sets the interval between each retry specified by `--retries`
|
||||
|
||||
The default is `0`. Use `0` to disable.
|
||||
|
||||
### --resume-listings string
|
||||
|
||||
Set this to a local directory path to enable resumable listings.
|
||||
When a long-running listing operation (e.g. `ls`, `sync`, `copy`) is
|
||||
interrupted, rclone saves checkpoint files in this directory so that
|
||||
the next run can resume from where it left off rather than starting
|
||||
from scratch.
|
||||
|
||||
This is useful when working with very large buckets containing
|
||||
millions or billions of files, where listing alone can take hours or
|
||||
days.
|
||||
|
||||
How it works:
|
||||
|
||||
- During listing, rclone saves a checkpoint after each page of results
|
||||
from the backend.
|
||||
- If the operation is interrupted (e.g. Ctrl-C, crash, reboot), the
|
||||
checkpoint records how far the listing got.
|
||||
- On the next run with the same `--resume-listings` directory, rclone
|
||||
detects the checkpoint and resumes listing from that point.
|
||||
- When a listing completes successfully, its checkpoint is
|
||||
automatically deleted.
|
||||
|
||||
Only backends that support key-name-based pagination can resume
|
||||
listings. Currently supported backends include S3, B2, and Oracle
|
||||
Object Storage. Other backends will work normally but won't benefit
|
||||
from resume checkpoints.
|
||||
|
||||
Resume works with both normal listings and `--fast-list` (ListR).
|
||||
|
||||
`--resume-listings` cannot be used with `sync` or other commands that
|
||||
delete files from the destination. A resumed listing skips
|
||||
already-listed source files, which would cause them to appear missing
|
||||
and be deleted from the destination. Use `--resume-listings` with
|
||||
`copy` or `ls` instead.
|
||||
|
||||
After a resumed run completes, it is recommended to do one full run
|
||||
without `--resume-listings` to catch any files that were added in the
|
||||
already-processed key range between runs.
|
||||
|
||||
```console
|
||||
rclone copy --resume-listings /tmp/rclone-resume s3:big-bucket /local/dest
|
||||
```
|
||||
|
||||
### --server-side-across-configs
|
||||
|
||||
Allow server-side operations (e.g. copy or move) to work across
|
||||
|
||||
@@ -277,6 +277,11 @@ var ConfigOptionsInfo = Options{{
|
||||
Default: false,
|
||||
Help: "Use recursive list if available; uses more memory but fewer transactions",
|
||||
Groups: "Listing",
|
||||
}, {
|
||||
Name: "resume_listings",
|
||||
Default: "",
|
||||
Help: "Local directory path for saving listing checkpoints so interrupted listings can be resumed",
|
||||
Groups: "Listing",
|
||||
}, {
|
||||
Name: "list_cutoff",
|
||||
Default: 1_000_000,
|
||||
@@ -617,6 +622,7 @@ type ConfigInfo struct {
|
||||
Suffix string `config:"suffix"`
|
||||
SuffixKeepExtension bool `config:"suffix_keep_extension"`
|
||||
UseListR bool `config:"fast_list"`
|
||||
ResumeListings string `config:"resume_listings"`
|
||||
ListCutoff int `config:"list_cutoff"`
|
||||
BufferSize SizeSuffix `config:"buffer_size"`
|
||||
BwLimit BwTimetable `config:"bwlimit"`
|
||||
|
||||
@@ -174,6 +174,10 @@ type Features struct {
|
||||
// immediately.
|
||||
ListP func(ctx context.Context, dir string, callback ListRCallback) error
|
||||
|
||||
// CanResumeListing indicates the backend handles --resume-listings
|
||||
// internally in its ListP/ListR implementations.
|
||||
CanResumeListing bool
|
||||
|
||||
// About gets quota information from the Fs
|
||||
About func(ctx context.Context) (*Usage, error)
|
||||
|
||||
@@ -456,6 +460,7 @@ func (ft *Features) Mask(ctx context.Context, f Fs) *Features {
|
||||
if mask.ListP == nil {
|
||||
ft.ListP = nil
|
||||
}
|
||||
ft.CanResumeListing = ft.CanResumeListing && mask.CanResumeListing
|
||||
if mask.About == nil {
|
||||
ft.About = nil
|
||||
}
|
||||
|
||||
@@ -1341,6 +1341,9 @@ func runSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
|
||||
if deleteMode != fs.DeleteModeOff && DoMove {
|
||||
return fserrors.FatalError(errors.New("can't delete and move at the same time"))
|
||||
}
|
||||
if ci.ResumeListings != "" && deleteMode != fs.DeleteModeOff {
|
||||
return fserrors.FatalError(errors.New("can't use --resume-listings with sync or other delete modes - a resumed listing skips already-listed files which would cause them to be deleted from the destination"))
|
||||
}
|
||||
// Run an extra pass to delete only
|
||||
if deleteMode == fs.DeleteModeBefore {
|
||||
if ci.TrackRenames {
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
"github.com/rclone/rclone/lib/resume"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -1135,6 +1136,154 @@ func Run(t *testing.T, opt *Opt) {
|
||||
fstest.CheckListing(t, f, []fstest.Item{file1, file2})
|
||||
})
|
||||
|
||||
// TestFsCanResumeListing tests the resumable listing feature
|
||||
t.Run("FsCanResumeListing", func(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
if !f.Features().CanResumeListing {
|
||||
t.Skip("FS does not support resumable listings")
|
||||
}
|
||||
require.NotNil(t, f.Features().ListP, "CanResumeListing backends must have ListP")
|
||||
|
||||
// Helper to collect all entries from ListP
|
||||
listAll := func(ctx context.Context, dir string) fs.DirEntries {
|
||||
t.Helper()
|
||||
var entries fs.DirEntries
|
||||
err := f.Features().ListP(ctx, dir, func(e fs.DirEntries) error {
|
||||
entries = append(entries, e...)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return entries
|
||||
}
|
||||
|
||||
// List without resume - get baseline
|
||||
baseEntries := listAll(ctx, "")
|
||||
require.NotEmpty(t, baseEntries, "test requires existing files")
|
||||
|
||||
t.Run("CheckpointSavedAndCleaned", func(t *testing.T) {
|
||||
// List with --resume-listings enabled
|
||||
resumeDir := t.TempDir()
|
||||
resumeCtx, resumeCi := fs.AddConfig(ctx)
|
||||
resumeCi.ResumeListings = resumeDir
|
||||
|
||||
entries := listAll(resumeCtx, "")
|
||||
assert.Equal(t, len(baseEntries), len(entries), "resumed listing should return same entries when no checkpoint exists")
|
||||
|
||||
// After successful completion, checkpoint should be cleaned up
|
||||
dirEntries, err := os.ReadDir(resumeDir)
|
||||
require.NoError(t, err)
|
||||
// Filter out any non-json files
|
||||
var jsonFiles []os.DirEntry
|
||||
for _, e := range dirEntries {
|
||||
if strings.HasSuffix(e.Name(), ".json") {
|
||||
jsonFiles = append(jsonFiles, e)
|
||||
}
|
||||
}
|
||||
assert.Empty(t, jsonFiles, "checkpoint should be deleted after successful listing")
|
||||
})
|
||||
|
||||
t.Run("ResumeFromCheckpoint", func(t *testing.T) {
|
||||
// Sort baseline entries to find a good startAfter key
|
||||
var names []string
|
||||
for _, e := range baseEntries {
|
||||
names = append(names, e.Remote())
|
||||
}
|
||||
sort.Strings(names)
|
||||
if len(names) < 2 {
|
||||
t.Skip("need at least 2 entries to test resume")
|
||||
}
|
||||
|
||||
// Pick the first name as startAfter - resumed listing should skip it
|
||||
startAfter := names[0]
|
||||
|
||||
// Create a checkpoint that resumes after the first entry
|
||||
resumeDir := t.TempDir()
|
||||
store, err := resume.NewStore(resumeDir)
|
||||
require.NoError(t, err)
|
||||
remoteName := fs.ConfigString(f)
|
||||
require.NoError(t, store.Save(&resume.Checkpoint{
|
||||
RemoteName: remoteName,
|
||||
Dir: "",
|
||||
LastKey: startAfter,
|
||||
}))
|
||||
|
||||
// List with resume - should skip entries <= startAfter
|
||||
resumeCtx, resumeCi := fs.AddConfig(ctx)
|
||||
resumeCi.ResumeListings = resumeDir
|
||||
|
||||
resumedEntries := listAll(resumeCtx, "")
|
||||
|
||||
var resumedNames []string
|
||||
for _, e := range resumedEntries {
|
||||
resumedNames = append(resumedNames, e.Remote())
|
||||
}
|
||||
|
||||
// The resumed listing should have fewer entries
|
||||
assert.Less(t, len(resumedEntries), len(baseEntries),
|
||||
"resumed listing should have fewer entries than full listing")
|
||||
|
||||
// The startAfter entry should not appear in the resumed listing
|
||||
assert.NotContains(t, resumedNames, startAfter,
|
||||
"entry at checkpoint should not appear in resumed listing")
|
||||
|
||||
// All resumed entries should exist in the full listing
|
||||
for _, name := range resumedNames {
|
||||
assert.Contains(t, names, name,
|
||||
"resumed entry %q should exist in full listing", name)
|
||||
}
|
||||
})
|
||||
|
||||
// Test ListR resume if available
|
||||
if f.Features().ListR != nil {
|
||||
t.Run("ListRResumeFromCheckpoint", func(t *testing.T) {
|
||||
// Get baseline from ListR
|
||||
var baseNames []string
|
||||
err := f.Features().ListR(ctx, "", func(entries fs.DirEntries) error {
|
||||
for _, e := range entries {
|
||||
baseNames = append(baseNames, e.Remote())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
sort.Strings(baseNames)
|
||||
if len(baseNames) < 2 {
|
||||
t.Skip("need at least 2 entries to test ListR resume")
|
||||
}
|
||||
|
||||
startAfter := baseNames[0]
|
||||
|
||||
// Create checkpoint
|
||||
resumeDir := t.TempDir()
|
||||
store, err := resume.NewStore(resumeDir)
|
||||
require.NoError(t, err)
|
||||
remoteName := fs.ConfigString(f)
|
||||
require.NoError(t, store.Save(&resume.Checkpoint{
|
||||
RemoteName: remoteName,
|
||||
Dir: "",
|
||||
LastKey: startAfter,
|
||||
}))
|
||||
|
||||
// List with resume
|
||||
resumeCtx, resumeCi := fs.AddConfig(ctx)
|
||||
resumeCi.ResumeListings = resumeDir
|
||||
|
||||
var resumedNames []string
|
||||
err = f.Features().ListR(resumeCtx, "", func(entries fs.DirEntries) error {
|
||||
for _, e := range entries {
|
||||
resumedNames = append(resumedNames, e.Remote())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Less(t, len(resumedNames), len(baseNames),
|
||||
"resumed ListR should have fewer entries than full listing")
|
||||
assert.NotContains(t, resumedNames, startAfter,
|
||||
"entry at checkpoint should not appear in resumed ListR")
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
// TestFsNewObjectDir tests NewObject on a directory which should produce fs.ErrorIsDir if possible or fs.ErrorObjectNotFound if not
|
||||
t.Run("FsNewObjectDir", func(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
|
||||
Reference in New Issue
Block a user