From 9813ecdd9b6f1120cd8d3ff7e383e508e50df7a4 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 5 Mar 2026 16:37:14 +0000 Subject: [PATCH] march: fix unnecessarily listing dst directory when src listing finished When doing a copy (no delete mode) without a logger, the destination listing can be cancelled as soon as the source listing finishes, since dst-only entries won't be processed. This is particularly beneficial with --fast-list where the dst listing may fetch the entire directory tree upfront via ListR. Cancelling it early avoids waiting for a potentially large listing that won't be used. Adds NoProcessDstOnly flag to March which, when set, cancels the dst listing context once the source channel is exhausted in matchListings. Fixes #9226 --- fs/march/march.go | 29 ++++- fs/march/march_test.go | 243 ++++++++++++++++++++++++++++++++++++++++- fs/sync/sync.go | 1 + 3 files changed, 266 insertions(+), 7 deletions(-) diff --git a/fs/march/march.go b/fs/march/march.go index 4ba97a2f9..effd240bd 100644 --- a/fs/march/march.go +++ b/fs/march/march.go @@ -44,6 +44,7 @@ type March struct { Callback Marcher // object to call with results NoCheckDest bool // transfer all objects regardless without checking dst NoUnicodeNormalization bool // don't normalize unicode characters in filenames + NoProcessDstOnly bool // if set, when source listing finishes, cancel the dst listing // internal state srcListDir listDirFn // function to call to list a directory in the src dstListDir listDirFn // function to call to list a directory in the dst @@ -297,7 +298,7 @@ func (m *March) aborting() bool { // Into match go matchPair's of src and dst which have the same name // // This checks for duplicates and checks the list is sorted. -func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstOnly func(fs.DirEntry), match func(dst, src fs.DirEntry)) error { +func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, dstCancel func(), srcOnly, dstOnly func(fs.DirEntry), match func(dst, src fs.DirEntry)) error { var ( srcPrev, dstPrev fs.DirEntry srcPrevName, dstPrevName string @@ -326,6 +327,12 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO src, srcHasMore = <-srcChan srcName = m.srcKey(src) } + // If the source listing is finished and we don't need + // dst-only entries, cancel the dst listing early. + if !srcHasMore && m.NoProcessDstOnly { + dstCancel() + break + } if dst == nil { dst, dstHasMore = <-dstChan dstName = m.dstKey(dst) @@ -392,9 +399,12 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { srcChan = make(chan fs.DirEntry, 100) dstChan = make(chan fs.DirEntry, 100) srcListErr, dstListErr error + dstListCancelled bool wg sync.WaitGroup ci = fs.GetConfig(m.Ctx) + dstCtx, dstCancel = context.WithCancel(m.Ctx) ) + defer dstCancel() // List the src and dst directories if !job.noSrc { @@ -415,12 +425,19 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { if !m.NoTraverse && !job.noDst { startedDst = true wg.Go(func() { - dstListErr = m.dstListDir(m.Ctx, job.dstRemote, func(entries fs.DirEntries) error { + dstListErr = m.dstListDir(dstCtx, job.dstRemote, func(entries fs.DirEntries) error { for _, entry := range entries { - dstChan <- entry + select { + case <-dstCtx.Done(): + return dstCtx.Err() + case dstChan <- entry: + } } return nil }) + if dstCtx.Err() != nil { + dstListCancelled = true + } close(dstChan) }) } @@ -502,7 +519,7 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { } // Work out what to do and do it - err := m.matchListings(srcChan, dstChan, func(src fs.DirEntry) { + err := m.matchListings(srcChan, dstChan, dstCancel, func(src fs.DirEntry) { recurse := m.Callback.SrcOnly(src) if recurse && job.srcDepth > 0 { jobs = append(jobs, listDirJob{ @@ -548,7 +565,9 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) { srcListErr = fs.CountError(m.Ctx, srcListErr) return nil, srcListErr } - if dstListErr == fs.ErrorDirNotFound { + if dstListCancelled { + // Ignore dst listing errors if we cancelled it + } else if dstListErr == fs.ErrorDirNotFound { // Copy the stuff anyway } else if dstListErr != nil { if job.dstRemote != "" { diff --git a/fs/march/march_test.go b/fs/march/march_test.go index d1a866538..2a916c351 100644 --- a/fs/march/march_test.go +++ b/fs/march/march_test.go @@ -274,6 +274,110 @@ func TestMarch(t *testing.T) { } } +func TestMarchNoProcessDstOnly(t *testing.T) { + for _, test := range []struct { + what string + fileSrcOnly []string + dirSrcOnly []string + fileDstOnly []string + dirDstOnly []string + fileMatch []string + dirMatch []string + noProcessDstOnly bool + fastList bool + }{ + { + // dst-only files sort after all src files so they are + // skipped when NoProcessDstOnly cancels the dst listing. + what: "no process dst only - typical sync", + fileSrcOnly: []string{"a_srcOnly"}, + fileMatch: []string{"b_match"}, + fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"}, + noProcessDstOnly: true, + }, + { + what: "no process dst only - fast list", + fileSrcOnly: []string{"a_srcOnly"}, + fileMatch: []string{"b_match"}, + fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"}, + noProcessDstOnly: true, + fastList: true, + }, + { + what: "no process dst only - source only", + fileSrcOnly: []string{"test", "test2"}, + noProcessDstOnly: true, + }, + { + what: "no process dst only - dest only skipped", + fileSrcOnly: []string{"a_src"}, + fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"}, + noProcessDstOnly: true, + }, + } { + t.Run(fmt.Sprintf("TestMarch-%s", test.what), func(t *testing.T) { + r := fstest.NewRun(t) + + var srcOnly []fstest.Item + var match []fstest.Item + + ctx, cancel := context.WithCancel(context.Background()) + + for _, f := range test.fileSrcOnly { + srcOnly = append(srcOnly, r.WriteFile(f, "hello world", t1)) + } + for _, f := range test.fileDstOnly { + r.WriteObject(ctx, f, "hello world", t1) + } + for _, f := range test.fileMatch { + match = append(match, r.WriteBoth(ctx, f, "hello world", t1)) + } + + ctx, ci := fs.AddConfig(ctx) + ci.UseListR = test.fastList + + fi := filter.GetConfig(ctx) + + // Local backend doesn't implement ListR, so monkey patch it for this test + if test.fastList && r.Flocal.Features().ListR == nil { + r.Flocal.Features().ListR = func(ctx context.Context, dir string, callback fs.ListRCallback) error { + r.Flocal.Features().ListR = nil + return walk.ListR(ctx, r.Flocal, dir, true, -1, walk.ListAll, callback) + } + defer func() { + r.Flocal.Features().ListR = nil + }() + } + + mt := &marchTester{ + ctx: ctx, + cancel: cancel, + } + m := &March{ + Ctx: ctx, + Fdst: r.Fremote, + Fsrc: r.Flocal, + Dir: "", + Callback: mt, + DstIncludeAll: fi.Opt.DeleteExcluded, + NoProcessDstOnly: test.noProcessDstOnly, + } + + mt.processError(m.Run(ctx)) + mt.cancel() + err := mt.currentError() + require.NoError(t, err) + + precision := fs.GetModifyWindow(ctx, r.Fremote, r.Flocal) + + // With NoProcessDstOnly, dst-only entries should not be reported + fstest.CompareItems(t, mt.srcOnly, srcOnly, test.dirSrcOnly, precision, "srcOnly") + assert.Empty(t, mt.dstOnly, "dstOnly should be empty with NoProcessDstOnly") + fstest.CompareItems(t, mt.match, match, test.dirMatch, precision, "match") + }) + } +} + // matchPair is a matched pair of direntries returned by matchListings type matchPair struct { src, dst fs.DirEntry @@ -535,7 +639,7 @@ func TestMatchListings(t *testing.T) { matches = append(matches, matchPair{dst: dst, src: src}) } - err := m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn) + err := m.matchListings(makeChan(0), makeChan(1), func() {}, srcOnlyFn, dstOnlyFn, matchFn) require.NoError(t, err) wg.Wait() assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ") @@ -544,7 +648,7 @@ func TestMatchListings(t *testing.T) { // now swap src and dst srcOnly, dstOnly, matches = nil, nil, nil - err = m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn) + err = m.matchListings(makeChan(0), makeChan(1), func() {}, srcOnlyFn, dstOnlyFn, matchFn) require.NoError(t, err) wg.Wait() assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ") @@ -553,3 +657,138 @@ func TestMatchListings(t *testing.T) { }) } } + +func TestMatchListingsNoProcessDstOnly(t *testing.T) { + var ( + a = mockobject.Object("a") + b = mockobject.Object("b") + c = mockobject.Object("c") + d = mockobject.Object("d") + ) + + for _, test := range []struct { + what string + input fs.DirEntries // pairs of input src, dst + srcOnly fs.DirEntries + matches []matchPair + }{ + { + what: "src and dst with dst-only skipped", + input: fs.DirEntries{ + a, nil, + b, b, + nil, c, + nil, d, + }, + srcOnly: fs.DirEntries{ + a, + }, + matches: []matchPair{ + {b, b}, + }, + }, + { + what: "all dst-only skipped", + input: fs.DirEntries{ + nil, a, + nil, b, + nil, c, + }, + }, + { + what: "all src-only", + input: fs.DirEntries{ + a, nil, + b, nil, + c, nil, + }, + srcOnly: fs.DirEntries{ + a, b, c, + }, + }, + { + what: "matches then dst-only skipped", + input: fs.DirEntries{ + a, a, + b, b, + nil, c, + nil, d, + }, + matches: []matchPair{ + {a, a}, + {b, b}, + }, + }, + } { + t.Run(test.what, func(t *testing.T) { + ctx := context.Background() + var wg sync.WaitGroup + + m := March{ + Ctx: context.Background(), + NoProcessDstOnly: true, + } + + // makeChan creates a channel sending sorted entries. + // It returns the channel and a drain function to unblock + // senders if the channel isn't fully consumed. + makeChan := func(offset int) (<-chan fs.DirEntry, func()) { + out := make(chan fs.DirEntry) + key := m.dstKey + if offset == 0 { + key = m.srcKey + } + ls, err := list.NewSorter(ctx, nil, list.SortToChan(out), key) + require.NoError(t, err) + wg.Go(func() { + for i := 0; i < len(test.input); i += 2 { + entry := test.input[i+offset] + if entry != nil { + require.NoError(t, ls.Add(fs.DirEntries{entry})) + } + } + require.NoError(t, ls.Send()) + ls.CleanUp() + close(out) + }) + drain := func() { + for range out { + } + } + return out, drain + } + + var srcOnly fs.DirEntries + srcOnlyFn := func(entry fs.DirEntry) { + srcOnly = append(srcOnly, entry) + } + var dstOnly fs.DirEntries + dstOnlyFn := func(entry fs.DirEntry) { + dstOnly = append(dstOnly, entry) + } + var matches []matchPair + matchFn := func(dst, src fs.DirEntry) { + matches = append(matches, matchPair{dst: dst, src: src}) + } + + srcChan, _ := makeChan(0) + dstChan, drainDst := makeChan(1) + + cancelled := false + dstCancel := func() { + cancelled = true + // Drain remaining dst entries to unblock the sender, + // mimicking what the real code does via context cancellation. + go drainDst() + } + + err := m.matchListings(srcChan, dstChan, dstCancel, srcOnlyFn, dstOnlyFn, matchFn) + require.NoError(t, err) + wg.Wait() + assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ") + assert.Empty(t, dstOnly, test.what, "dstOnly should be empty") + assert.Equal(t, test.matches, matches, test.what, "matches differ") + assert.True(t, cancelled, "dstCancel should have been called") + }) + } +} diff --git a/fs/sync/sync.go b/fs/sync/sync.go index 6ec0c074f..eb55e4a44 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -948,6 +948,7 @@ func (s *syncCopyMove) run() error { DstIncludeAll: s.fi.Opt.DeleteExcluded, NoCheckDest: s.noCheckDest, NoUnicodeNormalization: s.noUnicodeNormalization, + NoProcessDstOnly: s.deleteMode == fs.DeleteModeOff && !s.usingLogger, } s.processError(m.Run(s.ctx))