From bc835955d5eb19659d2a6d5e44bfc45339c132d1 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 20 Jul 2018 17:28:07 -0800 Subject: [PATCH] added utility functions to fetch all matching list results without a callback until a self-consistent snapshot has been reached --- internal/storagetesting/faulty.go | 14 +++++--- internal/storagetesting/map.go | 12 +++++-- storage/storage.go | 52 ++++++++++++++++++++++++++++ storage/storage_test.go | 57 +++++++++++++++++++++++++++++++ 4 files changed, 128 insertions(+), 7 deletions(-) create mode 100644 storage/storage_test.go diff --git a/internal/storagetesting/faulty.go b/internal/storagetesting/faulty.go index 16a44f68b..c8fc47bd4 100644 --- a/internal/storagetesting/faulty.go +++ b/internal/storagetesting/faulty.go @@ -13,10 +13,11 @@ // Fault describes the behavior of a single fault. type Fault struct { - Repeat int // how many times to repeat this fault - Sleep time.Duration // sleep before returning - WaitFor chan struct{} // waits until the given channel is closed before returning - Err error // error to return (can be nil in combination with Sleep and WaitFor) + Repeat int // how many times to repeat this fault + Sleep time.Duration // sleep before returning + ErrCallback func() error + WaitFor chan struct{} // waits until the given channel is closed before returning + Err error // error to return (can be nil in combination with Sleep and WaitFor) } // FaultyStorage implements fault injection for Storage. @@ -102,6 +103,11 @@ func (s *FaultyStorage) getNextFault(method string, args ...interface{}) error { if f.Sleep > 0 { log.Debugf("sleeping for %v in %v %v", f.Sleep, method, args) } + if f.ErrCallback != nil { + err := f.ErrCallback() + log.Debugf("returning %v for %v %v", err, method, args) + return err + } log.Debugf("returning %v for %v %v", f.Err, method, args) return f.Err } diff --git a/internal/storagetesting/map.go b/internal/storagetesting/map.go index e3bda18a7..464a3705e 100644 --- a/internal/storagetesting/map.go +++ b/internal/storagetesting/map.go @@ -66,7 +66,7 @@ func (s *mapStorage) DeleteBlock(ctx context.Context, id string) error { func (s *mapStorage) ListBlocks(ctx context.Context, prefix string, callback func(storage.BlockMetadata) error) error { s.mutex.RLock() - defer s.mutex.RUnlock() + s.mutex.RUnlock() keys := []string{} for k := range s.data { @@ -78,11 +78,17 @@ func (s *mapStorage) ListBlocks(ctx context.Context, prefix string, callback fun sort.Strings(keys) for _, k := range keys { - v := s.data[k] + s.mutex.RLock() + v, ok := s.data[k] + ts := s.keyTime[k] + s.mutex.RUnlock() + if !ok { + continue + } if err := callback(storage.BlockMetadata{ BlockID: k, Length: int64(len(v)), - Timestamp: s.keyTime[k], + Timestamp: ts, }); err != nil { return err } diff --git a/storage/storage.go b/storage/storage.go index 0725466cd..9934c6aed 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,6 +3,7 @@ import ( "context" "errors" + "fmt" "time" ) @@ -54,3 +55,54 @@ type BlockMetadata struct { // ErrBlockNotFound is returned when a block cannot be found in storage. var ErrBlockNotFound = errors.New("block not found") + +// ListAllBlocks returns BlockMetadata for all blocks in a given storage that have the provided name prefix. +func ListAllBlocks(ctx context.Context, st Storage, prefix string) ([]BlockMetadata, error) { + var result []BlockMetadata + + err := st.ListBlocks(ctx, prefix, func(bm BlockMetadata) error { + result = append(result, bm) + return nil + }) + + return result, err +} + +// ListAllBlocksConsistent lists all blocks with given name prefix in the provided storage until the results are +// consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while +// the first scan was in progress, no new block was added or removed. +// maxAttempts specifies maximum number of list attempts (must be >= 2) +func ListAllBlocksConsistent(ctx context.Context, st Storage, prefix string, maxAttempts int) ([]BlockMetadata, error) { + var previous []BlockMetadata + + for i := 0; i < maxAttempts; i++ { + result, err := ListAllBlocks(ctx, st, prefix) + if err != nil { + return nil, err + } + if i > 0 && sameBlocks(result, previous) { + return result, nil + } + + previous = result + } + + return nil, fmt.Errorf("unable to achieve consistent snapshot despite %v attempts", maxAttempts) +} + +// sameBlocks returns true if b1 & b2 contain the same blocks (ignoring order). +func sameBlocks(b1, b2 []BlockMetadata) bool { + if len(b1) != len(b2) { + return false + } + m := map[string]BlockMetadata{} + for _, b := range b1 { + m[b.BlockID] = b + } + for _, b := range b2 { + if m[b.BlockID] != b { + return false + } + } + return true +} diff --git a/storage/storage_test.go b/storage/storage_test.go new file mode 100644 index 000000000..306200f50 --- /dev/null +++ b/storage/storage_test.go @@ -0,0 +1,57 @@ +package storage_test + +import ( + "context" + "testing" + "time" + + "github.com/kopia/kopia/internal/storagetesting" + "github.com/kopia/kopia/storage" +) + +func TestListAllBlocksConsistent(t *testing.T) { + ctx := context.Background() + data := map[string][]byte{} + st := storagetesting.NewMapStorage(data, nil, time.Now) + st.PutBlock(ctx, "foo1", []byte{1, 2, 3}) + st.PutBlock(ctx, "foo2", []byte{1, 2, 3}) + st.PutBlock(ctx, "foo3", []byte{1, 2, 3}) + + // set up faulty storage that will add a block while a scan is in progress. + f := &storagetesting.FaultyStorage{ + Base: st, + Faults: map[string][]*storagetesting.Fault{ + "ListBlocksItem": { + {ErrCallback: func() error { + st.PutBlock(ctx, "foo0", []byte{1, 2, 3}) + return nil + }}, + }, + }, + } + + r, err := storage.ListAllBlocksConsistent(ctx, f, "foo", 3) + if err != nil { + t.Fatalf("error: %v", err) + } + + // make sure we get the list with 4 items, not 3. + if got, want := len(r), 4; got != want { + t.Errorf("unexpected list result count: %v, want %v", got, want) + } +} + +func TestListAllBlocksConsistentEmpty(t *testing.T) { + ctx := context.Background() + data := map[string][]byte{} + st := storagetesting.NewMapStorage(data, nil, time.Now) + + r, err := storage.ListAllBlocksConsistent(ctx, st, "foo", 3) + if err != nil { + t.Fatalf("error: %v", err) + } + + if got, want := len(r), 0; got != want { + t.Errorf("unexpected list result count: %v, want %v", got, want) + } +}