chore: tweak pull retry logic (#10491)

Signed-off-by: Jakob Borg <jakob@kastelo.net>
This commit is contained in:
Jakob Borg
2025-12-23 08:26:58 +01:00
committed by GitHub
parent b9ab05af02
commit f57e92c20a

View File

@@ -202,9 +202,8 @@ func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1)
if changed == 0 {
// No files were changed by the puller, so we are in
// sync (except for unrecoverable stuff like invalid
// filenames on windows).
// No files were changed by the puller, so we are in sync, or we
// are unable to make further progress for the moment.
break
}
}
@@ -231,7 +230,9 @@ func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
})
}
return changed == 0, nil
// We're done if we didn't change anything and didn't fail to change
// anything
return changed == 0 && pullErrNum == 0, nil
}
// pullerIteration runs a single puller iteration for the given folder and
@@ -256,9 +257,10 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<-
f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB)
updateWg.Add(1)
var changed int // only read after updateWg closes
go func() {
// dbUpdaterRoutine finishes when dbUpdateChan is closed
f.dbUpdaterRoutine(dbUpdateChan)
changed = f.dbUpdaterRoutine(dbUpdateChan)
updateWg.Done()
}()
@@ -285,7 +287,7 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<-
doneWg.Done()
}()
changed, fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
// Signal copy and puller routines that we are done with the in data for
// this iteration. Wait for them to finish.
@@ -312,8 +314,7 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<-
return changed, err
}
func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) {
changed := 0
func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (map[string]protocol.FileInfo, []protocol.FileInfo, error) {
var dirDeletions []protocol.FileInfo
fileDeletions := map[string]protocol.FileInfo{}
buckets := map[string][]protocol.FileInfo{}
@@ -325,7 +326,7 @@ func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan
loop:
for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) {
if err != nil {
return changed, nil, nil, err
return nil, nil, err
}
select {
case <-ctx.Done():
@@ -338,8 +339,6 @@ loop:
continue
}
changed++
switch {
case f.ignores.Match(file.Name).IsIgnored():
file.SetIgnored()
@@ -357,8 +356,6 @@ loop:
// We can't pull an invalid file. Grab the error again since
// we couldn't assign it directly in the case clause.
f.newPullError(file.Name, fs.WindowsInvalidFilename(file.Name))
// No reason to retry for this
changed--
}
case file.IsDeleted():
@@ -372,7 +369,7 @@ loop:
default:
df, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
if err != nil {
return changed, nil, nil, err
return nil, nil, err
}
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
@@ -391,7 +388,7 @@ loop:
case file.Type == protocol.FileInfoTypeFile:
curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
if err != nil {
return changed, nil, nil, err
return nil, nil, err
}
if hasCurFile && file.BlocksEqual(curFile) {
// We are supposed to copy the entire file, and then fetch nothing. We
@@ -431,7 +428,7 @@ loop:
select {
case <-ctx.Done():
return changed, nil, nil, ctx.Err()
return nil, nil, ctx.Err()
default:
}
@@ -441,7 +438,7 @@ nextFile:
for {
select {
case <-ctx.Done():
return changed, fileDeletions, dirDeletions, ctx.Err()
return fileDeletions, dirDeletions, ctx.Err()
default:
}
@@ -452,7 +449,7 @@ nextFile:
fi, ok, err := f.model.sdb.GetGlobalFile(f.folderID, fileName)
if err != nil {
return changed, nil, nil, err
return nil, nil, err
}
if !ok {
// File is no longer in the index. Mark it as done and drop it.
@@ -515,7 +512,7 @@ nextFile:
}
}
return changed, fileDeletions, dirDeletions, nil
return fileDeletions, dirDeletions, nil
}
func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.FileInfo, bool) {
@@ -1741,9 +1738,10 @@ func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) int {
const maxBatchTime = 2 * time.Second
changed := 0
changedDirs := make(map[string]struct{})
found := false
var lastFile protocol.FileInfo
@@ -1819,8 +1817,8 @@ loop:
job.file.Sequence = 0
batch.Append(job.file)
batch.FlushIfFull()
changed++
case <-tick.C:
batch.Flush()
@@ -1828,6 +1826,7 @@ loop:
}
batch.Flush()
return changed
}
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is