diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 29e2a0754..6b3b05184 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -202,9 +202,8 @@ func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) { f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1) if changed == 0 { - // No files were changed by the puller, so we are in - // sync (except for unrecoverable stuff like invalid - // filenames on windows). + // No files were changed by the puller, so we are in sync, or we + // are unable to make further progress for the moment. break } } @@ -231,7 +230,9 @@ func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) { }) } - return changed == 0, nil + // We're done if we didn't change anything and didn't fail to change + // anything + return changed == 0 && pullErrNum == 0, nil } // pullerIteration runs a single puller iteration for the given folder and @@ -256,9 +257,10 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB) updateWg.Add(1) + var changed int // only read after updateWg closes go func() { // dbUpdaterRoutine finishes when dbUpdateChan is closed - f.dbUpdaterRoutine(dbUpdateChan) + changed = f.dbUpdaterRoutine(dbUpdateChan) updateWg.Done() }() @@ -285,7 +287,7 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- doneWg.Done() }() - changed, fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan) + fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan) // Signal copy and puller routines that we are done with the in data for // this iteration. Wait for them to finish. @@ -312,8 +314,7 @@ func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- return changed, err } -func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (int, map[string]protocol.FileInfo, []protocol.FileInfo, error) { - changed := 0 +func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (map[string]protocol.FileInfo, []protocol.FileInfo, error) { var dirDeletions []protocol.FileInfo fileDeletions := map[string]protocol.FileInfo{} buckets := map[string][]protocol.FileInfo{} @@ -325,7 +326,7 @@ func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan loop: for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) { if err != nil { - return changed, nil, nil, err + return nil, nil, err } select { case <-ctx.Done(): @@ -338,8 +339,6 @@ loop: continue } - changed++ - switch { case f.ignores.Match(file.Name).IsIgnored(): file.SetIgnored() @@ -357,8 +356,6 @@ loop: // We can't pull an invalid file. Grab the error again since // we couldn't assign it directly in the case clause. f.newPullError(file.Name, fs.WindowsInvalidFilename(file.Name)) - // No reason to retry for this - changed-- } case file.IsDeleted(): @@ -372,7 +369,7 @@ loop: default: df, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name) if err != nil { - return changed, nil, nil, err + return nil, nil, err } // Local file can be already deleted, but with a lower version // number, hence the deletion coming in again as part of @@ -391,7 +388,7 @@ loop: case file.Type == protocol.FileInfoTypeFile: curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name) if err != nil { - return changed, nil, nil, err + return nil, nil, err } if hasCurFile && file.BlocksEqual(curFile) { // We are supposed to copy the entire file, and then fetch nothing. We @@ -431,7 +428,7 @@ loop: select { case <-ctx.Done(): - return changed, nil, nil, ctx.Err() + return nil, nil, ctx.Err() default: } @@ -441,7 +438,7 @@ nextFile: for { select { case <-ctx.Done(): - return changed, fileDeletions, dirDeletions, ctx.Err() + return fileDeletions, dirDeletions, ctx.Err() default: } @@ -452,7 +449,7 @@ nextFile: fi, ok, err := f.model.sdb.GetGlobalFile(f.folderID, fileName) if err != nil { - return changed, nil, nil, err + return nil, nil, err } if !ok { // File is no longer in the index. Mark it as done and drop it. @@ -515,7 +512,7 @@ nextFile: } } - return changed, fileDeletions, dirDeletions, nil + return fileDeletions, dirDeletions, nil } func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.FileInfo, bool) { @@ -1741,9 +1738,10 @@ func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) { // dbUpdaterRoutine aggregates db updates and commits them in batches no // larger than 1000 items, and no more delayed than 2 seconds. -func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) { +func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) int { const maxBatchTime = 2 * time.Second + changed := 0 changedDirs := make(map[string]struct{}) found := false var lastFile protocol.FileInfo @@ -1819,8 +1817,8 @@ loop: job.file.Sequence = 0 batch.Append(job.file) - batch.FlushIfFull() + changed++ case <-tick.C: batch.Flush() @@ -1828,6 +1826,7 @@ loop: } batch.Flush() + return changed } // pullScannerRoutine aggregates paths to be scanned after pulling. The scan is