diff --git a/tests/robustness/pathlock/path_lock.go b/tests/robustness/pathlock/path_lock.go new file mode 100644 index 000000000..7bf61b1e5 --- /dev/null +++ b/tests/robustness/pathlock/path_lock.go @@ -0,0 +1,160 @@ +// Package pathlock defines a PathLocker interface and an implementation +// that will synchronize based on filepath. +package pathlock + +import ( + "path/filepath" + "strings" + "sync" + "sync/atomic" +) + +// Locker is an interface for synchronizing on a given filepath. +// A call to Lock a given path will block any asynchronous calls to Lock +// that same path, or any parent or child path in the same sub-tree. +// For example: +// - Lock path /a/b/c +// - Blocks a Lock call for the same path /a/b/c +// - Blocks a Lock call for path /a/b or /a +// - Blocks a Lock call for path /a/b/c/d +// - Allows a Lock call for path /a/b/x +// - Allows a Lock call for path /a/x +type Locker interface { + Lock(path string) (Unlocker, error) +} + +// Unlocker unlocks from a previous invocation of Lock(). +type Unlocker interface { + Unlock() +} + +var _ Locker = (*pathLock)(nil) + +// pathLock is a path-based mutex mechanism that allows for synchronization +// along subpaths. A call to Lock will block as long as the requested path +// is equal to, or otherwise in the path of (e.g. parent/child) another path +// that has already been Locked. The thread will be blocked until the holder +// of the lock calls Unlock. +type pathLock struct { + mu sync.Mutex + lockedPaths map[string]chan struct{} +} + +// NewLocker returns a Locker. +func NewLocker() Locker { + return &pathLock{ + lockedPaths: make(map[string]chan struct{}), + } +} + +type lock struct { + pl *pathLock + path string +} + +func (l *lock) Unlock() { + l.pl.unlock(l.path) +} + +// busyCounter is for unit testing, to determine whether a Lock has been +// called and blocked. +var busyCounter uint64 + +// Lock will lock the given path, preventing concurrent calls to Lock +// for that path, or any parent/child path, until Unlock has been called. +// Any concurrent Lock calls will block until that path is available. +func (pl *pathLock) Lock(path string) (Unlocker, error) { + absPath, err := filepath.Abs(path) + if err != nil { + return nil, err + } + + for { + ch, err := pl.tryToLockPath(absPath) + if err != nil { + return nil, err + } + + if ch == nil { + break + } + + atomic.AddUint64(&busyCounter, 1) + + <-ch + } + + return &lock{ + pl: pl, + path: absPath, + }, nil +} + +// tryToLockPath is a helper for locking a given path/subpath. +// It locks the common mutex while accessing the internal map of locked +// paths. Each element in the list of locked paths is tested for whether +// or not it is within the same subtree as the requested path to lock. +// +// If none of the already-reserved paths coincide with this one, this +// goroutine can safely lock this path. To do so, it creates a +// new map entry whose key is the locked path, and whose value is +// a channel that other goroutines can wait on, should there be +// a collision. +// +// If this goroutine DOES find a conflicting path, that path's +// channel is returned. The caller can wait on that channel. After +// the channel is closed, the caller should try again by calling +// `tryToLockPath` until no channel is returned (indicating the lock +// has been claimed). +func (pl *pathLock) tryToLockPath(path string) (chan struct{}, error) { + pl.mu.Lock() + defer pl.mu.Unlock() + + for lockedPath, ch := range pl.lockedPaths { + var ( + pathInLockedPath, lockedPathInPath bool + err error + ) + + if pathInLockedPath, err = isInPath(path, lockedPath); err == nil { + lockedPathInPath, err = isInPath(lockedPath, path) + } + + if err != nil { + return nil, err + } + + if pathInLockedPath || lockedPathInPath { + return ch, nil + } + } + + pl.lockedPaths[path] = make(chan struct{}) + + return nil, nil +} + +// unlock will unlock the given path. It is assumed that Lock +// has already been called, and that unlock will be called once +// and only once with the exact path provided to the Lock function. +func (pl *pathLock) unlock(path string) { + pl.mu.Lock() + defer pl.mu.Unlock() + + close(pl.lockedPaths[path]) + delete(pl.lockedPaths, path) +} + +// isInPath is a helper to determine whether one path is +// either the same as another, or a child path (recursively) of it. +func isInPath(path1, path2 string) (bool, error) { + relFP, err := filepath.Rel(path2, path1) + if err != nil { + return true, err + } + + // If the relative path contains "..", this function will + // return false, because it is a cousin path. Only children (recursive) + // and the path itself will return true. + return !strings.Contains(relFP, ".."), nil +} diff --git a/tests/robustness/pathlock/path_lock_test.go b/tests/robustness/pathlock/path_lock_test.go new file mode 100644 index 000000000..ca3e62d1d --- /dev/null +++ b/tests/robustness/pathlock/path_lock_test.go @@ -0,0 +1,323 @@ +package pathlock + +import ( + "math/rand" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestPathLockBasic(t *testing.T) { + pl := NewLocker() + + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("Could not get working directory: %v", err) + } + + for ti, tc := range []struct { + name string + path1 string + path2 string + }{ + { + name: "(Abs) Blocks a Lock call for the same path /a/b/c", + path1: "/a/b/c", + path2: "/a/b/c", + }, + { + name: "(Abs) Blocks a Lock call for path /a/b/c/d", + path1: "/a/b/c", + path2: "/a/b/c/d", + }, + { + name: "(Abs) Blocks a Lock call for path /a/b", + path1: "/a/b/c", + path2: "/a/b", + }, + { + name: "(Abs) Blocks a Lock call for path /a", + path1: "/a/b/c", + path2: "/a", + }, + { + name: "(Rel) Blocks a Lock call for the same path a/b/c", + path1: "a/b/c", + path2: "a/b/c", + }, + { + name: "(Rel) Blocks a Lock call for path a/b/c/d", + path1: "a/b/c", + path2: "a/b/c/d", + }, + { + name: "(Rel) Blocks a Lock call for path a/b", + path1: "a/b/c", + path2: "a/b", + }, + { + name: "(Rel) Blocks a Lock call for path a", + path1: "a/b/c", + path2: "a", + }, + { + name: "(Mix Abs/Rel) Blocks a Lock call for the same path a/b/c", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a/b/c", + }, + { + name: "(Mix Abs/Rel) Blocks a Lock call for path a/b/c/d", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a/b/c/d", + }, + { + name: "(Mix Abs/Rel) Blocks a Lock call for path a/b", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a/b", + }, + { + name: "(Mix Abs/Rel) Blocks a Lock call for path a", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a", + }, + { + name: "(Mix Rel/Abs) Blocks a Lock call for the same path a/b/c", + path1: "a/b/c", + path2: filepath.Join(cwd, "a/b/c"), + }, + { + name: "(Mix Rel/Abs) Blocks a Lock call for path a/b/c/d", + path1: "a/b/c", + path2: filepath.Join(cwd, "a/b/c/d"), + }, + { + name: "(Mix Rel/Abs) Blocks a Lock call for path a/b", + path1: "a/b/c", + path2: filepath.Join(cwd, "a/b"), + }, + { + name: "(Mix Rel/Abs) Blocks a Lock call for path a", + path1: "a/b/c", + path2: filepath.Join(cwd, "a"), + }, + } { + t.Logf("%v %v (path1: %q, path2: %q)", ti, tc.name, tc.path1, tc.path2) + + lock1, err := pl.Lock(tc.path1) + if err != nil { + t.Fatalf("Unexpected path lock error: %v", err) + } + + currBusyCounter := atomic.LoadUint64(&busyCounter) + + var path2Err error + + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer wg.Done() + + lock2, err := pl.Lock(tc.path2) + if err != nil { + path2Err = err + return + } + + lock2.Unlock() + }() + + // Wait until the internal atomic counter increments. + // That will only happen once the Lock call to path2 executes + // and blocks on the prior Lock to path1. + for { + if atomic.LoadUint64(&busyCounter) > currBusyCounter { + break + } + + time.Sleep(1 * time.Millisecond) + } + + lock1.Unlock() + + // Wait for the goroutine to return + wg.Wait() + + if path2Err != nil { + t.Fatalf("Error in second lock path: %v", path2Err) + } + } +} + +func TestPathLockWithoutBlock(t *testing.T) { + pl := NewLocker() + + cwd, err := os.Getwd() + if err != nil { + t.Fatalf("Could not get working directory: %v", err) + } + + for ti, tc := range []struct { + name string + path1 string + path2 string + }{ + { + name: "(Abs) Allows a Lock call for path /a/b/x", + path1: "/a/b/c", + path2: "/a/b/x", + }, + { + name: "(Abs) Allows a Lock call for path /a/x", + path1: "/a/b/c", + path2: "/a/x", + }, + { + name: "(Rel) Allows a Lock call for path a/b/x", + path1: "a/b/c", + path2: "a/b/x", + }, + { + name: "(Rel) Allows a Lock call for path a/x", + path1: "a/b/c", + path2: "a/x", + }, + { + name: "(Mix Abs/Rel) Allows a Lock call for path a/b/x", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a/b/x", + }, + { + name: "(Mix Abs/Rel) Allows a Lock call for path a/x", + path1: filepath.Join(cwd, "a/b/c"), + path2: "a/x", + }, + { + name: "(Mix Rel/Abs) Allows a Lock call for path a/b/x", + path1: "a/b/c", + path2: filepath.Join(cwd, "a/b/x"), + }, + { + name: "(Mix Rel/Abs) Allows a Lock call for path a/x", + path1: "a/b/c", + path2: filepath.Join(cwd, "a/x"), + }, + } { + t.Logf("%v %v (path1: %q, path2: %q)", ti, tc.name, tc.path1, tc.path2) + + goroutineDoneWg := new(sync.WaitGroup) + goroutineDoneWg.Add(1) + + goroutineLockedWg := new(sync.WaitGroup) + goroutineLockedWg.Add(1) + + trigger := false + + triggerFalseCh := make(chan struct{}) + + var path2Err error + + go func() { + defer goroutineDoneWg.Done() + + lock2, err := pl.Lock(tc.path2) + if err != nil { + path2Err = err + + goroutineLockedWg.Done() + + return + } + + trigger = true + + goroutineLockedWg.Done() + + time.Sleep(10 * time.Millisecond) + + trigger = false + + close(triggerFalseCh) + + lock2.Unlock() + }() + + // Wait for the goroutine to lock + goroutineLockedWg.Wait() + + if path2Err != nil { + t.Fatalf("Error in second lock path: %v", path2Err) + } + + // This should not block; the paths should not interfere + lock1, err := pl.Lock(tc.path1) + if err != nil { + t.Fatalf("Unexpected path lock error: %v", err) + } + + if !trigger { + t.Fatalf("Lock blocked") + } + + lock1.Unlock() + + <-triggerFalseCh + + if trigger { + t.Fatalf("Trigger should have been set false") + } + + // Ensure the goroutine returns + goroutineDoneWg.Wait() + } +} + +func TestPathLockRace(t *testing.T) { + pl := NewLocker() + + counter := 0 + hitError := false + + wg := new(sync.WaitGroup) + + numGoroutines := 100 + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + // Pick from three different path values that should all be + // covered by the same lock. + path := "/some/path/a/b/c" + for i := 0; i < rand.Intn(3); i++ { + path = filepath.Dir(path) + } + + lock, err := pl.Lock(path) + if err != nil { + t.Logf("Unexpected path lock error: %v", err) + + hitError = true + + return + } + + counter++ + lock.Unlock() + }() + } + + wg.Wait() + + if hitError { + t.Fatal("Hit unexpected error locking paths") + } + + if counter != numGoroutines { + t.Fatalf("counter %v != numgoroutines %v", counter, numGoroutines) + } +}