mirror of
https://github.com/kopia/kopia.git
synced 2026-02-18 15:05:46 -05:00
Added 'kopia repository validate-provider` (#1205)
* cli: added 'repository validate-provider' which runs a set of tests against blob storage provider to validate it This implements a provider tests which exercises subtle behaviors which are not always correctly implemented by providers claiming compatibility with S3, for example. The test checks: - not found behavior - prefix scans - timestamps - write atomicity * retry: improved error message on failure * rclone: fixed stats reporting and awaiting for completion * webdav: prevent panic when attempting to mkdir with empty name * testing: run providervalidation.ValidateProvider as part of regular provider tests * cli: print a recommendation to validate provider after repository creation
This commit is contained in:
@@ -28,6 +28,7 @@
|
||||
defaultColor = color.New()
|
||||
warningColor = color.New(color.FgYellow)
|
||||
errorColor = color.New(color.FgHiRed)
|
||||
noteColor = color.New(color.FgHiCyan)
|
||||
)
|
||||
|
||||
type textOutput struct {
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
package cli
|
||||
|
||||
type commandRepository struct {
|
||||
connect commandRepositoryConnect
|
||||
create commandRepositoryCreate
|
||||
disconnect commandRepositoryDisconnect
|
||||
repair commandRepositoryRepair
|
||||
setClient commandRepositorySetClient
|
||||
setParameters commandRepositorySetParameters
|
||||
changePassword commandRepositoryChangePassword
|
||||
status commandRepositoryStatus
|
||||
syncTo commandRepositorySyncTo
|
||||
connect commandRepositoryConnect
|
||||
create commandRepositoryCreate
|
||||
disconnect commandRepositoryDisconnect
|
||||
repair commandRepositoryRepair
|
||||
setClient commandRepositorySetClient
|
||||
setParameters commandRepositorySetParameters
|
||||
changePassword commandRepositoryChangePassword
|
||||
status commandRepositoryStatus
|
||||
syncTo commandRepositorySyncTo
|
||||
validateProvider commandRepositoryValidateProvider
|
||||
}
|
||||
|
||||
func (c *commandRepository) setup(svc advancedAppServices, parent commandParent) {
|
||||
@@ -24,4 +25,5 @@ func (c *commandRepository) setup(svc advancedAppServices, parent commandParent)
|
||||
c.status.setup(svc, cmd)
|
||||
c.syncTo.setup(svc, cmd)
|
||||
c.changePassword.setup(svc, cmd)
|
||||
c.validateProvider.setup(svc, cmd)
|
||||
}
|
||||
|
||||
@@ -17,6 +17,12 @@
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
const runValidationNote = `NOTE: To validate that your provider is compatible with Kopia, please run:
|
||||
|
||||
$ kopia repository validate-provider
|
||||
|
||||
`
|
||||
|
||||
type commandRepositoryCreate struct {
|
||||
createBlockHashFormat string
|
||||
createBlockEncryptionFormat string
|
||||
@@ -137,7 +143,13 @@ func (c *commandRepositoryCreate) runCreateCommandWithStorage(ctx context.Contex
|
||||
return errors.Wrap(err, "unable to connect to repository")
|
||||
}
|
||||
|
||||
return c.populateRepository(ctx, pass)
|
||||
if err := c.populateRepository(ctx, pass); err != nil {
|
||||
return errors.Wrap(err, "error populating repository")
|
||||
}
|
||||
|
||||
noteColor.Fprintf(c.out.stdout(), runValidationNote) // nolint:errcheck
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandRepositoryCreate) populateRepository(ctx context.Context, password string) error {
|
||||
|
||||
35
cli/command_repository_validate_provider.go
Normal file
35
cli/command_repository_validate_provider.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/repo"
|
||||
)
|
||||
|
||||
type commandRepositoryValidateProvider struct {
|
||||
opt providervalidation.Options
|
||||
out textOutput
|
||||
}
|
||||
|
||||
func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, parent commandParent) {
|
||||
cmd := parent.Command("validate-provider", "Validates that a repository provider is compatible with Kopia")
|
||||
|
||||
c.opt = providervalidation.DefaultOptions
|
||||
|
||||
cmd.Flag("concurrency-test-duration", "Duration of concurrency test").DurationVar(&c.opt.ConcurrencyTestDuration)
|
||||
cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers)
|
||||
cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers)
|
||||
cmd.Flag("get-metadata-workers", "Number of GetMetadata workers").IntVar(&c.opt.NumGetMetadataWorkers)
|
||||
c.out.setup(svc)
|
||||
|
||||
cmd.Action(c.out.svc.directRepositoryWriteAction(c.run))
|
||||
}
|
||||
|
||||
func (c *commandRepositoryValidateProvider) run(ctx context.Context, dr repo.DirectRepositoryWriter) error {
|
||||
return errors.Wrap(
|
||||
providervalidation.ValidateProvider(ctx, dr.BlobStorage(), c.opt),
|
||||
"provider validation error")
|
||||
}
|
||||
@@ -12,6 +12,7 @@
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
|
||||
@@ -168,3 +169,15 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St
|
||||
|
||||
require.NoError(t, s2.Close(ctx))
|
||||
}
|
||||
|
||||
// TestValidationOptions is the set of options used when running providing validation from tests.
|
||||
// nolint:gomnd
|
||||
var TestValidationOptions = providervalidation.Options{
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 15 * time.Second,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
}
|
||||
|
||||
379
internal/providervalidation/providervalidation.go
Normal file
379
internal/providervalidation/providervalidation.go
Normal file
@@ -0,0 +1,379 @@
|
||||
// Package providervalidation implements validation to ensure the blob storage is compatible with Kopia requirements.
|
||||
package providervalidation
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
cryptorand "crypto/rand"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
)
|
||||
|
||||
// Options provides options for provider validation.
|
||||
type Options struct {
|
||||
MaxClockDrift time.Duration
|
||||
ConcurrencyTestDuration time.Duration
|
||||
|
||||
NumPutBlobWorkers int
|
||||
NumGetBlobWorkers int
|
||||
NumGetMetadataWorkers int
|
||||
NumListBlobsWorkers int
|
||||
MaxBlobLength int
|
||||
}
|
||||
|
||||
// DefaultOptions is the default set of options.
|
||||
// nolint:gomnd
|
||||
var DefaultOptions = Options{
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 30 * time.Second,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
}
|
||||
|
||||
const blobIDLength = 16
|
||||
|
||||
var log = logging.GetContextLoggerFunc("providervalidation")
|
||||
|
||||
// ValidateProvider runs a series of tests against provided storage to validate that
|
||||
// it can be used with Kopia.
|
||||
// nolint:gomnd,funlen,gocyclo
|
||||
func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
uberPrefix := blob.ID("z" + uuid.NewString())
|
||||
defer cleanupAllBlobs(ctx, st, uberPrefix)
|
||||
|
||||
prefix1 := uberPrefix + "a"
|
||||
prefix2 := uberPrefix + "b"
|
||||
|
||||
log(ctx).Infof("Validating blob list responses")
|
||||
|
||||
if err := verifyBlobCount(ctx, st, uberPrefix, 0); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count")
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating non-existent blob responses")
|
||||
|
||||
// read non-existent full blob
|
||||
if _, err := st.GetBlob(ctx, prefix1+"1", 0, -1); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when reading non-existent blob: %v", err)
|
||||
}
|
||||
|
||||
// read non-existent partial blob
|
||||
if _, err := st.GetBlob(ctx, prefix1+"1", 0, 5); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when reading non-existent partial blob: %v", err)
|
||||
}
|
||||
|
||||
// get metadata for non-existent blob
|
||||
if _, err := st.GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when getting metadata for non-existent blob: %v", err)
|
||||
}
|
||||
|
||||
blobData := bytes.Repeat([]byte{1, 2, 3, 4, 5}, 1e6)
|
||||
|
||||
log(ctx).Infof("Writing blob (%v bytes)", len(blobData))
|
||||
|
||||
// write blob
|
||||
if err := st.PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData)); err != nil {
|
||||
return errors.Wrap(err, "error writing blob #1")
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating list responses...")
|
||||
|
||||
if err := verifyBlobCount(ctx, st, uberPrefix, 1); err != nil {
|
||||
return errors.Wrap(err, "invalid uber blob count")
|
||||
}
|
||||
|
||||
if err := verifyBlobCount(ctx, st, prefix1, 1); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count with prefix 1")
|
||||
}
|
||||
|
||||
if err := verifyBlobCount(ctx, st, prefix2, 0); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count with prefix 2")
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating partial reads...")
|
||||
|
||||
partialBlobCases := []struct {
|
||||
offset int64
|
||||
length int64
|
||||
}{
|
||||
{0, 10},
|
||||
{1, 10},
|
||||
{2, 1},
|
||||
{5, 0},
|
||||
{int64(len(blobData)) - 5, 5},
|
||||
}
|
||||
|
||||
for _, tc := range partialBlobCases {
|
||||
v, err := st.GetBlob(ctx, prefix1+"1", tc.offset, tc.length)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "got unexpected error when reading partial blob @%v+%v", tc.offset, tc.length)
|
||||
}
|
||||
|
||||
if got, want := v, blobData[tc.offset:tc.offset+tc.length]; !bytes.Equal(got, want) {
|
||||
return errors.Errorf("got unexpected data after reading partial blob @%v+%v: %x, wanted %x", tc.offset, tc.length, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating full reads...")
|
||||
|
||||
// read full blob
|
||||
v, err := st.GetBlob(ctx, prefix1+"1", 0, -1)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "got unexpected error when reading partial blob")
|
||||
}
|
||||
|
||||
if got, want := v, blobData; !bytes.Equal(got, want) {
|
||||
return errors.Errorf("got unexpected data after reading partial blob: %x, wanted %x", got, want)
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating metadata...")
|
||||
|
||||
// get metadata for non-existent blob
|
||||
bm, err := st.GetMetadata(ctx, prefix1+"1")
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "got unexpected error when getting metadata for blob")
|
||||
}
|
||||
|
||||
if got, want := bm.Length, int64(len(blobData)); got != want {
|
||||
return errors.Errorf("invalid length returned by GetMetadata(): %v, wanted %v", got, want)
|
||||
}
|
||||
|
||||
now := clock.Now()
|
||||
|
||||
timeDiff := now.Sub(bm.Timestamp)
|
||||
if timeDiff < 0 {
|
||||
timeDiff = -timeDiff
|
||||
}
|
||||
|
||||
if timeDiff > opt.MaxClockDrift {
|
||||
return errors.Errorf(
|
||||
"newly-written blob has a timestamp very different from local clock: %v, expected %v. Max difference allowed is %v",
|
||||
bm.Timestamp,
|
||||
now,
|
||||
opt.MaxClockDrift,
|
||||
)
|
||||
}
|
||||
|
||||
ct := newConcurrencyTest(st, prefix2, opt)
|
||||
log(ctx).Infof("Running concurrency test for %v...", opt.ConcurrencyTestDuration)
|
||||
|
||||
if err := ct.run(ctx); err != nil {
|
||||
return errors.Wrap(err, "error validating concurrency")
|
||||
}
|
||||
|
||||
log(ctx).Infof("All good.")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type concurrencyTest struct {
|
||||
opt Options
|
||||
st blob.Storage
|
||||
prefix blob.ID
|
||||
deadline time.Time
|
||||
|
||||
mu sync.Mutex
|
||||
blobData map[blob.ID][]byte
|
||||
blobIDs []blob.ID
|
||||
blobWritten map[blob.ID]bool
|
||||
}
|
||||
|
||||
func newConcurrencyTest(st blob.Storage, prefix blob.ID, opt Options) *concurrencyTest {
|
||||
return &concurrencyTest{
|
||||
opt: opt,
|
||||
st: st,
|
||||
prefix: prefix,
|
||||
deadline: clock.Now().Add(opt.ConcurrencyTestDuration),
|
||||
|
||||
blobData: make(map[blob.ID][]byte),
|
||||
blobWritten: make(map[blob.ID]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) putBlobWorker(ctx context.Context, worker int) func() error {
|
||||
return func() error {
|
||||
for clock.Now().Before(c.deadline) {
|
||||
blobLen := blobIDLength + rand.Intn(c.opt.MaxBlobLength-blobIDLength) //nolint:gosec
|
||||
|
||||
data := make([]byte, blobLen)
|
||||
if _, err := cryptorand.Read(data); err != nil {
|
||||
return errors.Wrap(err, "unable to get randomness")
|
||||
}
|
||||
|
||||
id := c.prefix + blob.ID(fmt.Sprintf("%x", data[0:16]))
|
||||
|
||||
c.mu.Lock()
|
||||
c.blobData[id] = data
|
||||
c.blobIDs = append(c.blobIDs, id)
|
||||
c.mu.Unlock()
|
||||
|
||||
// sleep for a short time so that readers can start getting the blob when it's still
|
||||
// not written.
|
||||
c.randomSleep()
|
||||
|
||||
log(ctx).Debugf("PutBlob worker %v writing %v (%v bytes)", worker, id, len(data))
|
||||
|
||||
if err := c.st.PutBlob(ctx, id, gather.FromSlice(data)); err != nil {
|
||||
return errors.Wrap(err, "error writing blob")
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.blobWritten[id] = true
|
||||
c.mu.Unlock()
|
||||
|
||||
log(ctx).Debugf("PutBlob worker %v finished writing %v (%v bytes)", worker, id, len(data))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) randomSleep() {
|
||||
time.Sleep(time.Duration(rand.Intn(int(100 * time.Millisecond)))) //nolint:gosec,gomnd
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) pickBlob() (blob.ID, []byte, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if len(c.blobIDs) == 0 {
|
||||
return "", nil, false
|
||||
}
|
||||
|
||||
id := c.blobIDs[rand.Intn(len(c.blobIDs))] // nolint:gosec
|
||||
|
||||
return id, c.blobData[id], c.blobWritten[id]
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) getBlobWorker(ctx context.Context, worker int) func() error {
|
||||
return func() error {
|
||||
for clock.Now().Before(c.deadline) {
|
||||
c.randomSleep()
|
||||
|
||||
blobID, blobData, fullyWritten := c.pickBlob()
|
||||
if blobID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetBlob worker %v reading %v", worker, blobID)
|
||||
|
||||
v, err := c.st.GetBlob(ctx, blobID, 0, -1)
|
||||
if err != nil {
|
||||
if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten {
|
||||
return errors.Wrapf(err, "unexpected error when reading %v", blobID)
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetBlob worker %v - valid error when reading %v", worker, blobID)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(v, blobData) {
|
||||
return errors.Wrapf(err, "invalid data read for %v", blobID)
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetBlob worker %v - valid data read %v", worker, blobID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) getMetadataWorker(ctx context.Context, worker int) func() error {
|
||||
return func() error {
|
||||
for clock.Now().Before(c.deadline) {
|
||||
c.randomSleep()
|
||||
|
||||
blobID, blobData, fullyWritten := c.pickBlob()
|
||||
if blobID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetMetadata worker %v: %v", worker, blobID)
|
||||
|
||||
bm, err := c.st.GetMetadata(ctx, blobID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten {
|
||||
return errors.Wrapf(err, "unexpected error when reading %v", blobID)
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetMetadata worker %v - valid error when reading %v", worker, blobID)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if bm.Length != int64(len(blobData)) {
|
||||
return errors.Wrapf(err, "unexpected partial read - invalid length read for %v: %v wanted %v", blobID, bm.Length, len(blobData))
|
||||
}
|
||||
|
||||
log(ctx).Debugf("GetMetadata worker %v - valid data read %v", worker, blobID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) listBlobWorker(ctx context.Context, worker int) func() error {
|
||||
return func() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrencyTest) run(ctx context.Context) error {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for worker := 0; worker < c.opt.NumPutBlobWorkers; worker++ {
|
||||
eg.Go(c.putBlobWorker(ctx, worker))
|
||||
}
|
||||
|
||||
for worker := 0; worker < c.opt.NumGetBlobWorkers; worker++ {
|
||||
eg.Go(c.getBlobWorker(ctx, worker))
|
||||
}
|
||||
|
||||
for worker := 0; worker < c.opt.NumGetMetadataWorkers; worker++ {
|
||||
eg.Go(c.getMetadataWorker(ctx, worker))
|
||||
}
|
||||
|
||||
for worker := 0; worker < c.opt.NumListBlobsWorkers; worker++ {
|
||||
eg.Go(c.listBlobWorker(ctx, worker))
|
||||
}
|
||||
|
||||
return errors.Wrap(eg.Wait(), "encountered errors")
|
||||
}
|
||||
|
||||
func cleanupAllBlobs(ctx context.Context, st blob.Storage, prefix blob.ID) {
|
||||
log(ctx).Infof("Cleaning up temporary data...")
|
||||
|
||||
if err := st.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
|
||||
return errors.Wrapf(st.DeleteBlob(ctx, bm.BlobID), "error deleting blob %v", bm.BlobID)
|
||||
}); err != nil {
|
||||
log(ctx).Debugf("error cleaning up")
|
||||
}
|
||||
}
|
||||
|
||||
func verifyBlobCount(ctx context.Context, st blob.Storage, prefix blob.ID, want int) error {
|
||||
got, err := blob.ListAllBlobs(ctx, st, prefix)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error listing blobs")
|
||||
}
|
||||
|
||||
if len(got) != want {
|
||||
return errors.Errorf("unexpected number of blobs returned for prefix %v: %v, wanted %v", prefix, len(got), want)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
18
internal/providervalidation/providervalidation_test.go
Normal file
18
internal/providervalidation/providervalidation_test.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package providervalidation_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
)
|
||||
|
||||
func TestProviderValidation(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil)
|
||||
opt := providervalidation.DefaultOptions
|
||||
opt.ConcurrencyTestDuration = 15 * time.Second
|
||||
providervalidation.ValidateProvider(ctx, st, opt)
|
||||
}
|
||||
@@ -53,12 +53,16 @@ func PeriodicallyNoValue(ctx context.Context, interval time.Duration, count int,
|
||||
func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc, initial, max time.Duration, count int, factor float64) (interface{}, error) {
|
||||
sleepAmount := initial
|
||||
|
||||
var lastError error
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
v, err := attempt()
|
||||
if err == nil {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
lastError = err
|
||||
|
||||
if !isRetriableError(err) {
|
||||
return v, err
|
||||
}
|
||||
@@ -72,7 +76,7 @@ func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetr
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("unable to complete %v despite %v retries", desc, count)
|
||||
return nil, errors.Errorf("unable to complete %v despite %v retries, last error: %v", desc, count, lastError)
|
||||
}
|
||||
|
||||
// WithExponentialBackoffNoValue is a shorthand for WithExponentialBackoff except the
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob/azure"
|
||||
@@ -119,6 +120,7 @@ func TestAzureStorage(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
}
|
||||
|
||||
func TestAzureStorageSASToken(t *testing.T) {
|
||||
@@ -147,6 +149,7 @@ func TestAzureStorageSASToken(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
}
|
||||
|
||||
func TestAzureStorageInvalidBlob(t *testing.T) {
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -79,6 +80,7 @@ func TestB2Storage(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
}
|
||||
|
||||
func TestB2StorageInvalidBlob(t *testing.T) {
|
||||
|
||||
@@ -6,8 +6,11 @@
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -42,6 +45,7 @@ func TestFileStorage(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, r)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, r)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, r, blobtesting.TestValidationOptions))
|
||||
|
||||
if err := r.Close(ctx); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob/gcs"
|
||||
@@ -44,6 +45,7 @@ func TestGCSStorage(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
}
|
||||
|
||||
func TestGCSStorageInvalid(t *testing.T) {
|
||||
|
||||
@@ -111,7 +111,7 @@ func (r *rcloneStorage) DisplayName() string {
|
||||
return "RClone " + r.Options.RemotePath
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanner) {
|
||||
func (r *rcloneStorage) processStderrStatus(ctx context.Context, statsMarker string, s *bufio.Scanner) {
|
||||
for s.Scan() {
|
||||
l := s.Text()
|
||||
|
||||
@@ -119,8 +119,8 @@ func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanne
|
||||
log(ctx).Debugf("[RCLONE] %v", l)
|
||||
}
|
||||
|
||||
if strings.HasPrefix(l, "Transferred:") && strings.HasSuffix(l, "%") {
|
||||
if strings.HasSuffix(l, "100%") {
|
||||
if strings.Contains(l, statsMarker) {
|
||||
if strings.Contains(l, " 100%,") {
|
||||
atomic.StoreInt32(r.allTransfersComplete, 1)
|
||||
} else {
|
||||
atomic.StoreInt32(r.allTransfersComplete, 0)
|
||||
@@ -129,7 +129,7 @@ func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanne
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, startupTimeout time.Duration) (string, error) {
|
||||
func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, statsMarker string, startupTimeout time.Duration) (string, error) {
|
||||
rcloneAddressChan := make(chan string)
|
||||
rcloneErrChan := make(chan error)
|
||||
|
||||
@@ -159,7 +159,7 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c
|
||||
if p := strings.Index(l, "https://"); p >= 0 {
|
||||
rcloneAddressChan <- l[p:]
|
||||
|
||||
go r.processStderrStatus(ctx, s)
|
||||
go r.processStderrStatus(ctx, statsMarker, s)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -245,6 +245,8 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) {
|
||||
rcloneExe = opt.RCloneExe
|
||||
}
|
||||
|
||||
statsMarker := "STATS:KOPIA"
|
||||
|
||||
arguments := append([]string{
|
||||
"-v",
|
||||
"serve", "webdav", opt.RemotePath,
|
||||
@@ -253,6 +255,8 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) {
|
||||
"--key", temporaryKeyPath,
|
||||
"--htpasswd", temporaryHtpassword,
|
||||
"--stats", "1s",
|
||||
"--stats-one-line",
|
||||
"--stats-one-line-date-format=" + statsMarker,
|
||||
}, opt.RCloneArgs...)
|
||||
|
||||
if opt.EmbeddedConfig != "" {
|
||||
@@ -274,7 +278,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) {
|
||||
startupTimeout = time.Duration(opt.StartupTimeout) * time.Second
|
||||
}
|
||||
|
||||
rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, startupTimeout)
|
||||
rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, statsMarker, startupTimeout)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to start rclone")
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -321,6 +322,7 @@ func testStorage(t *testing.T, options *Options) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
}
|
||||
|
||||
func TestCustomTransportNoSSLVerify(t *testing.T) {
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -187,6 +188,7 @@ func TestSFTPStorageValid(t *testing.T) {
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
|
||||
// delete everything again
|
||||
deleteBlobs(ctx, t, st)
|
||||
|
||||
@@ -133,7 +133,7 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st
|
||||
|
||||
// An error above may indicate that the directory doesn't exist.
|
||||
// Attempt to create required directories and try again, if successful.
|
||||
if !mkdirAttempted {
|
||||
if !mkdirAttempted && dirPath != "" {
|
||||
mkdirAttempted = true
|
||||
|
||||
if mkdirErr := d.cli.MkdirAll(dirPath, defaultDirPerm); mkdirErr == nil {
|
||||
|
||||
@@ -9,9 +9,11 @@
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/webdav"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -164,6 +166,7 @@ func verifyWebDAVStorage(t *testing.T, url, username, password string, shardSpec
|
||||
|
||||
blobtesting.VerifyStorage(ctx, t, st)
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
|
||||
if err := st.Close(ctx); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user