Files
navidrome/scanner/watcher.go
Deluan 0790f66627 fix(scanner): increase watcher channel buffers to prevent dropped filesystem events
When files were moved between libraries, the small channel buffers (size 1)
throughout the watcher pipeline caused backpressure that led to dropped
filesystem events. This meant only some of the affected folders were scanned,
preventing cross-library move detection from working correctly.

Increase all watcher channel buffers to 500 and switch to blocking sends
to ensure no filesystem events are silently dropped.
2026-03-12 17:07:34 -04:00

338 lines
10 KiB
Go

package scanner
import (
"context"
"fmt"
"io/fs"
"path/filepath"
"sync"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/core/storage"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/utils/singleton"
)
type Watcher interface {
Run(ctx context.Context) error
Watch(ctx context.Context, lib *model.Library) error
StopWatching(ctx context.Context, libraryID int) error
}
type watcher struct {
mainCtx context.Context
ds model.DataStore
scanner model.Scanner
triggerWait time.Duration
watcherNotify chan scanNotification
libraryWatchers map[int]*libraryWatcherInstance
mu sync.RWMutex
}
type libraryWatcherInstance struct {
library *model.Library
cancel context.CancelFunc
}
type scanNotification struct {
Library *model.Library
FolderPath string
}
// GetWatcher returns the watcher singleton
func GetWatcher(ds model.DataStore, s model.Scanner) Watcher {
return singleton.GetInstance(func() *watcher {
return &watcher{
ds: ds,
scanner: s,
triggerWait: conf.Server.Scanner.WatcherWait,
watcherNotify: make(chan scanNotification, 500),
libraryWatchers: make(map[int]*libraryWatcherInstance),
}
})
}
func (w *watcher) Run(ctx context.Context) error {
// Keep the main context to be used in all watchers added later
w.mainCtx = ctx
// Start watchers for all existing libraries
libs, err := w.ds.Library(ctx).GetAll()
if err != nil {
return fmt.Errorf("getting libraries: %w", err)
}
for _, lib := range libs {
if err := w.Watch(ctx, &lib); err != nil {
log.Warn(ctx, "Failed to start watcher for existing library", "libraryID", lib.ID, "name", lib.Name, "path", lib.Path, err)
}
}
// Main scan triggering loop
trigger := time.NewTimer(w.triggerWait)
trigger.Stop()
targets := make(map[model.ScanTarget]struct{})
for {
select {
case <-trigger.C:
log.Info("Watcher: Triggering scan for changed folders", "numTargets", len(targets))
status, err := w.scanner.Status(ctx)
if err != nil {
log.Error(ctx, "Watcher: Error retrieving Scanner status", err)
break
}
if status.Scanning {
log.Debug(ctx, "Watcher: Already scanning, will retry later", "waitTime", w.triggerWait*3)
trigger.Reset(w.triggerWait * 3)
continue
}
// Convert targets map to slice
targetSlice := make([]model.ScanTarget, 0, len(targets))
for target := range targets {
targetSlice = append(targetSlice, target)
}
// Clear targets for next batch
targets = make(map[model.ScanTarget]struct{})
go func() {
var err error
if conf.Server.DevSelectiveWatcher {
_, err = w.scanner.ScanFolders(ctx, false, targetSlice)
} else {
_, err = w.scanner.ScanAll(ctx, false)
}
if err != nil {
log.Error(ctx, "Watcher: Error scanning", err)
} else {
log.Info(ctx, "Watcher: Scan completed")
}
}()
case <-ctx.Done():
// Stop all library watchers
w.mu.Lock()
for libraryID, instance := range w.libraryWatchers {
log.Debug(ctx, "Stopping library watcher due to context cancellation", "libraryID", libraryID)
instance.cancel()
}
w.libraryWatchers = make(map[int]*libraryWatcherInstance)
w.mu.Unlock()
return nil
case notification := <-w.watcherNotify:
// Reset the trigger timer for debounce
trigger.Reset(w.triggerWait)
lib := notification.Library
folderPath := notification.FolderPath
// If already scheduled for scan, skip
target := model.ScanTarget{LibraryID: lib.ID, FolderPath: folderPath}
if _, exists := targets[target]; exists {
continue
}
targets[target] = struct{}{}
log.Debug(ctx, "Watcher: Detected changes. Waiting for more changes before triggering scan",
"libraryID", lib.ID, "name", lib.Name, "path", lib.Path, "folderPath", folderPath)
}
}
}
func (w *watcher) Watch(ctx context.Context, lib *model.Library) error {
w.mu.Lock()
defer w.mu.Unlock()
// If Run() hasn't been called yet, mainCtx will be nil - skip watching
if w.mainCtx == nil {
log.Debug(ctx, "Watcher not started yet, skipping watch for library", "libraryID", lib.ID, "name", lib.Name)
return nil
}
// Stop existing watcher if any
if existingInstance, exists := w.libraryWatchers[lib.ID]; exists {
log.Debug(ctx, "Stopping existing watcher before starting new one", "libraryID", lib.ID, "name", lib.Name)
existingInstance.cancel()
}
// Start new watcher
watcherCtx, cancel := context.WithCancel(w.mainCtx) //nolint:gosec // cancel is stored in instance and called on shutdown
instance := &libraryWatcherInstance{
library: lib,
cancel: cancel,
}
w.libraryWatchers[lib.ID] = instance
// Start watching in a goroutine
go func() {
defer func() {
w.mu.Lock()
if currentInstance, exists := w.libraryWatchers[lib.ID]; exists && currentInstance == instance {
delete(w.libraryWatchers, lib.ID)
}
w.mu.Unlock()
}()
err := w.watchLibrary(watcherCtx, lib)
if err != nil && watcherCtx.Err() == nil { // Only log error if not due to cancellation
log.Error(ctx, "Watcher error", "libraryID", lib.ID, "name", lib.Name, "path", lib.Path, err)
}
}()
log.Info(ctx, "Started watcher for library", "libraryID", lib.ID, "name", lib.Name, "path", lib.Path)
return nil
}
func (w *watcher) StopWatching(ctx context.Context, libraryID int) error {
w.mu.Lock()
defer w.mu.Unlock()
instance, exists := w.libraryWatchers[libraryID]
if !exists {
log.Debug(ctx, "No watcher found to stop", "libraryID", libraryID)
return nil
}
instance.cancel()
delete(w.libraryWatchers, libraryID)
log.Info(ctx, "Stopped watcher for library", "libraryID", libraryID, "name", instance.library.Name)
return nil
}
// watchLibrary implements the core watching logic for a single library (extracted from old watchLib function)
func (w *watcher) watchLibrary(ctx context.Context, lib *model.Library) error {
s, err := storage.For(lib.Path)
if err != nil {
return fmt.Errorf("creating storage: %w", err)
}
fsys, err := s.FS()
if err != nil {
return fmt.Errorf("getting FS: %w", err)
}
watcher, ok := s.(storage.Watcher)
if !ok {
log.Info(ctx, "Watcher not supported for storage type", "libraryID", lib.ID, "path", lib.Path)
return nil
}
c, err := watcher.Start(ctx)
if err != nil {
return fmt.Errorf("starting watcher: %w", err)
}
absLibPath, err := filepath.Abs(lib.Path)
if err != nil {
return fmt.Errorf("converting to absolute path: %w", err)
}
log.Info(ctx, "Watcher started for library", "libraryID", lib.ID, "name", lib.Name, "path", lib.Path, "absoluteLibPath", absLibPath)
return w.processLibraryEvents(ctx, lib, fsys, c, absLibPath)
}
// processLibraryEvents processes filesystem events for a library.
func (w *watcher) processLibraryEvents(ctx context.Context, lib *model.Library, fsys storage.MusicFS, events <-chan string, absLibPath string) error {
for {
select {
case <-ctx.Done():
log.Debug(ctx, "Watcher stopped due to context cancellation", "libraryID", lib.ID, "name", lib.Name)
return nil
case path := <-events:
path, err := filepath.Rel(absLibPath, path)
if err != nil {
log.Error(ctx, "Error getting relative path", "libraryID", lib.ID, "absolutePath", absLibPath, "path", path, err)
continue
}
if isIgnoredPath(ctx, fsys, path) {
log.Trace(ctx, "Ignoring change", "libraryID", lib.ID, "path", path)
continue
}
log.Trace(ctx, "Detected change", "libraryID", lib.ID, "path", path, "absoluteLibPath", absLibPath)
// Check if the original path (before resolution) matches .ndignore patterns
// This is crucial for deleted folders - if a deleted folder matches .ndignore,
// we should ignore it BEFORE resolveFolderPath walks up to the parent
if w.shouldIgnoreFolderPath(ctx, fsys, path) {
log.Debug(ctx, "Ignoring change matching .ndignore pattern", "libraryID", lib.ID, "path", path)
continue
}
// Find the folder to scan - validate path exists as directory, walk up if needed
folderPath := resolveFolderPath(fsys, path)
// Double-check after resolution in case the resolved path is different and also matches patterns
if folderPath != path && w.shouldIgnoreFolderPath(ctx, fsys, folderPath) {
log.Trace(ctx, "Ignoring change in folder matching .ndignore pattern", "libraryID", lib.ID, "folderPath", folderPath)
continue
}
// Notify the main watcher of changes. This will trigger a scan after the debounce period.
w.watcherNotify <- scanNotification{Library: lib, FolderPath: folderPath}
}
}
}
// resolveFolderPath takes a path (which may be a file or directory) and returns
// the folder path to scan. If the path is a file, it walks up to find the parent
// directory. Returns empty string if the path should scan the library root.
func resolveFolderPath(fsys fs.FS, path string) string {
// Handle root paths immediately
if path == "." || path == "" {
return ""
}
folderPath := path
for {
info, err := fs.Stat(fsys, folderPath)
if err == nil && info.IsDir() {
// Found a valid directory
return folderPath
}
if folderPath == "." || folderPath == "" {
// Reached root, scan entire library
return ""
}
// Walk up the tree
dir, _ := filepath.Split(folderPath)
if dir == "" || dir == "." {
return ""
}
// Remove trailing slash
folderPath = filepath.Clean(dir)
}
}
// shouldIgnoreFolderPath checks if the given folderPath should be ignored based on .ndignore patterns
// in the library. It pushes all parent folders onto the IgnoreChecker stack before checking.
func (w *watcher) shouldIgnoreFolderPath(ctx context.Context, fsys storage.MusicFS, folderPath string) bool {
checker := newIgnoreChecker(fsys)
err := checker.PushAllParents(ctx, folderPath)
if err != nil {
log.Warn(ctx, "Watcher: Error pushing ignore patterns for folder", "path", folderPath, err)
}
return checker.ShouldIgnore(ctx, folderPath)
}
func isIgnoredPath(_ context.Context, _ fs.FS, path string) bool {
baseDir, name := filepath.Split(path)
switch {
case model.IsAudioFile(path):
return false
case model.IsValidPlaylist(path):
return false
case model.IsImageFile(path):
return false
case name == ".DS_Store":
return true
}
// As it can be a deletion and not a change, we cannot reliably know if the path is a file or directory.
// But at this point, we can assume it's a directory. If it's a file, it would be ignored anyway
return isDirIgnored(baseDir)
}