Files
syncthing/lib/model/folder.go
Jakob Borg c9650fc7d5 chore(model): delay starting a pull while there are incoming index updates (#10041)
This adds a simple delay to the process for starting the pull, by
default one second. In practice this means we're likely to wait for
initial index transfer, or multiple messages sent as part of a larger
change. This is better because we're more likely to have the whole
change for the purpose of handling renames etc, and also it's more
efficient to do one larger puller iteration instead of multiple while
also processing changes.

It does however introduce a certain amount of delay into the sync
process, so it can be tuned down or turned off entirely.
2025-04-06 14:31:02 +02:00

1406 lines
36 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"context"
"errors"
"fmt"
"math/rand"
"path/filepath"
"sort"
"time"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/semaphore"
"github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/stringutil"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/watchaggregator"
)
// Arbitrary limit that triggers a warning on kqueue systems
const kqueueItemCountThreshold = 10000
type folder struct {
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
ioLimiter *semaphore.Semaphore
localFlags uint32
model *model
shortID protocol.ShortID
db db.DB
ignores *ignore.Matcher
mtimefs fs.Filesystem
modTimeWindow time.Duration
ctx context.Context // used internally, only accessible on serve lifetime
done chan struct{} // used externally, accessible regardless of serve
scanInterval time.Duration
scanTimer *time.Timer
scanDelay chan time.Duration
initialScanFinished chan struct{}
scanScheduled chan struct{}
versionCleanupInterval time.Duration
versionCleanupTimer *time.Timer
pullScheduled chan struct{}
pullPause time.Duration
pullFailTimer *time.Timer
scanErrors []FileError
pullErrors []FileError
errorsMut sync.Mutex
doInSyncChan chan syncRequest
forcedRescanRequested chan struct{}
forcedRescanPaths map[string]struct{}
forcedRescanPathsMut sync.Mutex
watchCancel context.CancelFunc
watchChan chan []string
restartWatchChan chan struct{}
watchErr error
watchMut sync.Mutex
puller puller
versioner versioner.Versioner
warnedKqueue bool
}
type syncRequest struct {
fn func() error
err chan error
}
type puller interface {
pull() (bool, error) // true when successful and should not be retried
}
func newFolder(model *model, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *semaphore.Semaphore, ver versioner.Versioner) folder {
f := folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
FolderStatisticsReference: stats.NewFolderStatisticsReference(db.NewTyped(model.sdb, "folderstats/"+cfg.ID)),
ioLimiter: ioLimiter,
model: model,
shortID: model.shortID,
db: model.sdb,
ignores: ignores,
mtimefs: cfg.Filesystem(fs.NewMtimeOption(model.sdb, cfg.ID)),
modTimeWindow: cfg.ModTimeWindow(),
done: make(chan struct{}),
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
scanDelay: make(chan time.Duration),
initialScanFinished: make(chan struct{}),
scanScheduled: make(chan struct{}, 1),
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
errorsMut: sync.NewMutex(),
doInSyncChan: make(chan syncRequest),
forcedRescanRequested: make(chan struct{}, 1),
forcedRescanPaths: make(map[string]struct{}),
forcedRescanPathsMut: sync.NewMutex(),
watchCancel: func() {},
restartWatchChan: make(chan struct{}, 1),
watchMut: sync.NewMutex(),
versioner: ver,
}
f.pullPause = f.pullBasePause()
f.pullFailTimer = time.NewTimer(0)
<-f.pullFailTimer.C
registerFolderMetrics(f.ID)
return f
}
func (f *folder) Serve(ctx context.Context) error {
f.model.foldersRunning.Add(1)
defer f.model.foldersRunning.Add(-1)
f.ctx = ctx
l.Debugln(f, "starting")
defer l.Debugln(f, "exiting")
defer func() {
f.scanTimer.Stop()
f.versionCleanupTimer.Stop()
f.setState(FolderIdle)
}()
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
f.startWatch()
}
// If we're configured to not do version cleanup, or we don't have a
// versioner, cancel and drain that timer now.
if f.versionCleanupInterval == 0 || f.versioner == nil {
if !f.versionCleanupTimer.Stop() {
<-f.versionCleanupTimer.C
}
}
initialCompleted := f.initialScanFinished
pullTimer := time.NewTimer(0)
pullTimer.Stop()
for {
var err error
select {
case <-f.ctx.Done():
close(f.done)
return nil
case <-f.pullScheduled:
if f.PullerDelayS > 0 {
// Wait for incoming updates to settle before doing the
// actual pull
f.setState(FolderSyncWaiting)
pullTimer.Reset(time.Duration(float64(time.Second) * f.PullerDelayS))
} else {
_, err = f.pull()
}
case <-pullTimer.C:
f.setState(FolderIdle)
_, err = f.pull()
case <-f.pullFailTimer.C:
var success bool
success, err = f.pull()
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
// Back off from retrying to pull
f.pullPause *= 2
}
case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
_, err = f.pull()
case <-f.forcedRescanRequested:
err = f.handleForcedRescans()
case <-f.scanTimer.C:
l.Debugln(f, "Scanning due to timer")
err = f.scanTimerFired()
case req := <-f.doInSyncChan:
l.Debugln(f, "Running something due to request")
err = req.fn()
req.err <- err
case next := <-f.scanDelay:
l.Debugln(f, "Delaying scan")
f.scanTimer.Reset(next)
case <-f.scanScheduled:
l.Debugln(f, "Scan was scheduled")
f.scanTimer.Reset(0)
case fsEvents := <-f.watchChan:
l.Debugln(f, "Scan due to watcher")
err = f.scanSubdirs(fsEvents)
case <-f.restartWatchChan:
l.Debugln(f, "Restart watcher")
err = f.restartWatch()
case <-f.versionCleanupTimer.C:
l.Debugln(f, "Doing version cleanup")
f.versionCleanupTimerFired()
}
if err != nil {
if svcutil.IsFatal(err) {
return err
}
f.setError(err)
}
}
}
func (*folder) BringToFront(string) {}
func (*folder) Override() {}
func (*folder) Revert() {}
func (f *folder) DelayScan(next time.Duration) {
select {
case f.scanDelay <- next:
case <-f.done:
}
}
func (f *folder) ScheduleScan() {
// 1-buffered chan
select {
case f.scanScheduled <- struct{}{}:
default:
}
}
func (f *folder) ignoresUpdated() {
if f.FSWatcherEnabled {
f.scheduleWatchRestart()
}
}
func (f *folder) SchedulePull() {
select {
case f.pullScheduled <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (*folder) Jobs(_, _ int) ([]string, []string, int) {
return nil, nil, 0
}
func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.doInSync(func() error { return f.scanSubdirs(subdirs) })
}
// doInSync allows to run functions synchronously in folder.serve from exported,
// asynchronously called methods.
func (f *folder) doInSync(fn func() error) error {
req := syncRequest{
fn: fn,
err: make(chan error, 1),
}
select {
case f.doInSyncChan <- req:
return <-req.err
case <-f.done:
return context.Canceled
}
}
func (f *folder) Reschedule() {
if f.scanInterval == 0 {
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
interval := time.Duration(sleepNanos) * time.Nanosecond
l.Debugln(f, "next rescan in", interval)
f.scanTimer.Reset(interval)
}
func (f *folder) getHealthErrorAndLoadIgnores() error {
if err := f.getHealthErrorWithoutIgnores(); err != nil {
return err
}
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
return fmt.Errorf("loading ignores: %w", err)
}
}
return nil
}
func (f *folder) getHealthErrorWithoutIgnores() error {
// Check for folder errors, with the most serious and specific first and
// generic ones like out of space on the home disk later.
if err := f.CheckPath(); err != nil {
return err
}
if minFree := f.model.cfg.Options().MinHomeDiskFree; minFree.Value > 0 {
dbPath := locations.Get(locations.Database)
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
if err = config.CheckFreeSpace(minFree, usage); err != nil {
return fmt.Errorf("insufficient space on disk for database (%v): %w", dbPath, err)
}
}
}
return nil
}
func (f *folder) pull() (success bool, err error) {
f.pullFailTimer.Stop()
select {
case <-f.pullFailTimer.C:
default:
}
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return true, nil
}
defer func() {
if success {
// We're good, reset the pause interval.
f.pullPause = f.pullBasePause()
}
}()
// If there is nothing to do, don't even enter sync-waiting state.
needCount, err := f.db.CountNeed(f.folderID, protocol.LocalDeviceID)
if err != nil {
return false, err
}
if needCount.TotalItems() == 0 {
// Clears pull failures on items that were needed before, but aren't anymore.
f.errorsMut.Lock()
f.pullErrors = nil
f.errorsMut.Unlock()
return true, nil
}
// Abort early (before acquiring a token) if there's a folder error
err = f.getHealthErrorWithoutIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
// Send only folder doesn't do any io, it only checks for out-of-sync
// items that differ in metadata and updates those.
if f.Type != config.FolderTypeSendOnly {
f.setState(FolderSyncWaiting)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return true, err
}
defer f.ioLimiter.Give(1)
}
startTime := time.Now()
// Check if the ignore patterns changed.
oldHash := f.ignores.Hash()
defer func() {
if f.ignores.Hash() != oldHash {
f.ignoresUpdated()
}
}()
err = f.getHealthErrorAndLoadIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
f.setError(nil)
success, err = f.puller.pull()
if success && err == nil {
return true, nil
}
// Pulling failed, try again later.
delay := f.pullPause + time.Since(startTime)
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), stringutil.NiceDurationString(delay))
f.pullFailTimer.Reset(delay)
return false, err
}
func (f *folder) scanSubdirs(subDirs []string) error {
l.Debugf("%v scanning", f)
oldHash := f.ignores.Hash()
err := f.getHealthErrorAndLoadIgnores()
if err != nil {
return err
}
f.setError(nil)
// Check on the way out if the ignore patterns changed as part of scanning
// this folder. If they did we should schedule a pull of the folder so that
// we request things we might have suddenly become unignored and so on.
defer func() {
if f.ignores.Hash() != oldHash {
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
f.ignoresUpdated()
f.SchedulePull()
}
}()
f.setState(FolderScanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return err
}
defer f.ioLimiter.Give(1)
metricFolderScans.WithLabelValues(f.ID).Inc()
ctx, cancel := context.WithCancel(f.ctx)
defer cancel()
go addTimeUntilCancelled(ctx, metricFolderScanSeconds.WithLabelValues(f.ID))
for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
if sub == "" {
// A blank subdirs means to scan the entire folder. We can trim
// the subDirs list and go on our way.
subDirs = nil
break
}
subDirs[i] = sub
}
// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
subDirs = unifySubs(subDirs, func(file string) bool {
_, ok, err := f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file)
return err == nil && ok
})
f.setState(FolderScanning)
f.clearScanErrors(subDirs)
batch := f.newScanBatch()
// Schedule a pull after scanning, but only if we actually detected any
// changes.
changes := 0
defer func() {
l.Debugf("%v finished scanning, detected %v changes", f, changes)
if changes > 0 {
f.SchedulePull()
}
}()
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
if len(subDirs) == 0 {
// If we have no specific subdirectories to traverse, set it to one
// empty prefix so we traverse the entire folder contents once.
subDirs = []string{""}
}
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
f.ScanCompleted()
return nil
}
const maxToRemove = 1000
type scanBatch struct {
f *folder
updateBatch *FileInfoBatch
toRemove []string
}
func (f *folder) newScanBatch() *scanBatch {
b := &scanBatch{
f: f,
toRemove: make([]string, 0, maxToRemove),
}
b.updateBatch = NewFileInfoBatch(func(fs []protocol.FileInfo) error {
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
l.Debugf("Stopping scan of folder %s due to: %s", b.f.Description(), err)
return err
}
b.f.updateLocalsFromScanning(fs)
return nil
})
return b
}
func (b *scanBatch) Remove(item string) {
b.toRemove = append(b.toRemove, item)
}
func (b *scanBatch) flushToRemove() error {
if len(b.toRemove) > 0 {
if err := b.f.db.DropFilesNamed(b.f.folderID, protocol.LocalDeviceID, b.toRemove); err != nil {
return err
}
b.toRemove = b.toRemove[:0]
}
return nil
}
func (b *scanBatch) Flush() error {
if err := b.flushToRemove(); err != nil {
return err
}
return b.updateBatch.Flush()
}
func (b *scanBatch) FlushIfFull() error {
if len(b.toRemove) >= maxToRemove {
if err := b.flushToRemove(); err != nil {
return err
}
}
return b.updateBatch.FlushIfFull()
}
// Update adds the fileinfo to the batch for updating, and does a few checks.
// It returns false if the checks result in the file not going to be updated or removed.
func (b *scanBatch) Update(fi protocol.FileInfo) (bool, error) {
// Check for a "virtual" parent directory of encrypted files. We don't track
// it, but check if anything still exists within and delete it otherwise.
if b.f.Type == config.FolderTypeReceiveEncrypted && fi.IsDirectory() && protocol.IsEncryptedParent(fs.PathComponents(fi.Name)) {
if names, err := b.f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
b.f.mtimefs.Remove(fi.Name)
}
return false, nil
}
// Resolve receive-only items which are identical with the global state or
// the global item is our own receive-only item.
switch gf, ok, err := b.f.db.GetGlobalFile(b.f.folderID, fi.Name); {
case err != nil:
return false, err
case !ok:
case gf.IsReceiveOnlyChanged():
if fi.IsDeleted() {
// Our item is deleted and the global item is our own receive only
// file. No point in keeping track of that.
b.Remove(fi.Name)
l.Debugf("%v scanning: deleting deleted receive-only local-changed file: %v", b.f, fi)
return true, nil
}
case (b.f.Type == config.FolderTypeReceiveOnly || b.f.Type == config.FolderTypeReceiveEncrypted) &&
gf.IsEquivalentOptional(fi, protocol.FileInfoComparison{
ModTimeWindow: b.f.modTimeWindow,
IgnorePerms: b.f.IgnorePerms,
IgnoreBlocks: true,
IgnoreFlags: protocol.FlagLocalReceiveOnly,
IgnoreOwnership: !b.f.SyncOwnership && !b.f.SendOwnership,
IgnoreXattrs: !b.f.SyncXattrs && !b.f.SendXattrs,
}):
// What we have locally is equivalent to the global file.
l.Debugf("%v scanning: Merging identical locally changed item with global: %v", b.f, fi)
fi = gf
}
b.updateBatch.Append(fi)
return true, nil
}
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (int, error) {
changes := 0
// If we return early e.g. due to a folder health error, the scan needs
// to be cancelled.
scanCtx, scanCancel := context.WithCancel(f.ctx)
defer scanCancel()
scanConfig := scanner.Config{
Folder: f.ID,
Subs: subDirs,
Matcher: f.ignores,
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{db: f.db, folder: f.folderID},
Filesystem: f.mtimefs,
IgnorePerms: f.IgnorePerms,
AutoNormalize: f.AutoNormalize,
Hashers: f.model.numHashers(f.ID),
ShortID: f.shortID,
ProgressTickIntervalS: f.ScanProgressIntervalS,
LocalFlags: f.localFlags,
ModTimeWindow: f.modTimeWindow,
EventLogger: f.evLogger,
ScanOwnership: f.SendOwnership || f.SyncOwnership,
ScanXattrs: f.SendXattrs || f.SyncXattrs,
XattrFilter: f.XattrFilter,
}
var fchan chan scanner.ScanResult
if f.Type == config.FolderTypeReceiveEncrypted {
fchan = scanner.WalkWithoutHashing(scanCtx, scanConfig)
} else {
fchan = scanner.Walk(scanCtx, scanConfig)
}
alreadyUsedOrExisting := make(map[string]struct{})
for res := range fchan {
if res.Err != nil {
f.newScanError(res.Path, res.Err)
continue
}
if err := batch.FlushIfFull(); err != nil {
// Prevent a race between the scan aborting due to context
// cancellation and releasing the snapshot in defer here.
scanCancel()
for range fchan {
}
return changes, err
}
if ok, err := batch.Update(res.File); err != nil {
return 0, err
} else if ok {
changes++
}
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
default:
if nf, ok := f.findRename(res.File, alreadyUsedOrExisting); ok {
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
}
}
}
return changes, nil
}
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *scanBatch) (int, error) {
var toIgnore []protocol.FileInfo
ignoredParent := ""
changes := 0
outer:
for _, sub := range subDirs {
for fi, err := range itererr.Zip(f.db.AllLocalFilesWithPrefix(f.folderID, protocol.LocalDeviceID, sub)) {
if err != nil {
return changes, err
}
select {
case <-f.ctx.Done():
break outer
default:
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
if ignoredParent != "" && !fs.IsParent(fi.Name, ignoredParent) {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
}
toIgnore = toIgnore[:0]
ignoredParent = ""
}
switch ignored := f.ignores.Match(fi.Name).IsIgnored(); {
case fi.IsIgnored() && ignored:
continue
case !fi.IsIgnored() && ignored:
// File was not ignored at last pass but has been ignored.
if fi.IsDirectory() {
// Delay ignoring as a child might be unignored.
toIgnore = append(toIgnore, fi)
if ignoredParent == "" {
// If the parent wasn't ignored already, set
// this path as the "highest" ignored parent
ignoredParent = fi.Name
}
continue
}
l.Debugln("marking file as ignored", fi)
nf := fi
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
case fi.IsIgnored() && !ignored:
// Successfully scanned items are already un-ignored during
// the scan, so check whether it is deleted.
fallthrough
case !fi.IsIgnored() && !fi.IsDeleted() && !fi.IsUnsupported():
// The file is not ignored, deleted or unsupported. Lets check if
// it's still here. Simply stat:ing it won't do as there are
// tons of corner cases (e.g. parent dir->symlink, missing
// permissions)
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
if ignoredParent != "" {
// Don't ignore parents of this not ignored item
toIgnore = toIgnore[:0]
ignoredParent = ""
}
continue
}
nf := fi
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
if fi.ShouldConflict() {
// We do not want to override the global version with
// the deleted file. Setting to an empty version makes
// sure the file gets in sync on the following pull.
nf.Version = protocol.Vector{}
}
l.Debugln("marking file as deleted", nf)
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
case fi.IsDeleted() && fi.IsReceiveOnlyChanged():
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
switch gf, ok, err := f.db.GetGlobalFile(f.folderID, fi.Name); {
case err != nil:
return 0, err
case !ok:
case gf.IsReceiveOnlyChanged():
l.Debugln("removing deleted, receive-only item that is globally receive-only from db", fi)
batch.Remove(fi.Name)
changes++
case gf.IsDeleted():
// Our item is deleted and the global item is deleted too. We just
// pretend it is a normal deleted file (nobody cares about that).
l.Debugf("%v scanning: Marking globally deleted item as not locally changed: %v", f, fi.Name)
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
if ok, err := batch.Update(fi); err != nil {
return 0, err
} else if ok {
changes++
}
}
default:
// No need to bump the version for a file that was and is
// deleted and just the folder type/local flags changed.
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
l.Debugln("removing receive-only flag on deleted item", fi)
if ok, err := batch.Update(fi); err != nil {
return 0, err
} else if ok {
changes++
}
}
}
}
select {
case <-f.ctx.Done():
return changes, f.ctx.Err()
default:
}
if len(toIgnore) > 0 {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
}
toIgnore = toIgnore[:0]
}
}
return changes, nil
}
func (f *folder) findRename(file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
if len(file.Blocks) == 0 || file.Size == 0 {
return protocol.FileInfo{}, false
}
found := false
nf := protocol.FileInfo{}
loop:
for fi, err := range itererr.Zip(f.db.AllLocalFilesWithBlocksHash(f.folderID, file.BlocksHash)) {
if err != nil {
return protocol.FileInfo{}, false
}
select {
case <-f.ctx.Done():
break loop
default:
}
if fi.Name == file.Name {
alreadyUsedOrExisting[fi.Name] = struct{}{}
continue
}
if _, ok := alreadyUsedOrExisting[fi.Name]; ok {
continue
}
if fi.ShouldConflict() {
continue
}
if f.ignores.Match(fi.Name).IsIgnored() {
continue
}
// Only check the size.
// No point checking block equality, as that uses BlocksHash comparison if that is set (which it will be).
// No point checking BlocksHash comparison as WithBlocksHash already does that.
if file.Size != fi.Size {
continue
}
alreadyUsedOrExisting[fi.Name] = struct{}{}
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
continue
}
var ok bool
nf, ok, err = f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, fi.Name)
if err != nil || !ok || nf.Sequence != fi.Sequence {
continue
}
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
found = true
break
}
return nf, found
}
func (f *folder) scanTimerFired() error {
err := f.scanSubdirs(nil)
select {
case <-f.initialScanFinished:
default:
status := "Completed"
if err != nil {
status = "Failed"
}
l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
close(f.initialScanFinished)
}
f.Reschedule()
return err
}
func (f *folder) versionCleanupTimerFired() {
f.setState(FolderCleanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return
}
defer f.ioLimiter.Give(1)
f.setState(FolderCleaning)
if err := f.versioner.Clean(f.ctx); err != nil {
l.Infoln("Failed to clean versions in %s: %v", f.Description(), err)
}
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
}
func (f *folder) WatchError() error {
f.watchMut.Lock()
defer f.watchMut.Unlock()
return f.watchErr
}
// stopWatch immediately aborts watching and may be called asynchronously
func (f *folder) stopWatch() {
f.watchMut.Lock()
f.watchCancel()
f.watchMut.Unlock()
f.setWatchError(nil, 0)
}
// scheduleWatchRestart makes sure watching is restarted from the main for loop
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
func (f *folder) scheduleWatchRestart() {
select {
case f.restartWatchChan <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull.
}
}
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) restartWatch() error {
f.stopWatch()
f.startWatch()
return f.scanSubdirs(nil)
}
// startWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) startWatch() {
ctx, cancel := context.WithCancel(f.ctx)
f.watchMut.Lock()
f.watchChan = make(chan []string)
f.watchCancel = cancel
f.watchMut.Unlock()
go f.monitorWatch(ctx)
}
// monitorWatch starts the filesystem watching and retries every minute on failure.
// It should not be used except in startWatch.
func (f *folder) monitorWatch(ctx context.Context) {
failTimer := time.NewTimer(0)
aggrCtx, aggrCancel := context.WithCancel(ctx)
var err error
var eventChan <-chan fs.Event
var errChan <-chan error
warnedOutside := false
var lastWatch time.Time
pause := time.Minute
// Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage
var summarySub events.Subscription
var summaryChan <-chan events.Event
if fs.WatchKqueue && !f.warnedKqueue {
summarySub = f.evLogger.Subscribe(events.FolderSummary)
summaryChan = summarySub.C()
}
defer func() {
aggrCancel() // aggrCancel might e re-assigned -> call within closure
if summaryChan != nil {
summarySub.Unsubscribe()
}
}()
for {
select {
case <-failTimer.C:
eventChan, errChan, err = f.mtimefs.Watch(".", f.ignores, ctx, f.IgnorePerms)
// We do this once per minute initially increased to
// max one hour in case of repeat failures.
f.scanOnWatchErr()
f.setWatchError(err, pause)
if err != nil {
failTimer.Reset(pause)
if pause < 60*time.Minute {
pause *= 2
}
continue
}
lastWatch = time.Now()
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
l.Debugln("Started filesystem watcher for folder", f.Description())
case err = <-errChan:
var next time.Duration
if dur := time.Since(lastWatch); dur > pause {
pause = time.Minute
next = 0
} else {
next = pause - dur
if pause < 60*time.Minute {
pause *= 2
}
}
failTimer.Reset(next)
f.setWatchError(err, next)
// This error was previously a panic and should never occur, so generate
// a warning, but don't do it repetitively.
var errOutside *fs.ErrWatchEventOutsideRoot
if errors.As(err, &errOutside) {
if !warnedOutside {
l.Warnln(err)
warnedOutside = true
}
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
}
aggrCancel()
errChan = nil
aggrCtx, aggrCancel = context.WithCancel(ctx)
case ev := <-summaryChan:
if data, ok := ev.Data.(FolderSummaryEventData); !ok {
f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch")
} else if data.Folder == f.folderID && data.Summary.LocalTotalItems-data.Summary.LocalDeleted > kqueueItemCountThreshold {
f.warnedKqueue = true
summarySub.Unsubscribe()
summaryChan = nil
l.Warnf("Filesystem watching (kqueue) is enabled on %v with a lot of files/directories, and that requires a lot of resources and might slow down your system significantly", f.Description())
}
case <-ctx.Done():
aggrCancel() // for good measure and keeping the linters happy
return
}
}
}
// setWatchError sets the current error state of the watch and should be called
// regardless of whether err is nil or not.
func (f *folder) setWatchError(err error, nextTryIn time.Duration) {
f.watchMut.Lock()
prevErr := f.watchErr
f.watchErr = err
f.watchMut.Unlock()
if err != prevErr {
data := map[string]interface{}{
"folder": f.ID,
}
if prevErr != nil {
data["from"] = prevErr.Error()
}
if err != nil {
data["to"] = err.Error()
}
f.evLogger.Log(events.FolderWatchStateChanged, data)
}
if err == nil {
return
}
msg := fmt.Sprintf("Error while trying to start filesystem watcher for folder %s, trying again in %v: %v", f.Description(), nextTryIn, err)
if prevErr != err {
l.Infof(msg)
return
}
l.Debugf(msg)
}
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
func (f *folder) scanOnWatchErr() {
f.watchMut.Lock()
err := f.watchErr
f.watchMut.Unlock()
if err != nil {
f.DelayScan(0)
}
}
func (f *folder) setError(err error) {
select {
case <-f.ctx.Done():
return
default:
}
_, _, oldErr := f.getState()
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
return
}
if err != nil {
if oldErr == nil {
l.Warnf("Error on folder %s: %v", f.Description(), err)
} else {
l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
}
} else {
l.Infoln("Cleared error on folder", f.Description())
f.SchedulePull()
}
if f.FSWatcherEnabled {
if err != nil {
f.stopWatch()
} else {
f.scheduleWatchRestart()
}
}
f.stateTracker.setError(err)
}
func (f *folder) pullBasePause() time.Duration {
if f.PullerPauseS == 0 {
return defaultPullerPause
}
return time.Duration(f.PullerPauseS) * time.Second
}
func (f *folder) String() string {
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
}
func (f *folder) newScanError(path string, err error) {
f.errorsMut.Lock()
l.Infof("Scanner (folder %s, item %q): %v", f.Description(), path, err)
f.scanErrors = append(f.scanErrors, FileError{
Err: err.Error(),
Path: path,
})
f.errorsMut.Unlock()
}
func (f *folder) clearScanErrors(subDirs []string) {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
if len(subDirs) == 0 {
f.scanErrors = nil
return
}
filtered := f.scanErrors[:0]
outer:
for _, fe := range f.scanErrors {
for _, sub := range subDirs {
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
continue outer
}
}
filtered = append(filtered, fe)
}
f.scanErrors = filtered
}
func (f *folder) Errors() []FileError {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
scanLen := len(f.scanErrors)
errors := make([]FileError, scanLen+len(f.pullErrors))
copy(errors[:scanLen], f.scanErrors)
copy(errors[scanLen:], f.pullErrors)
sort.Sort(fileErrorList(errors))
return errors
}
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
func (f *folder) ScheduleForceRescan(path string) {
f.forcedRescanPathsMut.Lock()
f.forcedRescanPaths[path] = struct{}{}
f.forcedRescanPathsMut.Unlock()
select {
case f.forcedRescanRequested <- struct{}{}:
default:
}
}
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) error {
if err := f.updateLocals(fs); err != nil {
return err
}
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
return nil
}
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) error {
if err := f.updateLocals(fs); err != nil {
return err
}
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
return nil
}
func (f *folder) updateLocals(fs []protocol.FileInfo) error {
if err := f.db.Update(f.folderID, protocol.LocalDeviceID, fs); err != nil {
return err
}
filenames := make([]string, len(fs))
f.forcedRescanPathsMut.Lock()
for i, file := range fs {
filenames[i] = file.Name
// No need to rescan a file that was changed since anyway.
delete(f.forcedRescanPaths, file.Name)
}
f.forcedRescanPathsMut.Unlock()
seq, err := f.db.GetDeviceSequence(f.folderID, protocol.LocalDeviceID)
if err != nil {
return err
}
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
"folder": f.ID,
"items": len(fs),
"filenames": filenames,
"sequence": seq,
"version": seq, // legacy for sequence
})
return nil
}
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
for _, file := range fs {
if file.IsInvalid() {
continue
}
objType := "file"
action := "modified"
if file.IsDeleted() {
action = "deleted"
}
if file.IsSymlink() {
objType = "symlink"
} else if file.IsDirectory() {
objType = "dir"
}
// Two different events can be fired here based on what EventType is passed into function
f.evLogger.Log(typeOfEvent, map[string]string{
"folder": f.ID,
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
"label": f.Label,
"action": action,
"type": objType,
"path": filepath.FromSlash(file.Name),
"modifiedBy": file.ModifiedBy.String(),
})
}
}
func (f *folder) handleForcedRescans() error {
f.forcedRescanPathsMut.Lock()
paths := make([]string, 0, len(f.forcedRescanPaths))
for path := range f.forcedRescanPaths {
paths = append(paths, path)
}
f.forcedRescanPaths = make(map[string]struct{})
f.forcedRescanPathsMut.Unlock()
if len(paths) == 0 {
return nil
}
batch := NewFileInfoBatch(func(fs []protocol.FileInfo) error {
return f.db.Update(f.folderID, protocol.LocalDeviceID, fs)
})
for _, path := range paths {
if err := batch.FlushIfFull(); err != nil {
return err
}
fi, ok, err := f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path)
if err != nil {
return err
}
if !ok {
continue
}
fi.SetMustRescan()
batch.Append(fi)
}
if err := batch.Flush(); err != nil {
return err
}
return f.scanSubdirs(paths)
}
// The exists function is expected to return true for all known paths
// (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string {
if len(dirs) == 0 {
return nil
}
sort.Strings(dirs)
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
return nil
}
prev := "./" // Anything that can't be parent of a clean path
for i := 0; i < len(dirs); {
dir, err := fs.Canonicalize(dirs[i])
if err != nil {
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
if dir == prev || fs.IsParent(dir, prev) {
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
parent := filepath.Dir(dir)
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
dir = parent
parent = filepath.Dir(dir)
}
dirs[i] = dir
prev = dir
i++
}
return dirs
}
type cFiler struct {
db db.DB
folder string
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
fi, ok, err := cf.db.GetDeviceFile(cf.folder, protocol.LocalDeviceID, file)
if err != nil || !ok {
return protocol.FileInfo{}, false
}
return fi, true
}