mirror of
https://github.com/tailscale/tailscale.git
synced 2026-04-04 22:53:38 -04:00
307 lines
6.1 KiB
Go
307 lines
6.1 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
// Package watcher provides filesystem change detection using fsnotify
|
|
// with periodic full-scan fallback for the tailsync subsystem.
|
|
package watcher
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
"tailscale.com/tailsync/tailsyncimpl/ignore"
|
|
"tailscale.com/types/logger"
|
|
)
|
|
|
|
// Event represents a filesystem change event.
|
|
type Event struct {
|
|
// Path is the relative path within the watched root.
|
|
Path string
|
|
|
|
// Op is the type of change.
|
|
Op Op
|
|
}
|
|
|
|
// Op describes the type of filesystem operation.
|
|
type Op uint8
|
|
|
|
const (
|
|
OpCreate Op = iota
|
|
OpModify
|
|
OpDelete
|
|
OpRename
|
|
)
|
|
|
|
// Watcher watches a directory tree for changes and emits events.
|
|
type Watcher struct {
|
|
logf logger.Logf
|
|
root string
|
|
matcher *ignore.Matcher
|
|
debounce time.Duration
|
|
scanInterval time.Duration
|
|
|
|
fsw *fsnotify.Watcher
|
|
events chan []Event
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// Config configures a Watcher.
|
|
type Config struct {
|
|
// Root is the absolute path to watch.
|
|
Root string
|
|
|
|
// Matcher determines which paths to ignore.
|
|
Matcher *ignore.Matcher
|
|
|
|
// Logf is the logging function.
|
|
Logf logger.Logf
|
|
|
|
// Debounce is how long to wait after the last event before emitting.
|
|
// Defaults to 200ms.
|
|
Debounce time.Duration
|
|
|
|
// ScanInterval is how often to do a full reconciliation scan.
|
|
// Defaults to 60s.
|
|
ScanInterval time.Duration
|
|
}
|
|
|
|
// New creates and starts a new Watcher. Events are delivered on the
|
|
// channel returned by Events(). Call Close() to stop watching.
|
|
func New(cfg Config) (*Watcher, error) {
|
|
if cfg.Debounce == 0 {
|
|
cfg.Debounce = 200 * time.Millisecond
|
|
}
|
|
if cfg.ScanInterval == 0 {
|
|
cfg.ScanInterval = 60 * time.Second
|
|
}
|
|
if cfg.Logf == nil {
|
|
cfg.Logf = logger.Discard
|
|
}
|
|
|
|
fsw, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
w := &Watcher{
|
|
logf: cfg.Logf,
|
|
root: cfg.Root,
|
|
matcher: cfg.Matcher,
|
|
debounce: cfg.Debounce,
|
|
scanInterval: cfg.ScanInterval,
|
|
fsw: fsw,
|
|
events: make(chan []Event, 64),
|
|
cancel: cancel,
|
|
}
|
|
|
|
if err := w.addWatchesRecursive(cfg.Root); err != nil {
|
|
fsw.Close()
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
w.wg.Add(2)
|
|
go w.eventLoop(ctx)
|
|
go w.scanLoop(ctx)
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// Events returns the channel on which batched events are delivered.
|
|
func (w *Watcher) Events() <-chan []Event {
|
|
return w.events
|
|
}
|
|
|
|
// Close stops the watcher and releases resources.
|
|
func (w *Watcher) Close() error {
|
|
w.cancel()
|
|
err := w.fsw.Close()
|
|
w.wg.Wait()
|
|
close(w.events)
|
|
return err
|
|
}
|
|
|
|
// ScanFull performs a full scan of the watched root and returns all
|
|
// file paths found (relative to root). This is used for initial index
|
|
// building and periodic reconciliation.
|
|
func (w *Watcher) ScanFull() ([]string, error) {
|
|
return w.scanDir(w.root)
|
|
}
|
|
|
|
func (w *Watcher) scanDir(root string) ([]string, error) {
|
|
var paths []string
|
|
err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return nil // skip errors, log them
|
|
}
|
|
rel, err := filepath.Rel(root, path)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rel == "." {
|
|
return nil
|
|
}
|
|
if w.matcher.Match(rel, d.IsDir()) {
|
|
if d.IsDir() {
|
|
return filepath.SkipDir
|
|
}
|
|
return nil
|
|
}
|
|
if !d.IsDir() {
|
|
paths = append(paths, rel)
|
|
}
|
|
return nil
|
|
})
|
|
return paths, err
|
|
}
|
|
|
|
func (w *Watcher) addWatchesRecursive(root string) error {
|
|
return filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if !d.IsDir() {
|
|
return nil
|
|
}
|
|
rel, err := filepath.Rel(w.root, path)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if rel != "." && w.matcher.Match(rel, true) {
|
|
return filepath.SkipDir
|
|
}
|
|
if err := w.fsw.Add(path); err != nil {
|
|
w.logf("tailsync: watcher: failed to watch %s: %v", path, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (w *Watcher) eventLoop(ctx context.Context) {
|
|
defer w.wg.Done()
|
|
|
|
var (
|
|
pending = make(map[string]Op)
|
|
timer *time.Timer
|
|
timerC <-chan time.Time
|
|
)
|
|
|
|
flush := func() {
|
|
if len(pending) == 0 {
|
|
return
|
|
}
|
|
batch := make([]Event, 0, len(pending))
|
|
for path, op := range pending {
|
|
batch = append(batch, Event{Path: path, Op: op})
|
|
}
|
|
pending = make(map[string]Op)
|
|
select {
|
|
case w.events <- batch:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
flush()
|
|
return
|
|
case ev, ok := <-w.fsw.Events:
|
|
if !ok {
|
|
flush()
|
|
return
|
|
}
|
|
rel, err := filepath.Rel(w.root, ev.Name)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
info, statErr := os.Lstat(ev.Name)
|
|
isDir := statErr == nil && info.IsDir()
|
|
|
|
if w.matcher.Match(rel, isDir) {
|
|
continue
|
|
}
|
|
|
|
// If a new directory was created, add watches for it.
|
|
if isDir && ev.Has(fsnotify.Create) {
|
|
w.addWatchesRecursive(ev.Name)
|
|
}
|
|
|
|
var op Op
|
|
switch {
|
|
case ev.Has(fsnotify.Create):
|
|
op = OpCreate
|
|
case ev.Has(fsnotify.Write):
|
|
op = OpModify
|
|
case ev.Has(fsnotify.Remove):
|
|
op = OpDelete
|
|
case ev.Has(fsnotify.Rename):
|
|
op = OpRename
|
|
default:
|
|
continue
|
|
}
|
|
|
|
// For directories, skip (we care about files).
|
|
if isDir && op != OpDelete {
|
|
continue
|
|
}
|
|
|
|
pending[rel] = op
|
|
|
|
if timer == nil {
|
|
timer = time.NewTimer(w.debounce)
|
|
timerC = timer.C
|
|
} else {
|
|
timer.Reset(w.debounce)
|
|
}
|
|
|
|
case err, ok := <-w.fsw.Errors:
|
|
if !ok {
|
|
flush()
|
|
return
|
|
}
|
|
w.logf("tailsync: watcher error: %v", err)
|
|
// On error, trigger a full scan.
|
|
select {
|
|
case w.events <- []Event{{Path: "", Op: OpModify}}: // empty path signals full rescan needed
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
case <-timerC:
|
|
flush()
|
|
timer = nil
|
|
timerC = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Watcher) scanLoop(ctx context.Context) {
|
|
defer w.wg.Done()
|
|
|
|
ticker := time.NewTicker(w.scanInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Signal a periodic rescan by sending an empty-path event.
|
|
select {
|
|
case w.events <- []Event{{Path: "", Op: OpModify}}:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|