Files
tailscale/tailsync/tailsyncimpl/watcher/watcher.go
Jason O'Donnell 6e71a15e59 tailsync
2026-03-07 08:53:59 -05:00

307 lines
6.1 KiB
Go

// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
// Package watcher provides filesystem change detection using fsnotify
// with periodic full-scan fallback for the tailsync subsystem.
package watcher
import (
"context"
"os"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/tailsync/tailsyncimpl/ignore"
"tailscale.com/types/logger"
)
// Event represents a filesystem change event.
type Event struct {
// Path is the relative path within the watched root.
Path string
// Op is the type of change.
Op Op
}
// Op describes the type of filesystem operation.
type Op uint8
const (
OpCreate Op = iota
OpModify
OpDelete
OpRename
)
// Watcher watches a directory tree for changes and emits events.
type Watcher struct {
logf logger.Logf
root string
matcher *ignore.Matcher
debounce time.Duration
scanInterval time.Duration
fsw *fsnotify.Watcher
events chan []Event
cancel context.CancelFunc
wg sync.WaitGroup
}
// Config configures a Watcher.
type Config struct {
// Root is the absolute path to watch.
Root string
// Matcher determines which paths to ignore.
Matcher *ignore.Matcher
// Logf is the logging function.
Logf logger.Logf
// Debounce is how long to wait after the last event before emitting.
// Defaults to 200ms.
Debounce time.Duration
// ScanInterval is how often to do a full reconciliation scan.
// Defaults to 60s.
ScanInterval time.Duration
}
// New creates and starts a new Watcher. Events are delivered on the
// channel returned by Events(). Call Close() to stop watching.
func New(cfg Config) (*Watcher, error) {
if cfg.Debounce == 0 {
cfg.Debounce = 200 * time.Millisecond
}
if cfg.ScanInterval == 0 {
cfg.ScanInterval = 60 * time.Second
}
if cfg.Logf == nil {
cfg.Logf = logger.Discard
}
fsw, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
w := &Watcher{
logf: cfg.Logf,
root: cfg.Root,
matcher: cfg.Matcher,
debounce: cfg.Debounce,
scanInterval: cfg.ScanInterval,
fsw: fsw,
events: make(chan []Event, 64),
cancel: cancel,
}
if err := w.addWatchesRecursive(cfg.Root); err != nil {
fsw.Close()
cancel()
return nil, err
}
w.wg.Add(2)
go w.eventLoop(ctx)
go w.scanLoop(ctx)
return w, nil
}
// Events returns the channel on which batched events are delivered.
func (w *Watcher) Events() <-chan []Event {
return w.events
}
// Close stops the watcher and releases resources.
func (w *Watcher) Close() error {
w.cancel()
err := w.fsw.Close()
w.wg.Wait()
close(w.events)
return err
}
// ScanFull performs a full scan of the watched root and returns all
// file paths found (relative to root). This is used for initial index
// building and periodic reconciliation.
func (w *Watcher) ScanFull() ([]string, error) {
return w.scanDir(w.root)
}
func (w *Watcher) scanDir(root string) ([]string, error) {
var paths []string
err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
if err != nil {
return nil // skip errors, log them
}
rel, err := filepath.Rel(root, path)
if err != nil {
return nil
}
if rel == "." {
return nil
}
if w.matcher.Match(rel, d.IsDir()) {
if d.IsDir() {
return filepath.SkipDir
}
return nil
}
if !d.IsDir() {
paths = append(paths, rel)
}
return nil
})
return paths, err
}
func (w *Watcher) addWatchesRecursive(root string) error {
return filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
if err != nil {
return nil
}
if !d.IsDir() {
return nil
}
rel, err := filepath.Rel(w.root, path)
if err != nil {
return nil
}
if rel != "." && w.matcher.Match(rel, true) {
return filepath.SkipDir
}
if err := w.fsw.Add(path); err != nil {
w.logf("tailsync: watcher: failed to watch %s: %v", path, err)
}
return nil
})
}
func (w *Watcher) eventLoop(ctx context.Context) {
defer w.wg.Done()
var (
pending = make(map[string]Op)
timer *time.Timer
timerC <-chan time.Time
)
flush := func() {
if len(pending) == 0 {
return
}
batch := make([]Event, 0, len(pending))
for path, op := range pending {
batch = append(batch, Event{Path: path, Op: op})
}
pending = make(map[string]Op)
select {
case w.events <- batch:
case <-ctx.Done():
return
}
}
for {
select {
case <-ctx.Done():
flush()
return
case ev, ok := <-w.fsw.Events:
if !ok {
flush()
return
}
rel, err := filepath.Rel(w.root, ev.Name)
if err != nil {
continue
}
info, statErr := os.Lstat(ev.Name)
isDir := statErr == nil && info.IsDir()
if w.matcher.Match(rel, isDir) {
continue
}
// If a new directory was created, add watches for it.
if isDir && ev.Has(fsnotify.Create) {
w.addWatchesRecursive(ev.Name)
}
var op Op
switch {
case ev.Has(fsnotify.Create):
op = OpCreate
case ev.Has(fsnotify.Write):
op = OpModify
case ev.Has(fsnotify.Remove):
op = OpDelete
case ev.Has(fsnotify.Rename):
op = OpRename
default:
continue
}
// For directories, skip (we care about files).
if isDir && op != OpDelete {
continue
}
pending[rel] = op
if timer == nil {
timer = time.NewTimer(w.debounce)
timerC = timer.C
} else {
timer.Reset(w.debounce)
}
case err, ok := <-w.fsw.Errors:
if !ok {
flush()
return
}
w.logf("tailsync: watcher error: %v", err)
// On error, trigger a full scan.
select {
case w.events <- []Event{{Path: "", Op: OpModify}}: // empty path signals full rescan needed
case <-ctx.Done():
return
}
case <-timerC:
flush()
timer = nil
timerC = nil
}
}
}
func (w *Watcher) scanLoop(ctx context.Context) {
defer w.wg.Done()
ticker := time.NewTicker(w.scanInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Signal a periodic rescan by sending an empty-path event.
select {
case w.events <- []Event{{Path: "", Op: OpModify}}:
case <-ctx.Done():
return
}
}
}
}