added utility functions to fetch all matching list results without a callback until a self-consistent snapshot has been reached

This commit is contained in:
Jarek Kowalski
2018-07-20 17:28:07 -08:00
parent 306ab5888d
commit bc835955d5
4 changed files with 128 additions and 7 deletions

View File

@@ -13,10 +13,11 @@
// Fault describes the behavior of a single fault.
type Fault struct {
Repeat int // how many times to repeat this fault
Sleep time.Duration // sleep before returning
WaitFor chan struct{} // waits until the given channel is closed before returning
Err error // error to return (can be nil in combination with Sleep and WaitFor)
Repeat int // how many times to repeat this fault
Sleep time.Duration // sleep before returning
ErrCallback func() error
WaitFor chan struct{} // waits until the given channel is closed before returning
Err error // error to return (can be nil in combination with Sleep and WaitFor)
}
// FaultyStorage implements fault injection for Storage.
@@ -102,6 +103,11 @@ func (s *FaultyStorage) getNextFault(method string, args ...interface{}) error {
if f.Sleep > 0 {
log.Debugf("sleeping for %v in %v %v", f.Sleep, method, args)
}
if f.ErrCallback != nil {
err := f.ErrCallback()
log.Debugf("returning %v for %v %v", err, method, args)
return err
}
log.Debugf("returning %v for %v %v", f.Err, method, args)
return f.Err
}

View File

@@ -66,7 +66,7 @@ func (s *mapStorage) DeleteBlock(ctx context.Context, id string) error {
func (s *mapStorage) ListBlocks(ctx context.Context, prefix string, callback func(storage.BlockMetadata) error) error {
s.mutex.RLock()
defer s.mutex.RUnlock()
s.mutex.RUnlock()
keys := []string{}
for k := range s.data {
@@ -78,11 +78,17 @@ func (s *mapStorage) ListBlocks(ctx context.Context, prefix string, callback fun
sort.Strings(keys)
for _, k := range keys {
v := s.data[k]
s.mutex.RLock()
v, ok := s.data[k]
ts := s.keyTime[k]
s.mutex.RUnlock()
if !ok {
continue
}
if err := callback(storage.BlockMetadata{
BlockID: k,
Length: int64(len(v)),
Timestamp: s.keyTime[k],
Timestamp: ts,
}); err != nil {
return err
}

View File

@@ -3,6 +3,7 @@
import (
"context"
"errors"
"fmt"
"time"
)
@@ -54,3 +55,54 @@ type BlockMetadata struct {
// ErrBlockNotFound is returned when a block cannot be found in storage.
var ErrBlockNotFound = errors.New("block not found")
// ListAllBlocks returns BlockMetadata for all blocks in a given storage that have the provided name prefix.
func ListAllBlocks(ctx context.Context, st Storage, prefix string) ([]BlockMetadata, error) {
var result []BlockMetadata
err := st.ListBlocks(ctx, prefix, func(bm BlockMetadata) error {
result = append(result, bm)
return nil
})
return result, err
}
// ListAllBlocksConsistent lists all blocks with given name prefix in the provided storage until the results are
// consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while
// the first scan was in progress, no new block was added or removed.
// maxAttempts specifies maximum number of list attempts (must be >= 2)
func ListAllBlocksConsistent(ctx context.Context, st Storage, prefix string, maxAttempts int) ([]BlockMetadata, error) {
var previous []BlockMetadata
for i := 0; i < maxAttempts; i++ {
result, err := ListAllBlocks(ctx, st, prefix)
if err != nil {
return nil, err
}
if i > 0 && sameBlocks(result, previous) {
return result, nil
}
previous = result
}
return nil, fmt.Errorf("unable to achieve consistent snapshot despite %v attempts", maxAttempts)
}
// sameBlocks returns true if b1 & b2 contain the same blocks (ignoring order).
func sameBlocks(b1, b2 []BlockMetadata) bool {
if len(b1) != len(b2) {
return false
}
m := map[string]BlockMetadata{}
for _, b := range b1 {
m[b.BlockID] = b
}
for _, b := range b2 {
if m[b.BlockID] != b {
return false
}
}
return true
}

57
storage/storage_test.go Normal file
View File

@@ -0,0 +1,57 @@
package storage_test
import (
"context"
"testing"
"time"
"github.com/kopia/kopia/internal/storagetesting"
"github.com/kopia/kopia/storage"
)
func TestListAllBlocksConsistent(t *testing.T) {
ctx := context.Background()
data := map[string][]byte{}
st := storagetesting.NewMapStorage(data, nil, time.Now)
st.PutBlock(ctx, "foo1", []byte{1, 2, 3})
st.PutBlock(ctx, "foo2", []byte{1, 2, 3})
st.PutBlock(ctx, "foo3", []byte{1, 2, 3})
// set up faulty storage that will add a block while a scan is in progress.
f := &storagetesting.FaultyStorage{
Base: st,
Faults: map[string][]*storagetesting.Fault{
"ListBlocksItem": {
{ErrCallback: func() error {
st.PutBlock(ctx, "foo0", []byte{1, 2, 3})
return nil
}},
},
},
}
r, err := storage.ListAllBlocksConsistent(ctx, f, "foo", 3)
if err != nil {
t.Fatalf("error: %v", err)
}
// make sure we get the list with 4 items, not 3.
if got, want := len(r), 4; got != want {
t.Errorf("unexpected list result count: %v, want %v", got, want)
}
}
func TestListAllBlocksConsistentEmpty(t *testing.T) {
ctx := context.Background()
data := map[string][]byte{}
st := storagetesting.NewMapStorage(data, nil, time.Now)
r, err := storage.ListAllBlocksConsistent(ctx, st, "foo", 3)
if err != nil {
t.Fatalf("error: %v", err)
}
if got, want := len(r), 0; got != want {
t.Errorf("unexpected list result count: %v, want %v", got, want)
}
}