mirror of
https://github.com/rclone/rclone.git
synced 2026-03-26 19:32:38 -04:00
march: fix unnecessarily listing dst directory when src listing finished
When doing a copy (no delete mode) without a logger, the destination listing can be cancelled as soon as the source listing finishes, since dst-only entries won't be processed. This is particularly beneficial with --fast-list, where the dst listing may fetch the entire directory tree upfront via ListR. Cancelling it early avoids waiting for a potentially large listing that won't be used. This adds a NoProcessDstOnly flag to March which, when set, cancels the dst listing context once the source channel is exhausted in matchListings. Fixes #9226.
This commit is contained in:
@@ -44,6 +44,7 @@ type March struct {
|
||||
Callback Marcher // object to call with results
|
||||
NoCheckDest bool // transfer all objects regardless without checking dst
|
||||
NoUnicodeNormalization bool // don't normalize unicode characters in filenames
|
||||
NoProcessDstOnly bool // if set, when source listing finishes, cancel the dst listing
|
||||
// internal state
|
||||
srcListDir listDirFn // function to call to list a directory in the src
|
||||
dstListDir listDirFn // function to call to list a directory in the dst
|
||||
@@ -297,7 +298,7 @@ func (m *March) aborting() bool {
|
||||
// Into match go matchPair's of src and dst which have the same name
|
||||
//
|
||||
// This checks for duplicates and checks the list is sorted.
|
||||
func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstOnly func(fs.DirEntry), match func(dst, src fs.DirEntry)) error {
|
||||
func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, dstCancel func(), srcOnly, dstOnly func(fs.DirEntry), match func(dst, src fs.DirEntry)) error {
|
||||
var (
|
||||
srcPrev, dstPrev fs.DirEntry
|
||||
srcPrevName, dstPrevName string
|
||||
@@ -326,6 +327,12 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO
|
||||
src, srcHasMore = <-srcChan
|
||||
srcName = m.srcKey(src)
|
||||
}
|
||||
// If the source listing is finished and we don't need
|
||||
// dst-only entries, cancel the dst listing early.
|
||||
if !srcHasMore && m.NoProcessDstOnly {
|
||||
dstCancel()
|
||||
break
|
||||
}
|
||||
if dst == nil {
|
||||
dst, dstHasMore = <-dstChan
|
||||
dstName = m.dstKey(dst)
|
||||
@@ -392,9 +399,12 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
||||
srcChan = make(chan fs.DirEntry, 100)
|
||||
dstChan = make(chan fs.DirEntry, 100)
|
||||
srcListErr, dstListErr error
|
||||
dstListCancelled bool
|
||||
wg sync.WaitGroup
|
||||
ci = fs.GetConfig(m.Ctx)
|
||||
dstCtx, dstCancel = context.WithCancel(m.Ctx)
|
||||
)
|
||||
defer dstCancel()
|
||||
|
||||
// List the src and dst directories
|
||||
if !job.noSrc {
|
||||
@@ -415,12 +425,19 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
||||
if !m.NoTraverse && !job.noDst {
|
||||
startedDst = true
|
||||
wg.Go(func() {
|
||||
dstListErr = m.dstListDir(m.Ctx, job.dstRemote, func(entries fs.DirEntries) error {
|
||||
dstListErr = m.dstListDir(dstCtx, job.dstRemote, func(entries fs.DirEntries) error {
|
||||
for _, entry := range entries {
|
||||
dstChan <- entry
|
||||
select {
|
||||
case <-dstCtx.Done():
|
||||
return dstCtx.Err()
|
||||
case dstChan <- entry:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if dstCtx.Err() != nil {
|
||||
dstListCancelled = true
|
||||
}
|
||||
close(dstChan)
|
||||
})
|
||||
}
|
||||
@@ -502,7 +519,7 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
||||
}
|
||||
|
||||
// Work out what to do and do it
|
||||
err := m.matchListings(srcChan, dstChan, func(src fs.DirEntry) {
|
||||
err := m.matchListings(srcChan, dstChan, dstCancel, func(src fs.DirEntry) {
|
||||
recurse := m.Callback.SrcOnly(src)
|
||||
if recurse && job.srcDepth > 0 {
|
||||
jobs = append(jobs, listDirJob{
|
||||
@@ -548,7 +565,9 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
||||
srcListErr = fs.CountError(m.Ctx, srcListErr)
|
||||
return nil, srcListErr
|
||||
}
|
||||
if dstListErr == fs.ErrorDirNotFound {
|
||||
if dstListCancelled {
|
||||
// Ignore dst listing errors if we cancelled it
|
||||
} else if dstListErr == fs.ErrorDirNotFound {
|
||||
// Copy the stuff anyway
|
||||
} else if dstListErr != nil {
|
||||
if job.dstRemote != "" {
|
||||
|
||||
@@ -274,6 +274,110 @@ func TestMarch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarchNoProcessDstOnly(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
what string
|
||||
fileSrcOnly []string
|
||||
dirSrcOnly []string
|
||||
fileDstOnly []string
|
||||
dirDstOnly []string
|
||||
fileMatch []string
|
||||
dirMatch []string
|
||||
noProcessDstOnly bool
|
||||
fastList bool
|
||||
}{
|
||||
{
|
||||
// dst-only files sort after all src files so they are
|
||||
// skipped when NoProcessDstOnly cancels the dst listing.
|
||||
what: "no process dst only - typical sync",
|
||||
fileSrcOnly: []string{"a_srcOnly"},
|
||||
fileMatch: []string{"b_match"},
|
||||
fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"},
|
||||
noProcessDstOnly: true,
|
||||
},
|
||||
{
|
||||
what: "no process dst only - fast list",
|
||||
fileSrcOnly: []string{"a_srcOnly"},
|
||||
fileMatch: []string{"b_match"},
|
||||
fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"},
|
||||
noProcessDstOnly: true,
|
||||
fastList: true,
|
||||
},
|
||||
{
|
||||
what: "no process dst only - source only",
|
||||
fileSrcOnly: []string{"test", "test2"},
|
||||
noProcessDstOnly: true,
|
||||
},
|
||||
{
|
||||
what: "no process dst only - dest only skipped",
|
||||
fileSrcOnly: []string{"a_src"},
|
||||
fileDstOnly: []string{"z_dstOnly1", "z_dstOnly2"},
|
||||
noProcessDstOnly: true,
|
||||
},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("TestMarch-%s", test.what), func(t *testing.T) {
|
||||
r := fstest.NewRun(t)
|
||||
|
||||
var srcOnly []fstest.Item
|
||||
var match []fstest.Item
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
for _, f := range test.fileSrcOnly {
|
||||
srcOnly = append(srcOnly, r.WriteFile(f, "hello world", t1))
|
||||
}
|
||||
for _, f := range test.fileDstOnly {
|
||||
r.WriteObject(ctx, f, "hello world", t1)
|
||||
}
|
||||
for _, f := range test.fileMatch {
|
||||
match = append(match, r.WriteBoth(ctx, f, "hello world", t1))
|
||||
}
|
||||
|
||||
ctx, ci := fs.AddConfig(ctx)
|
||||
ci.UseListR = test.fastList
|
||||
|
||||
fi := filter.GetConfig(ctx)
|
||||
|
||||
// Local backend doesn't implement ListR, so monkey patch it for this test
|
||||
if test.fastList && r.Flocal.Features().ListR == nil {
|
||||
r.Flocal.Features().ListR = func(ctx context.Context, dir string, callback fs.ListRCallback) error {
|
||||
r.Flocal.Features().ListR = nil
|
||||
return walk.ListR(ctx, r.Flocal, dir, true, -1, walk.ListAll, callback)
|
||||
}
|
||||
defer func() {
|
||||
r.Flocal.Features().ListR = nil
|
||||
}()
|
||||
}
|
||||
|
||||
mt := &marchTester{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
m := &March{
|
||||
Ctx: ctx,
|
||||
Fdst: r.Fremote,
|
||||
Fsrc: r.Flocal,
|
||||
Dir: "",
|
||||
Callback: mt,
|
||||
DstIncludeAll: fi.Opt.DeleteExcluded,
|
||||
NoProcessDstOnly: test.noProcessDstOnly,
|
||||
}
|
||||
|
||||
mt.processError(m.Run(ctx))
|
||||
mt.cancel()
|
||||
err := mt.currentError()
|
||||
require.NoError(t, err)
|
||||
|
||||
precision := fs.GetModifyWindow(ctx, r.Fremote, r.Flocal)
|
||||
|
||||
// With NoProcessDstOnly, dst-only entries should not be reported
|
||||
fstest.CompareItems(t, mt.srcOnly, srcOnly, test.dirSrcOnly, precision, "srcOnly")
|
||||
assert.Empty(t, mt.dstOnly, "dstOnly should be empty with NoProcessDstOnly")
|
||||
fstest.CompareItems(t, mt.match, match, test.dirMatch, precision, "match")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// matchPair is a matched pair of direntries returned by matchListings
|
||||
type matchPair struct {
|
||||
src, dst fs.DirEntry
|
||||
@@ -535,7 +639,7 @@ func TestMatchListings(t *testing.T) {
|
||||
matches = append(matches, matchPair{dst: dst, src: src})
|
||||
}
|
||||
|
||||
err := m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn)
|
||||
err := m.matchListings(makeChan(0), makeChan(1), func() {}, srcOnlyFn, dstOnlyFn, matchFn)
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ")
|
||||
@@ -544,7 +648,7 @@ func TestMatchListings(t *testing.T) {
|
||||
|
||||
// now swap src and dst
|
||||
srcOnly, dstOnly, matches = nil, nil, nil
|
||||
err = m.matchListings(makeChan(0), makeChan(1), srcOnlyFn, dstOnlyFn, matchFn)
|
||||
err = m.matchListings(makeChan(0), makeChan(1), func() {}, srcOnlyFn, dstOnlyFn, matchFn)
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ")
|
||||
@@ -553,3 +657,138 @@ func TestMatchListings(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchListingsNoProcessDstOnly(t *testing.T) {
|
||||
var (
|
||||
a = mockobject.Object("a")
|
||||
b = mockobject.Object("b")
|
||||
c = mockobject.Object("c")
|
||||
d = mockobject.Object("d")
|
||||
)
|
||||
|
||||
for _, test := range []struct {
|
||||
what string
|
||||
input fs.DirEntries // pairs of input src, dst
|
||||
srcOnly fs.DirEntries
|
||||
matches []matchPair
|
||||
}{
|
||||
{
|
||||
what: "src and dst with dst-only skipped",
|
||||
input: fs.DirEntries{
|
||||
a, nil,
|
||||
b, b,
|
||||
nil, c,
|
||||
nil, d,
|
||||
},
|
||||
srcOnly: fs.DirEntries{
|
||||
a,
|
||||
},
|
||||
matches: []matchPair{
|
||||
{b, b},
|
||||
},
|
||||
},
|
||||
{
|
||||
what: "all dst-only skipped",
|
||||
input: fs.DirEntries{
|
||||
nil, a,
|
||||
nil, b,
|
||||
nil, c,
|
||||
},
|
||||
},
|
||||
{
|
||||
what: "all src-only",
|
||||
input: fs.DirEntries{
|
||||
a, nil,
|
||||
b, nil,
|
||||
c, nil,
|
||||
},
|
||||
srcOnly: fs.DirEntries{
|
||||
a, b, c,
|
||||
},
|
||||
},
|
||||
{
|
||||
what: "matches then dst-only skipped",
|
||||
input: fs.DirEntries{
|
||||
a, a,
|
||||
b, b,
|
||||
nil, c,
|
||||
nil, d,
|
||||
},
|
||||
matches: []matchPair{
|
||||
{a, a},
|
||||
{b, b},
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(test.what, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
var wg sync.WaitGroup
|
||||
|
||||
m := March{
|
||||
Ctx: context.Background(),
|
||||
NoProcessDstOnly: true,
|
||||
}
|
||||
|
||||
// makeChan creates a channel sending sorted entries.
|
||||
// It returns the channel and a drain function to unblock
|
||||
// senders if the channel isn't fully consumed.
|
||||
makeChan := func(offset int) (<-chan fs.DirEntry, func()) {
|
||||
out := make(chan fs.DirEntry)
|
||||
key := m.dstKey
|
||||
if offset == 0 {
|
||||
key = m.srcKey
|
||||
}
|
||||
ls, err := list.NewSorter(ctx, nil, list.SortToChan(out), key)
|
||||
require.NoError(t, err)
|
||||
wg.Go(func() {
|
||||
for i := 0; i < len(test.input); i += 2 {
|
||||
entry := test.input[i+offset]
|
||||
if entry != nil {
|
||||
require.NoError(t, ls.Add(fs.DirEntries{entry}))
|
||||
}
|
||||
}
|
||||
require.NoError(t, ls.Send())
|
||||
ls.CleanUp()
|
||||
close(out)
|
||||
})
|
||||
drain := func() {
|
||||
for range out {
|
||||
}
|
||||
}
|
||||
return out, drain
|
||||
}
|
||||
|
||||
var srcOnly fs.DirEntries
|
||||
srcOnlyFn := func(entry fs.DirEntry) {
|
||||
srcOnly = append(srcOnly, entry)
|
||||
}
|
||||
var dstOnly fs.DirEntries
|
||||
dstOnlyFn := func(entry fs.DirEntry) {
|
||||
dstOnly = append(dstOnly, entry)
|
||||
}
|
||||
var matches []matchPair
|
||||
matchFn := func(dst, src fs.DirEntry) {
|
||||
matches = append(matches, matchPair{dst: dst, src: src})
|
||||
}
|
||||
|
||||
srcChan, _ := makeChan(0)
|
||||
dstChan, drainDst := makeChan(1)
|
||||
|
||||
cancelled := false
|
||||
dstCancel := func() {
|
||||
cancelled = true
|
||||
// Drain remaining dst entries to unblock the sender,
|
||||
// mimicking what the real code does via context cancellation.
|
||||
go drainDst()
|
||||
}
|
||||
|
||||
err := m.matchListings(srcChan, dstChan, dstCancel, srcOnlyFn, dstOnlyFn, matchFn)
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
assert.Equal(t, test.srcOnly, srcOnly, test.what, "srcOnly differ")
|
||||
assert.Empty(t, dstOnly, test.what, "dstOnly should be empty")
|
||||
assert.Equal(t, test.matches, matches, test.what, "matches differ")
|
||||
assert.True(t, cancelled, "dstCancel should have been called")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -948,6 +948,7 @@ func (s *syncCopyMove) run() error {
|
||||
DstIncludeAll: s.fi.Opt.DeleteExcluded,
|
||||
NoCheckDest: s.noCheckDest,
|
||||
NoUnicodeNormalization: s.noUnicodeNormalization,
|
||||
NoProcessDstOnly: s.deleteMode == fs.DeleteModeOff && !s.usingLogger,
|
||||
}
|
||||
s.processError(m.Run(s.ctx))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user