diff --git a/docs/content/docs.md b/docs/content/docs.md index 79db295b8..d96420bff 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -2546,6 +2546,50 @@ This sets the interval between each retry specified by `--retries` The default is `0`. Use `0` to disable. +### --resume-listings string + +Set this to a local directory path to enable resumable listings. +While a long-running listing operation (e.g. `ls`, `copy`) is running, +rclone saves checkpoint files in this directory so that, if it is +interrupted, the next run can resume from where it left off rather +than starting from scratch. + +This is useful when working with very large buckets containing +millions or billions of files, where listing alone can take hours or +days. + +How it works: + +- During listing, rclone saves a checkpoint after each page of results + from the backend. +- If the operation is interrupted (e.g. Ctrl-C, crash, reboot), the + checkpoint records how far the listing got. +- On the next run with the same `--resume-listings` directory, rclone + detects the checkpoint and resumes listing from that point. +- When a listing completes successfully, its checkpoint is + automatically deleted. + +Only backends that support key-name-based pagination can resume +listings. Currently supported backends include S3, B2, and Oracle +Object Storage. Other backends will work normally but won't benefit +from resume checkpoints. + +Resume works with both normal listings and `--fast-list` (ListR). + +`--resume-listings` cannot be used with `sync` or other commands that +delete files from the destination. A resumed listing skips +already-listed source files, which would cause them to appear missing +and be deleted from the destination. Use `--resume-listings` with +`copy` or `ls` instead. + +After a resumed run completes, it is recommended to do one full run +without `--resume-listings` to catch any files that were added in the +already-processed key range between runs. 
+ +```console +rclone copy --resume-listings /tmp/rclone-resume s3:big-bucket /local/dest +``` + ### --server-side-across-configs Allow server-side operations (e.g. copy or move) to work across diff --git a/fs/config.go b/fs/config.go index ce0a1df9e..aa2f7d90a 100644 --- a/fs/config.go +++ b/fs/config.go @@ -277,6 +277,11 @@ var ConfigOptionsInfo = Options{{ Default: false, Help: "Use recursive list if available; uses more memory but fewer transactions", Groups: "Listing", +}, { + Name: "resume_listings", + Default: "", + Help: "Local directory path for saving listing checkpoints so interrupted listings can be resumed", + Groups: "Listing", }, { Name: "list_cutoff", Default: 1_000_000, @@ -617,6 +622,7 @@ type ConfigInfo struct { Suffix string `config:"suffix"` SuffixKeepExtension bool `config:"suffix_keep_extension"` UseListR bool `config:"fast_list"` + ResumeListings string `config:"resume_listings"` ListCutoff int `config:"list_cutoff"` BufferSize SizeSuffix `config:"buffer_size"` BwLimit BwTimetable `config:"bwlimit"` diff --git a/fs/features.go b/fs/features.go index 1563dc459..6c1b8709b 100644 --- a/fs/features.go +++ b/fs/features.go @@ -174,6 +174,10 @@ type Features struct { // immediately. ListP func(ctx context.Context, dir string, callback ListRCallback) error + // CanResumeListing indicates the backend handles --resume-listings + // internally in its ListP/ListR implementations. 
+ CanResumeListing bool + // About gets quota information from the Fs About func(ctx context.Context) (*Usage, error) @@ -456,6 +460,7 @@ func (ft *Features) Mask(ctx context.Context, f Fs) *Features { if mask.ListP == nil { ft.ListP = nil } + ft.CanResumeListing = ft.CanResumeListing && mask.CanResumeListing if mask.About == nil { ft.About = nil } diff --git a/fs/sync/sync.go b/fs/sync/sync.go index 6ec0c074f..089623bdb 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -1341,6 +1341,9 @@ func runSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete if deleteMode != fs.DeleteModeOff && DoMove { return fserrors.FatalError(errors.New("can't delete and move at the same time")) } + if ci.ResumeListings != "" && deleteMode != fs.DeleteModeOff { + return fserrors.FatalError(errors.New("can't use --resume-listings with sync or other delete modes - a resumed listing skips already-listed files which would cause them to be deleted from the destination")) + } // Run an extra pass to delete only if deleteMode == fs.DeleteModeBefore { if ci.TrackRenames { diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 4db6f2907..eb873a396 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -38,6 +38,7 @@ import ( "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/readers" + "github.com/rclone/rclone/lib/resume" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1135,6 +1136,154 @@ func Run(t *testing.T, opt *Opt) { fstest.CheckListing(t, f, []fstest.Item{file1, file2}) }) + // TestFsCanResumeListing tests the resumable listing feature + t.Run("FsCanResumeListing", func(t *testing.T) { + skipIfNotOk(t) + if !f.Features().CanResumeListing { + t.Skip("FS does not support resumable listings") + } + require.NotNil(t, f.Features().ListP, "CanResumeListing backends must have ListP") + + // Helper to collect all entries from ListP + listAll := 
func(ctx context.Context, dir string) fs.DirEntries { + t.Helper() + var entries fs.DirEntries + err := f.Features().ListP(ctx, dir, func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + require.NoError(t, err) + return entries + } + + // List without resume - get baseline + baseEntries := listAll(ctx, "") + require.NotEmpty(t, baseEntries, "test requires existing files") + + t.Run("CheckpointSavedAndCleaned", func(t *testing.T) { + // List with --resume-listings enabled + resumeDir := t.TempDir() + resumeCtx, resumeCi := fs.AddConfig(ctx) + resumeCi.ResumeListings = resumeDir + + entries := listAll(resumeCtx, "") + assert.Equal(t, len(baseEntries), len(entries), "resumed listing should return same entries when no checkpoint exists") + + // After successful completion, checkpoint should be cleaned up + dirEntries, err := os.ReadDir(resumeDir) + require.NoError(t, err) + // Filter out any non-json files + var jsonFiles []os.DirEntry + for _, e := range dirEntries { + if strings.HasSuffix(e.Name(), ".json") { + jsonFiles = append(jsonFiles, e) + } + } + assert.Empty(t, jsonFiles, "checkpoint should be deleted after successful listing") + }) + + t.Run("ResumeFromCheckpoint", func(t *testing.T) { + // Sort baseline entries to find a good startAfter key + var names []string + for _, e := range baseEntries { + names = append(names, e.Remote()) + } + sort.Strings(names) + if len(names) < 2 { + t.Skip("need at least 2 entries to test resume") + } + + // Pick the first name as startAfter - resumed listing should skip it + startAfter := names[0] + + // Create a checkpoint that resumes after the first entry + resumeDir := t.TempDir() + store, err := resume.NewStore(resumeDir) + require.NoError(t, err) + remoteName := fs.ConfigString(f) + require.NoError(t, store.Save(&resume.Checkpoint{ + RemoteName: remoteName, + Dir: "", + LastKey: startAfter, + })) + + // List with resume - should skip entries <= startAfter + resumeCtx, resumeCi := 
fs.AddConfig(ctx) + resumeCi.ResumeListings = resumeDir + + resumedEntries := listAll(resumeCtx, "") + + var resumedNames []string + for _, e := range resumedEntries { + resumedNames = append(resumedNames, e.Remote()) + } + + // The resumed listing should have fewer entries + assert.Less(t, len(resumedEntries), len(baseEntries), + "resumed listing should have fewer entries than full listing") + + // The startAfter entry should not appear in the resumed listing + assert.NotContains(t, resumedNames, startAfter, + "entry at checkpoint should not appear in resumed listing") + + // All resumed entries should exist in the full listing + for _, name := range resumedNames { + assert.Contains(t, names, name, + "resumed entry %q should exist in full listing", name) + } + }) + + // Test ListR resume if available + if f.Features().ListR != nil { + t.Run("ListRResumeFromCheckpoint", func(t *testing.T) { + // Get baseline from ListR + var baseNames []string + err := f.Features().ListR(ctx, "", func(entries fs.DirEntries) error { + for _, e := range entries { + baseNames = append(baseNames, e.Remote()) + } + return nil + }) + require.NoError(t, err) + sort.Strings(baseNames) + if len(baseNames) < 2 { + t.Skip("need at least 2 entries to test ListR resume") + } + + startAfter := baseNames[0] + + // Create checkpoint + resumeDir := t.TempDir() + store, err := resume.NewStore(resumeDir) + require.NoError(t, err) + remoteName := fs.ConfigString(f) + require.NoError(t, store.Save(&resume.Checkpoint{ + RemoteName: remoteName, + Dir: "", + LastKey: startAfter, + })) + + // List with resume + resumeCtx, resumeCi := fs.AddConfig(ctx) + resumeCi.ResumeListings = resumeDir + + var resumedNames []string + err = f.Features().ListR(resumeCtx, "", func(entries fs.DirEntries) error { + for _, e := range entries { + resumedNames = append(resumedNames, e.Remote()) + } + return nil + }) + require.NoError(t, err) + + assert.Less(t, len(resumedNames), len(baseNames), + "resumed ListR should have 
fewer entries than full listing") + assert.NotContains(t, resumedNames, startAfter, + "entry at checkpoint should not appear in resumed ListR") + }) + } + }) + // TestFsNewObjectDir tests NewObject on a directory which should produce fs.ErrorIsDir if possible or fs.ErrorObjectNotFound if not t.Run("FsNewObjectDir", func(t *testing.T) { skipIfNotOk(t)