Files
syncthing/lib/model/folder.go
2025-11-27 20:34:35 +00:00

1412 lines
38 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"context"
"errors"
"fmt"
"log/slog"
"math/rand"
"path/filepath"
"slices"
"strings"
"sync"
"time"
"github.com/syncthing/syncthing/internal/db"
"github.com/syncthing/syncthing/internal/itererr"
"github.com/syncthing/syncthing/internal/slogutil"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/semaphore"
"github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/stringutil"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/watchaggregator"
)
// Arbitrary limit that triggers a warning on kqueue systems
const kqueueItemCountThreshold = 10000
type folder struct {
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
ioLimiter *semaphore.Semaphore
localFlags protocol.FlagLocal
model *model
shortID protocol.ShortID
db db.DB
ignores *ignore.Matcher
mtimefs fs.Filesystem
modTimeWindow time.Duration
done chan struct{} // used externally, accessible regardless of serve
sl *slog.Logger
scanInterval time.Duration
scanTimer *time.Timer
scanDelay chan time.Duration
initialScanFinished chan struct{}
scanScheduled chan struct{}
versionCleanupInterval time.Duration
versionCleanupTimer *time.Timer
pullScheduled chan struct{}
pullPause time.Duration
pullFailTimer *time.Timer
scanErrors []FileError
pullErrors []FileError
errorsMut sync.Mutex
doInSyncChan chan syncRequest
forcedRescanRequested chan struct{}
forcedRescanPaths map[string]struct{}
forcedRescanPathsMut sync.Mutex
watchCancel context.CancelFunc
watchChan chan []string
restartWatchChan chan struct{}
watchErr error
watchMut sync.Mutex
puller puller
versioner versioner.Versioner
warnedKqueue bool
}
type syncRequest struct {
fn func(context.Context) error
err chan error
}
type puller interface {
pull(ctx context.Context) (bool, error) // true when successful and should not be retried
}
func newFolder(model *model, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *semaphore.Semaphore, ver versioner.Versioner) *folder {
f := folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
FolderStatisticsReference: stats.NewFolderStatisticsReference(db.NewTyped(model.sdb, "folderstats/"+cfg.ID)),
ioLimiter: ioLimiter,
model: model,
shortID: model.shortID,
db: model.sdb,
ignores: ignores,
mtimefs: cfg.Filesystem(fs.NewMtimeOption(model.sdb, cfg.ID)),
modTimeWindow: cfg.ModTimeWindow(),
done: make(chan struct{}),
sl: slog.Default().With(cfg.LogAttr()),
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
scanDelay: make(chan time.Duration),
initialScanFinished: make(chan struct{}),
scanScheduled: make(chan struct{}, 1),
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
doInSyncChan: make(chan syncRequest),
forcedRescanRequested: make(chan struct{}, 1),
forcedRescanPaths: make(map[string]struct{}),
watchCancel: func() {},
restartWatchChan: make(chan struct{}, 1),
versioner: ver,
}
f.pullPause = f.pullBasePause()
f.pullFailTimer = time.NewTimer(0)
<-f.pullFailTimer.C
registerFolderMetrics(f.ID)
return &f
}
func (f *folder) Serve(ctx context.Context) error {
f.model.foldersRunning.Add(1)
defer f.model.foldersRunning.Add(-1)
f.sl.DebugContext(ctx, "Folder starting")
defer f.sl.DebugContext(ctx, "Folder exiting")
defer func() {
f.scanTimer.Stop()
f.versionCleanupTimer.Stop()
f.setState(FolderIdle)
}()
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
f.startWatch(ctx)
}
// If we're configured to not do version cleanup, or we don't have a
// versioner, cancel and drain that timer now.
if f.versionCleanupInterval == 0 || f.versioner == nil {
if !f.versionCleanupTimer.Stop() {
<-f.versionCleanupTimer.C
}
}
initialCompleted := f.initialScanFinished
pullTimer := time.NewTimer(0)
pullTimer.Stop()
for {
var err error
select {
case <-ctx.Done():
close(f.done)
return nil
case <-f.pullScheduled:
if f.PullerDelayS > 0 {
// Wait for incoming updates to settle before doing the
// actual pull. Only set the state to SyncWaiting if we have
// reason to believe there is something to sync, to avoid
// unnecessary flashing in the GUI.
if needCount, err := f.db.CountNeed(f.folderID, protocol.LocalDeviceID); err == nil && needCount.TotalItems() > 0 {
f.setState(FolderSyncWaiting)
}
pullTimer.Reset(time.Duration(float64(time.Second) * f.PullerDelayS))
} else {
_, err = f.pull(ctx)
}
case <-pullTimer.C:
f.setState(FolderIdle)
_, err = f.pull(ctx)
case <-f.pullFailTimer.C:
var success bool
success, err = f.pull(ctx)
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
// Back off from retrying to pull
f.pullPause *= 2
}
case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
_, err = f.pull(ctx)
case <-f.forcedRescanRequested:
err = f.handleForcedRescans(ctx)
case <-f.scanTimer.C:
f.sl.DebugContext(ctx, "Scanning due to timer")
err = f.scanTimerFired(ctx)
case req := <-f.doInSyncChan:
f.sl.DebugContext(ctx, "Running something due to request")
err = req.fn(ctx)
req.err <- err
case next := <-f.scanDelay:
f.sl.DebugContext(ctx, "Delaying scan")
f.scanTimer.Reset(next)
case <-f.scanScheduled:
f.sl.DebugContext(ctx, "Scan was scheduled")
f.scanTimer.Reset(0)
case fsEvents := <-f.watchChan:
f.sl.DebugContext(ctx, "Scan due to watcher")
err = f.scanSubdirs(ctx, fsEvents)
case <-f.restartWatchChan:
f.sl.DebugContext(ctx, "Restart watcher")
err = f.restartWatch(ctx)
case <-f.versionCleanupTimer.C:
f.sl.DebugContext(ctx, "Doing version cleanup")
f.versionCleanupTimerFired(ctx)
}
if err != nil {
if svcutil.IsFatal(err) {
return err
}
f.setError(ctx, err)
}
}
}
func (*folder) BringToFront(string) {}
func (*folder) Override() {}
func (*folder) Revert() {}
func (f *folder) DelayScan(next time.Duration) {
select {
case f.scanDelay <- next:
case <-f.done:
}
}
func (f *folder) ScheduleScan() {
// 1-buffered chan
select {
case f.scanScheduled <- struct{}{}:
default:
}
}
func (f *folder) ignoresUpdated() {
if f.FSWatcherEnabled {
f.scheduleWatchRestart()
}
}
func (f *folder) SchedulePull() {
select {
case f.pullScheduled <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (*folder) Jobs(_, _ int) ([]string, []string, int) {
return nil, nil, 0
}
func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.doInSync(func(ctx context.Context) error {
return f.scanSubdirs(ctx, subdirs)
})
}
// doInSync allows to run functions synchronously in folder.serve from exported,
// asynchronously called methods.
func (f *folder) doInSync(fn func(context.Context) error) error {
req := syncRequest{
fn: fn,
err: make(chan error, 1),
}
select {
case f.doInSyncChan <- req:
return <-req.err
case <-f.done:
return context.Canceled
}
}
func (f *folder) Reschedule() {
if f.scanInterval == 0 {
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4 //nolint:gosec
interval := time.Duration(sleepNanos) * time.Nanosecond
f.sl.Debug("Next rescan scheduled", slog.Duration("interval", interval))
f.scanTimer.Reset(interval)
}
func (f *folder) getHealthErrorAndLoadIgnores() error {
if err := f.getHealthErrorWithoutIgnores(); err != nil {
return err
}
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
return fmt.Errorf("loading ignores: %w", err)
}
}
return nil
}
func (f *folder) getHealthErrorWithoutIgnores() error {
// Check for folder errors, with the most serious and specific first and
// generic ones like out of space on the home disk later.
if err := f.CheckPath(); err != nil {
return err
}
if minFree := f.model.cfg.Options().MinHomeDiskFree; minFree.Value > 0 {
dbPath := locations.Get(locations.Database)
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
if err = config.CheckFreeSpace(minFree, usage); err != nil {
return fmt.Errorf("insufficient space on disk for database (%v): %w", dbPath, err)
}
}
}
return nil
}
func (f *folder) pull(ctx context.Context) (success bool, err error) {
f.pullFailTimer.Stop()
select {
case <-f.pullFailTimer.C:
default:
}
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return true, nil
}
defer func() {
if success {
// We're good, reset the pause interval.
f.pullPause = f.pullBasePause()
}
}()
// If there is nothing to do, don't even enter sync-waiting state.
needCount, err := f.db.CountNeed(f.folderID, protocol.LocalDeviceID)
if err != nil {
return false, err
}
if needCount.TotalItems() == 0 {
// Clears pull failures on items that were needed before, but aren't anymore.
f.errorsMut.Lock()
f.pullErrors = nil
f.errorsMut.Unlock()
return true, nil
}
// Abort early (before acquiring a token) if there's a folder error
err = f.getHealthErrorWithoutIgnores()
if err != nil {
f.sl.DebugContext(ctx, "Skipping pull due to folder error", slogutil.Error(err))
return false, err
}
// Send only folder doesn't do any io, it only checks for out-of-sync
// items that differ in metadata and updates those.
if f.Type != config.FolderTypeSendOnly {
f.setState(FolderSyncWaiting)
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return true, err
}
defer f.ioLimiter.Give(1)
}
startTime := time.Now()
// Check if the ignore patterns changed.
oldHash := f.ignores.Hash()
defer func() {
if f.ignores.Hash() != oldHash {
f.ignoresUpdated()
}
}()
err = f.getHealthErrorAndLoadIgnores()
if err != nil {
f.sl.DebugContext(ctx, "Skipping pull due to folder error", slogutil.Error(err))
return false, err
}
f.setError(ctx, nil)
success, err = f.puller.pull(ctx)
if success && err == nil {
return true, nil
}
// Pulling failed, try again later.
delay := f.pullPause + time.Since(startTime)
f.sl.InfoContext(ctx, "Folder failed to sync, will be retried", slog.String("wait", stringutil.NiceDurationString(delay)))
f.pullFailTimer.Reset(delay)
return false, err
}
func (f *folder) scanSubdirs(ctx context.Context, subDirs []string) error {
f.sl.DebugContext(ctx, "Scanning")
oldHash := f.ignores.Hash()
err := f.getHealthErrorAndLoadIgnores()
if err != nil {
return err
}
f.setError(ctx, nil)
// Check on the way out if the ignore patterns changed as part of scanning
// this folder. If they did we should schedule a pull of the folder so that
// we request things we might have suddenly become unignored and so on.
defer func() {
if f.ignores.Hash() != oldHash {
f.sl.DebugContext(ctx, "Ignore patterns change detected while scanning; triggering puller")
f.ignoresUpdated()
f.SchedulePull()
}
}()
f.setState(FolderScanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return err
}
defer f.ioLimiter.Give(1)
metricFolderScans.WithLabelValues(f.ID).Inc()
scanCtx, cancel := context.WithCancel(ctx)
defer cancel()
go addTimeUntilCancelled(scanCtx, metricFolderScanSeconds.WithLabelValues(f.ID))
for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
if sub == "" {
// A blank subdirs means to scan the entire folder. We can trim
// the subDirs list and go on our way.
subDirs = nil
break
}
subDirs[i] = sub
}
// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
subDirs = unifySubs(subDirs, func(file string) bool {
_, ok, err := f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file)
return err == nil && ok
})
f.setState(FolderScanning)
f.clearScanErrors(subDirs)
batch := f.newScanBatch()
// Schedule a pull after scanning, but only if we actually detected any
// changes.
changes := 0
defer func() {
f.sl.DebugContext(ctx, "Finished scanning", slog.Int("changes", changes))
if changes > 0 {
f.SchedulePull()
}
}()
changesHere, err := f.scanSubdirsChangedAndNew(ctx, subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
if len(subDirs) == 0 {
// If we have no specific subdirectories to traverse, set it to one
// empty prefix so we traverse the entire folder contents once.
subDirs = []string{""}
}
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
changesHere, err = f.scanSubdirsDeletedAndIgnored(ctx, subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
f.ScanCompleted()
return nil
}
const maxToRemove = 1000
type scanBatch struct {
f *folder
updateBatch *FileInfoBatch
toRemove []string
}
func (f *folder) newScanBatch() *scanBatch {
b := &scanBatch{
f: f,
toRemove: make([]string, 0, maxToRemove),
}
b.updateBatch = NewFileInfoBatch(func(fs []protocol.FileInfo) error {
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
b.f.sl.Debug("Stopping scan due to folder error", slogutil.Error(err))
return err
}
b.f.updateLocalsFromScanning(fs)
return nil
})
return b
}
func (b *scanBatch) Remove(item string) {
b.toRemove = append(b.toRemove, item)
}
func (b *scanBatch) flushToRemove() error {
if len(b.toRemove) > 0 {
if err := b.f.db.DropFilesNamed(b.f.folderID, protocol.LocalDeviceID, b.toRemove); err != nil {
return err
}
b.toRemove = b.toRemove[:0]
}
return nil
}
func (b *scanBatch) Flush() error {
if err := b.flushToRemove(); err != nil {
return err
}
return b.updateBatch.Flush()
}
func (b *scanBatch) FlushIfFull() error {
if len(b.toRemove) >= maxToRemove {
if err := b.flushToRemove(); err != nil {
return err
}
}
return b.updateBatch.FlushIfFull()
}
// Update adds the fileinfo to the batch for updating, and does a few checks.
// It returns false if the checks result in the file not going to be updated or removed.
func (b *scanBatch) Update(fi protocol.FileInfo) (bool, error) {
// Check for a "virtual" parent directory of encrypted files. We don't track
// it, but check if anything still exists within and delete it otherwise.
if b.f.Type == config.FolderTypeReceiveEncrypted && fi.IsDirectory() && protocol.IsEncryptedParent(fs.PathComponents(fi.Name)) {
if names, err := b.f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
b.f.mtimefs.Remove(fi.Name)
}
return false, nil
}
// Resolve receive-only items which are identical with the global state or
// the global item is our own receive-only item.
switch gf, ok, err := b.f.db.GetGlobalFile(b.f.folderID, fi.Name); {
case err != nil:
return false, err
case !ok:
case gf.IsReceiveOnlyChanged():
if fi.IsDeleted() {
// Our item is deleted and the global item is our own receive only
// file. No point in keeping track of that.
b.Remove(fi.Name)
b.f.sl.Debug("Deleting deleted receive-only local-changed file", slogutil.FilePath(fi.Name))
return true, nil
}
case (b.f.Type == config.FolderTypeReceiveOnly || b.f.Type == config.FolderTypeReceiveEncrypted) &&
gf.IsEquivalentOptional(fi, protocol.FileInfoComparison{
ModTimeWindow: b.f.modTimeWindow,
IgnorePerms: b.f.IgnorePerms,
IgnoreBlocks: true,
IgnoreFlags: protocol.FlagLocalReceiveOnly,
IgnoreOwnership: !b.f.SyncOwnership && !b.f.SendOwnership,
IgnoreXattrs: !b.f.SyncXattrs && !b.f.SendXattrs,
}):
// What we have locally is equivalent to the global file.
b.f.sl.Debug("Merging identical locally changed item with global", slogutil.FilePath(fi.Name))
fi = gf
}
b.updateBatch.Append(fi)
return true, nil
}
func (f *folder) scanSubdirsChangedAndNew(ctx context.Context, subDirs []string, batch *scanBatch) (int, error) {
changes := 0
// If we return early e.g. due to a folder health error, the scan needs
// to be cancelled.
scanCtx, scanCancel := context.WithCancel(ctx)
defer scanCancel()
scanConfig := scanner.Config{
Folder: f.ID,
Subs: subDirs,
Matcher: f.ignores,
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{db: f.db, folder: f.folderID},
Filesystem: f.mtimefs,
IgnorePerms: f.IgnorePerms,
AutoNormalize: f.AutoNormalize,
Hashers: f.model.numHashers(f.ID),
ShortID: f.shortID,
ProgressTickIntervalS: f.ScanProgressIntervalS,
LocalFlags: f.localFlags,
ModTimeWindow: f.modTimeWindow,
EventLogger: f.evLogger,
ScanOwnership: f.SendOwnership || f.SyncOwnership,
ScanXattrs: f.SendXattrs || f.SyncXattrs,
XattrFilter: f.XattrFilter,
}
var fchan chan scanner.ScanResult
if f.Type == config.FolderTypeReceiveEncrypted {
fchan = scanner.WalkWithoutHashing(scanCtx, scanConfig)
} else {
fchan = scanner.Walk(scanCtx, scanConfig)
}
alreadyUsedOrExisting := make(map[string]struct{})
for res := range fchan {
if res.Err != nil {
f.newScanError(res.Path, res.Err)
continue
}
if err := batch.FlushIfFull(); err != nil {
// Prevent a race between the scan aborting due to context
// cancellation and releasing the snapshot in defer here.
scanCancel()
for range fchan {
}
return changes, err
}
if ok, err := batch.Update(res.File); err != nil {
return 0, err
} else if ok {
changes++
}
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
default:
if nf, ok := f.findRename(ctx, res.File, alreadyUsedOrExisting); ok {
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
}
}
}
return changes, nil
}
func (f *folder) scanSubdirsDeletedAndIgnored(ctx context.Context, subDirs []string, batch *scanBatch) (int, error) {
var toIgnore []protocol.FileInfo
ignoredParent := ""
changes := 0
outer:
for _, sub := range subDirs {
for fi, err := range itererr.Zip(f.db.AllLocalFilesWithPrefix(f.folderID, protocol.LocalDeviceID, sub)) {
if err != nil {
return changes, err
}
select {
case <-ctx.Done():
break outer
default:
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
if ignoredParent != "" && !fs.IsParent(fi.Name, ignoredParent) {
for _, file := range toIgnore {
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(file.Name))
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
}
toIgnore = toIgnore[:0]
ignoredParent = ""
}
switch ignored := f.ignores.Match(fi.Name).IsIgnored(); {
case fi.IsIgnored() && ignored:
continue
case !fi.IsIgnored() && ignored:
// File was not ignored at last pass but has been ignored.
if fi.IsDirectory() {
// Delay ignoring as a child might be unignored.
toIgnore = append(toIgnore, fi)
if ignoredParent == "" {
// If the parent wasn't ignored already, set
// this path as the "highest" ignored parent
ignoredParent = fi.Name
}
continue
}
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(fi.Name))
nf := fi
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
case fi.IsIgnored() && !ignored:
// Successfully scanned items are already un-ignored during
// the scan, so check whether it is deleted.
fallthrough
case !fi.IsIgnored() && !fi.IsDeleted() && !fi.IsUnsupported():
// The file is not ignored, deleted or unsupported. Lets check if
// it's still here. Simply stat:ing it won't do as there are
// tons of corner cases (e.g. parent dir->symlink, missing
// permissions)
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
if ignoredParent != "" {
// Don't ignore parents of this not ignored item
toIgnore = toIgnore[:0]
ignoredParent = ""
}
continue
}
nf := fi
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
if fi.ShouldConflict() {
// We do not want to override the global version with
// the deleted file. Setting to an empty version makes
// sure the file gets in sync on the following pull.
nf.Version = protocol.Vector{}
}
f.sl.DebugContext(ctx, "Marking file as deleted", slogutil.FilePath(nf.Name))
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
case fi.IsDeleted() && fi.IsReceiveOnlyChanged():
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
switch gf, ok, err := f.db.GetGlobalFile(f.folderID, fi.Name); {
case err != nil:
return 0, err
case !ok:
case gf.IsReceiveOnlyChanged():
f.sl.DebugContext(ctx, "Removing deleted receive-only item that is globally receive-only from db", slogutil.FilePath(fi.Name))
batch.Remove(fi.Name)
changes++
case gf.IsDeleted():
// Our item is deleted and the global item is deleted too. We just
// pretend it is a normal deleted file (nobody cares about that).
f.sl.DebugContext(ctx, "Marking globally deleted item as not locally changed", slogutil.FilePath(fi.Name))
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
if ok, err := batch.Update(fi); err != nil {
return 0, err
} else if ok {
changes++
}
}
default:
// No need to bump the version for a file that was and is
// deleted and just the folder type/local flags changed.
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
f.sl.DebugContext(ctx, "Removing receive-only flag on deleted item", slogutil.FilePath(fi.Name))
if ok, err := batch.Update(fi); err != nil {
return 0, err
} else if ok {
changes++
}
}
}
}
select {
case <-ctx.Done():
return changes, ctx.Err()
default:
}
if len(toIgnore) > 0 {
for _, file := range toIgnore {
f.sl.DebugContext(ctx, "Marking file as ignored", slogutil.FilePath(file.Name))
nf := file
nf.SetIgnored()
if ok, err := batch.Update(nf); err != nil {
return 0, err
} else if ok {
changes++
}
if err := batch.FlushIfFull(); err != nil {
return 0, err
}
}
toIgnore = toIgnore[:0]
}
}
return changes, nil
}
func (f *folder) findRename(ctx context.Context, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
if len(file.Blocks) == 0 || file.Size == 0 {
return protocol.FileInfo{}, false
}
found := false
nf := protocol.FileInfo{}
loop:
for fi, err := range itererr.Zip(f.db.AllLocalFilesWithBlocksHash(f.folderID, file.BlocksHash)) {
if err != nil {
return protocol.FileInfo{}, false
}
select {
case <-ctx.Done():
break loop
default:
}
if fi.Name == file.Name {
alreadyUsedOrExisting[fi.Name] = struct{}{}
continue
}
if _, ok := alreadyUsedOrExisting[fi.Name]; ok {
continue
}
if fi.ShouldConflict() {
continue
}
if f.ignores.Match(fi.Name).IsIgnored() {
continue
}
// Only check the size.
// No point checking block equality, as that uses BlocksHash comparison if that is set (which it will be).
// No point checking BlocksHash comparison as WithBlocksHash already does that.
if file.Size != fi.Size {
continue
}
alreadyUsedOrExisting[fi.Name] = struct{}{}
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
continue
}
var ok bool
nf, ok, err = f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, fi.Name)
if err != nil || !ok || nf.Sequence != fi.Sequence {
continue
}
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
found = true
break
}
return nf, found
}
func (f *folder) scanTimerFired(ctx context.Context) error {
err := f.scanSubdirs(ctx, nil)
select {
case <-f.initialScanFinished:
default:
if err != nil {
f.sl.ErrorContext(ctx, "Failed initial scan", slogutil.Error(err))
} else {
f.sl.InfoContext(ctx, "Completed initial scan")
}
close(f.initialScanFinished)
}
f.Reschedule()
return err
}
func (f *folder) versionCleanupTimerFired(ctx context.Context) {
f.setState(FolderCleanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(ctx, 1); err != nil {
return
}
defer f.ioLimiter.Give(1)
f.setState(FolderCleaning)
if err := f.versioner.Clean(ctx); err != nil {
f.sl.WarnContext(ctx, "Failed to clean versions", slogutil.Error(err))
}
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
}
func (f *folder) WatchError() error {
f.watchMut.Lock()
defer f.watchMut.Unlock()
return f.watchErr
}
// stopWatch immediately aborts watching and may be called asynchronously
func (f *folder) stopWatch() {
f.watchMut.Lock()
f.watchCancel()
f.watchMut.Unlock()
f.setWatchError(nil, 0)
}
// scheduleWatchRestart makes sure watching is restarted from the main for loop
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
func (f *folder) scheduleWatchRestart() {
select {
case f.restartWatchChan <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull.
}
}
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) restartWatch(ctx context.Context) error {
f.stopWatch()
f.startWatch(ctx)
return f.scanSubdirs(ctx, nil)
}
// startWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) startWatch(ctx context.Context) {
watchCtx, cancel := context.WithCancel(ctx)
f.watchMut.Lock()
f.watchChan = make(chan []string)
f.watchCancel = cancel
f.watchMut.Unlock()
go f.monitorWatch(watchCtx)
}
// monitorWatch starts the filesystem watching and retries every minute on failure.
// It should not be used except in startWatch.
func (f *folder) monitorWatch(ctx context.Context) {
failTimer := time.NewTimer(0)
aggrCtx, aggrCancel := context.WithCancel(ctx)
var err error
var eventChan <-chan fs.Event
var errChan <-chan error
warnedOutside := false
var lastWatch time.Time
pause := time.Minute
// Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage
var summarySub events.Subscription
var summaryChan <-chan events.Event
if fs.WatchKqueue && !f.warnedKqueue {
summarySub = f.evLogger.Subscribe(events.FolderSummary)
summaryChan = summarySub.C()
}
defer func() {
aggrCancel() // aggrCancel might e re-assigned -> call within closure
if summaryChan != nil {
summarySub.Unsubscribe()
}
}()
for {
select {
case <-failTimer.C:
eventChan, errChan, err = f.mtimefs.Watch(".", f.ignores, ctx, f.IgnorePerms)
// We do this once per minute initially increased to
// max one hour in case of repeat failures.
f.scanOnWatchErr()
f.setWatchError(err, pause)
if err != nil {
failTimer.Reset(pause)
if pause < 60*time.Minute {
pause *= 2
}
continue
}
lastWatch = time.Now()
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
f.sl.DebugContext(ctx, "Started filesystem watcher")
case err = <-errChan:
var next time.Duration
if dur := time.Since(lastWatch); dur > pause {
pause = time.Minute
next = 0
} else {
next = pause - dur
if pause < 60*time.Minute {
pause *= 2
}
}
failTimer.Reset(next)
f.setWatchError(err, next)
// This error was previously a panic and should never occur, so generate
// a warning, but don't do it repetitively.
var errOutside *fs.WatchEventOutsideRootError
if errors.As(err, &errOutside) {
if !warnedOutside {
slog.WarnContext(ctx, err.Error()) //nolint:sloglint
warnedOutside = true
}
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
}
aggrCancel()
errChan = nil
aggrCtx, aggrCancel = context.WithCancel(ctx)
case ev := <-summaryChan:
if data, ok := ev.Data.(FolderSummaryEventData); !ok {
f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch")
} else if data.Folder == f.folderID && data.Summary.LocalTotalItems-data.Summary.LocalDeleted > kqueueItemCountThreshold {
f.warnedKqueue = true
summarySub.Unsubscribe()
summaryChan = nil
slog.WarnContext(ctx, "Filesystem watching (kqueue) is enabled with a lot of files/directories, which requires a lot of resources and might slow down your system significantly", f.LogAttr())
}
case <-ctx.Done():
aggrCancel() // for good measure and keeping the linters happy
return
}
}
}
// setWatchError sets the current error state of the watch and should be called
// regardless of whether err is nil or not.
func (f *folder) setWatchError(err error, nextTryIn time.Duration) {
f.watchMut.Lock()
prevErr := f.watchErr
f.watchErr = err
f.watchMut.Unlock()
if err != prevErr { //nolint:errorlint
data := map[string]interface{}{
"folder": f.ID,
}
if prevErr != nil {
data["from"] = prevErr.Error()
}
if err != nil {
data["to"] = err.Error()
}
f.evLogger.Log(events.FolderWatchStateChanged, data)
}
if err == nil {
return
}
if prevErr != err { //nolint:errorlint
f.sl.Warn("Failed to start filesystem watcher", slog.String("wait", nextTryIn.String()), slogutil.Error(err))
} else {
f.sl.Debug("Failed to start filesystem watcher", slog.String("wait", nextTryIn.String()), slogutil.Error(err))
}
}
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
func (f *folder) scanOnWatchErr() {
f.watchMut.Lock()
err := f.watchErr
f.watchMut.Unlock()
if err != nil {
f.DelayScan(0)
}
}
func (f *folder) setError(ctx context.Context, err error) {
select {
case <-ctx.Done():
return
default:
}
_, _, oldErr := f.getState()
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
return
}
if err != nil {
if oldErr == nil {
f.sl.WarnContext(ctx, "Error on folder", slogutil.Error(err))
} else {
f.sl.InfoContext(ctx, "Folder error changed", slogutil.Error(err), slog.Any("previously", oldErr))
}
} else {
f.sl.InfoContext(ctx, "Folder error cleared")
f.SchedulePull()
}
if f.FSWatcherEnabled {
if err != nil {
f.stopWatch()
} else {
f.scheduleWatchRestart()
}
}
f.stateTracker.setError(err)
}
func (f *folder) pullBasePause() time.Duration {
if f.PullerPauseS == 0 {
return defaultPullerPause
}
return time.Duration(f.PullerPauseS) * time.Second
}
func (f *folder) String() string {
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
}
func (f *folder) newScanError(path string, err error) {
f.errorsMut.Lock()
f.sl.Warn("Failed to scan", slogutil.FilePath(path), slogutil.Error(err))
f.scanErrors = append(f.scanErrors, FileError{
Err: err.Error(),
Path: path,
})
f.errorsMut.Unlock()
}
func (f *folder) clearScanErrors(subDirs []string) {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
if len(subDirs) == 0 {
f.scanErrors = nil
return
}
filtered := f.scanErrors[:0]
outer:
for _, fe := range f.scanErrors {
for _, sub := range subDirs {
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
continue outer
}
}
filtered = append(filtered, fe)
}
f.scanErrors = filtered
}
func (f *folder) Errors() []FileError {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
scanLen := len(f.scanErrors)
errors := make([]FileError, scanLen+len(f.pullErrors))
copy(errors[:scanLen], f.scanErrors)
copy(errors[scanLen:], f.pullErrors)
slices.SortFunc(errors, func(a, b FileError) int {
return strings.Compare(a.Path, b.Path)
})
return errors
}
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
func (f *folder) ScheduleForceRescan(path string) {
f.forcedRescanPathsMut.Lock()
f.forcedRescanPaths[path] = struct{}{}
f.forcedRescanPathsMut.Unlock()
select {
case f.forcedRescanRequested <- struct{}{}:
default:
}
}
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) error {
if err := f.updateLocals(fs); err != nil {
return err
}
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
return nil
}
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) error {
if err := f.updateLocals(fs); err != nil {
return err
}
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
return nil
}
func (f *folder) updateLocals(fs []protocol.FileInfo) error {
if err := f.db.Update(f.folderID, protocol.LocalDeviceID, fs); err != nil {
return err
}
filenames := make([]string, len(fs))
f.forcedRescanPathsMut.Lock()
for i, file := range fs {
filenames[i] = file.Name
// No need to rescan a file that was changed since anyway.
delete(f.forcedRescanPaths, file.Name)
}
f.forcedRescanPathsMut.Unlock()
seq, err := f.db.GetDeviceSequence(f.folderID, protocol.LocalDeviceID)
if err != nil {
return err
}
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
"folder": f.ID,
"items": len(fs),
"filenames": filenames,
"sequence": seq,
"version": seq, // legacy for sequence
})
return nil
}
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
for _, file := range fs {
if file.IsInvalid() {
continue
}
objType := "file"
action := "modified"
if file.IsDeleted() {
action = "deleted"
}
if file.IsSymlink() {
objType = "symlink"
} else if file.IsDirectory() {
objType = "dir"
}
// Two different events can be fired here based on what EventType is passed into function
f.evLogger.Log(typeOfEvent, map[string]string{
"folder": f.ID,
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
"label": f.Label,
"action": action,
"type": objType,
"path": filepath.FromSlash(file.Name),
"modifiedBy": file.ModifiedBy.String(),
})
}
}
func (f *folder) handleForcedRescans(ctx context.Context) error {
f.forcedRescanPathsMut.Lock()
paths := make([]string, 0, len(f.forcedRescanPaths))
for path := range f.forcedRescanPaths {
paths = append(paths, path)
}
f.forcedRescanPaths = make(map[string]struct{})
f.forcedRescanPathsMut.Unlock()
if len(paths) == 0 {
return nil
}
batch := NewFileInfoBatch(func(fs []protocol.FileInfo) error {
return f.db.Update(f.folderID, protocol.LocalDeviceID, fs)
})
for _, path := range paths {
if err := batch.FlushIfFull(); err != nil {
return err
}
fi, ok, err := f.db.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path)
if err != nil {
return err
}
if !ok {
continue
}
fi.SetMustRescan()
batch.Append(fi)
}
if err := batch.Flush(); err != nil {
return err
}
return f.scanSubdirs(ctx, paths)
}
// The exists function is expected to return true for all known paths
// (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string {
if len(dirs) == 0 {
return nil
}
slices.Sort(dirs)
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
return nil
}
prev := "./" // Anything that can't be parent of a clean path
for i := 0; i < len(dirs); {
dir, err := fs.Canonicalize(dirs[i])
if err != nil {
slog.Debug("Skipping directory for scan", slog.String("dir", dirs[i]), slogutil.Error(err))
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
if dir == prev || fs.IsParent(dir, prev) {
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
parent := filepath.Dir(dir)
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
dir = parent
parent = filepath.Dir(dir)
}
dirs[i] = dir
prev = dir
i++
}
return dirs
}
type cFiler struct {
db db.DB
folder string
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
fi, ok, err := cf.db.GetDeviceFile(cf.folder, protocol.LocalDeviceID, file)
if err != nil || !ok {
return protocol.FileInfo{}, false
}
return fi, true
}