From e42cc6ccceaaf28d4f7c7e3855de5cee91cef5f5 Mon Sep 17 00:00:00 2001
From: Jarek Kowalski
Date: Mon, 19 Jul 2021 21:42:24 -0700
Subject: [PATCH] Added `kopia repository validate-provider` (#1205)

* cli: added 'repository validate-provider', which runs a set of tests
  against a blob storage provider to validate it

  This implements a provider test which exercises subtle behaviors that are
  not always correctly implemented by providers claiming compatibility with
  S3, for example.

  The test checks:

  - not-found behavior
  - prefix scans
  - timestamps
  - write atomicity

* retry: improved error message on failure

* rclone: fixed stats reporting and waiting for completion

* webdav: prevent panic when attempting to mkdir with an empty name

* testing: run providervalidation.ValidateProvider as part of regular provider tests

* cli: print a recommendation to validate the provider after repository creation
---
 cli/app.go                                    |   1 +
 cli/command_repository.go                     |  20 +-
 cli/command_repository_create.go              |  14 +-
 cli/command_repository_validate_provider.go   |  35 ++
 internal/blobtesting/verify.go                |  13 +
 .../providervalidation/providervalidation.go  | 379 ++++++++++++++++++
 .../providervalidation_test.go                |  18 +
 internal/retry/retry.go                       |   6 +-
 repo/blob/azure/azure_storage_test.go         |   3 +
 repo/blob/b2/b2_storage_test.go               |   2 +
 .../filesystem/filesystem_storage_test.go     |   4 +
 repo/blob/gcs/gcs_storage_test.go             |   2 +
 repo/blob/rclone/rclone_storage.go            |  16 +-
 repo/blob/s3/s3_storage_test.go               |   2 +
 repo/blob/sftp/sftp_storage_test.go           |   2 +
 repo/blob/webdav/webdav_storage.go            |   2 +-
 repo/blob/webdav/webdav_storage_test.go       |   3 +
 17 files changed, 504 insertions(+), 18 deletions(-)
 create mode 100644 cli/command_repository_validate_provider.go
 create mode 100644 internal/providervalidation/providervalidation.go
 create mode 100644 internal/providervalidation/providervalidation_test.go

diff --git a/cli/app.go b/cli/app.go
index fc16a261d..8efcff458 100644
--- a/cli/app.go
+++ b/cli/app.go
@@ -28,6 +28,7 @@
 	defaultColor = color.New()
 	warningColor = color.New(color.FgYellow)
 	errorColor   = color.New(color.FgHiRed)
+	noteColor    = color.New(color.FgHiCyan)
 )

 type textOutput struct {
diff --git a/cli/command_repository.go b/cli/command_repository.go
index 276e96763..58591ff3e 100644
--- a/cli/command_repository.go
+++ b/cli/command_repository.go
@@ -1,15 +1,16 @@
 package cli

 type commandRepository struct {
-	connect        commandRepositoryConnect
-	create         commandRepositoryCreate
-	disconnect     commandRepositoryDisconnect
-	repair         commandRepositoryRepair
-	setClient      commandRepositorySetClient
-	setParameters  commandRepositorySetParameters
-	changePassword commandRepositoryChangePassword
-	status         commandRepositoryStatus
-	syncTo         commandRepositorySyncTo
+	connect          commandRepositoryConnect
+	create           commandRepositoryCreate
+	disconnect       commandRepositoryDisconnect
+	repair           commandRepositoryRepair
+	setClient        commandRepositorySetClient
+	setParameters    commandRepositorySetParameters
+	changePassword   commandRepositoryChangePassword
+	status           commandRepositoryStatus
+	syncTo           commandRepositorySyncTo
+	validateProvider commandRepositoryValidateProvider
 }

 func (c *commandRepository) setup(svc advancedAppServices, parent commandParent) {
@@ -24,4 +25,5 @@ func (c *commandRepository) setup(svc advancedAppServices, parent commandParent)
 	c.status.setup(svc, cmd)
 	c.syncTo.setup(svc, cmd)
 	c.changePassword.setup(svc, cmd)
+	c.validateProvider.setup(svc, cmd)
 }
diff --git a/cli/command_repository_create.go b/cli/command_repository_create.go
index ad371fe79..6a1146b57 100644
--- 
a/cli/command_repository_create.go +++ b/cli/command_repository_create.go @@ -17,6 +17,12 @@ "github.com/kopia/kopia/snapshot/policy" ) +const runValidationNote = `NOTE: To validate that your provider is compatible with Kopia, please run: + +$ kopia repository validate-provider + +` + type commandRepositoryCreate struct { createBlockHashFormat string createBlockEncryptionFormat string @@ -137,7 +143,13 @@ func (c *commandRepositoryCreate) runCreateCommandWithStorage(ctx context.Contex return errors.Wrap(err, "unable to connect to repository") } - return c.populateRepository(ctx, pass) + if err := c.populateRepository(ctx, pass); err != nil { + return errors.Wrap(err, "error populating repository") + } + + noteColor.Fprintf(c.out.stdout(), runValidationNote) // nolint:errcheck + + return nil } func (c *commandRepositoryCreate) populateRepository(ctx context.Context, password string) error { diff --git a/cli/command_repository_validate_provider.go b/cli/command_repository_validate_provider.go new file mode 100644 index 000000000..dd0f8b4fc --- /dev/null +++ b/cli/command_repository_validate_provider.go @@ -0,0 +1,35 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/providervalidation" + "github.com/kopia/kopia/repo" +) + +type commandRepositoryValidateProvider struct { + opt providervalidation.Options + out textOutput +} + +func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, parent commandParent) { + cmd := parent.Command("validate-provider", "Validates that a repository provider is compatible with Kopia") + + c.opt = providervalidation.DefaultOptions + + cmd.Flag("concurrency-test-duration", "Duration of concurrency test").DurationVar(&c.opt.ConcurrencyTestDuration) + cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers) + cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers) + cmd.Flag("get-metadata-workers", "Number of GetMetadata workers").IntVar(&c.opt.NumGetMetadataWorkers) + c.out.setup(svc) + + cmd.Action(c.out.svc.directRepositoryWriteAction(c.run)) +} + +func (c *commandRepositoryValidateProvider) run(ctx context.Context, dr repo.DirectRepositoryWriter) error { + return errors.Wrap( + providervalidation.ValidateProvider(ctx, dr.BlobStorage(), c.opt), + "provider validation error") +} diff --git a/internal/blobtesting/verify.go b/internal/blobtesting/verify.go index 7137a731d..e65a04f8e 100644 --- a/internal/blobtesting/verify.go +++ b/internal/blobtesting/verify.go @@ -12,6 +12,7 @@ "github.com/stretchr/testify/require" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/repo/blob" ) @@ -168,3 +169,15 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St require.NoError(t, s2.Close(ctx)) } + +// TestValidationOptions is the set of options used when running providing validation from tests. 
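+// It mirrors DefaultOptions but uses a shorter concurrency test duration to keep provider test runtime low.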
+// nolint:gomnd +var TestValidationOptions = providervalidation.Options{ + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 15 * time.Second, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, +} diff --git a/internal/providervalidation/providervalidation.go b/internal/providervalidation/providervalidation.go new file mode 100644 index 000000000..4fbacf95b --- /dev/null +++ b/internal/providervalidation/providervalidation.go @@ -0,0 +1,379 @@ +// Package providervalidation implements validation to ensure the blob storage is compatible with Kopia requirements. +package providervalidation + +import ( + "bytes" + "context" + cryptorand "crypto/rand" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/logging" +) + +// Options provides options for provider validation. +type Options struct { + MaxClockDrift time.Duration + ConcurrencyTestDuration time.Duration + + NumPutBlobWorkers int + NumGetBlobWorkers int + NumGetMetadataWorkers int + NumListBlobsWorkers int + MaxBlobLength int +} + +// DefaultOptions is the default set of options. +// nolint:gomnd +var DefaultOptions = Options{ + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 30 * time.Second, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, +} + +const blobIDLength = 16 + +var log = logging.GetContextLoggerFunc("providervalidation") + +// ValidateProvider runs a series of tests against provided storage to validate that +// it can be used with Kopia. 
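+// The checks cover not-found behavior, prefix listing, partial and full reads, metadata timestamps and
+// concurrent reads/writes; temporary blobs written during validation are cleaned up before returning.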
+// nolint:gomnd,funlen,gocyclo +func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { + uberPrefix := blob.ID("z" + uuid.NewString()) + defer cleanupAllBlobs(ctx, st, uberPrefix) + + prefix1 := uberPrefix + "a" + prefix2 := uberPrefix + "b" + + log(ctx).Infof("Validating blob list responses") + + if err := verifyBlobCount(ctx, st, uberPrefix, 0); err != nil { + return errors.Wrap(err, "invalid blob count") + } + + log(ctx).Infof("Validating non-existent blob responses") + + // read non-existent full blob + if _, err := st.GetBlob(ctx, prefix1+"1", 0, -1); !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Errorf("got unexpected error when reading non-existent blob: %v", err) + } + + // read non-existent partial blob + if _, err := st.GetBlob(ctx, prefix1+"1", 0, 5); !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Errorf("got unexpected error when reading non-existent partial blob: %v", err) + } + + // get metadata for non-existent blob + if _, err := st.GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Errorf("got unexpected error when getting metadata for non-existent blob: %v", err) + } + + blobData := bytes.Repeat([]byte{1, 2, 3, 4, 5}, 1e6) + + log(ctx).Infof("Writing blob (%v bytes)", len(blobData)) + + // write blob + if err := st.PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData)); err != nil { + return errors.Wrap(err, "error writing blob #1") + } + + log(ctx).Infof("Validating list responses...") + + if err := verifyBlobCount(ctx, st, uberPrefix, 1); err != nil { + return errors.Wrap(err, "invalid uber blob count") + } + + if err := verifyBlobCount(ctx, st, prefix1, 1); err != nil { + return errors.Wrap(err, "invalid blob count with prefix 1") + } + + if err := verifyBlobCount(ctx, st, prefix2, 0); err != nil { + return errors.Wrap(err, "invalid blob count with prefix 2") + } + + log(ctx).Infof("Validating partial reads...") + + partialBlobCases := []struct { + offset int64 + length int64 + }{ + {0, 10}, + {1, 10}, + {2, 1}, + {5, 0}, + {int64(len(blobData)) - 5, 5}, + } + + for _, tc := range partialBlobCases { + v, err := st.GetBlob(ctx, prefix1+"1", tc.offset, tc.length) + if err != nil { + return errors.Wrapf(err, "got unexpected error when reading partial blob @%v+%v", tc.offset, tc.length) + } + + if got, want := v, blobData[tc.offset:tc.offset+tc.length]; !bytes.Equal(got, want) { + return errors.Errorf("got unexpected data after reading partial blob @%v+%v: %x, wanted %x", tc.offset, tc.length, got, want) + } + } + + log(ctx).Infof("Validating full reads...") + + // read full blob + v, err := st.GetBlob(ctx, prefix1+"1", 0, -1) + if err != nil { + return errors.Wrap(err, "got unexpected error when reading partial blob") + } + + if got, want := v, blobData; !bytes.Equal(got, want) { + return errors.Errorf("got unexpected data after reading partial blob: %x, wanted %x", got, want) + } + + log(ctx).Infof("Validating metadata...") + + // get metadata for non-existent blob + bm, err := st.GetMetadata(ctx, prefix1+"1") + if err != nil { + return errors.Wrap(err, "got unexpected error when getting metadata for blob") + } + + if got, want := bm.Length, int64(len(blobData)); got != want { + return errors.Errorf("invalid length returned by GetMetadata(): %v, wanted %v", got, want) + } + + now := clock.Now() + + timeDiff := now.Sub(bm.Timestamp) + if timeDiff < 0 { + timeDiff = -timeDiff + } + + if timeDiff > opt.MaxClockDrift { + return errors.Errorf( + "newly-written blob has a timestamp very different 
from local clock: %v, expected %v. Max difference allowed is %v", + bm.Timestamp, + now, + opt.MaxClockDrift, + ) + } + + ct := newConcurrencyTest(st, prefix2, opt) + log(ctx).Infof("Running concurrency test for %v...", opt.ConcurrencyTestDuration) + + if err := ct.run(ctx); err != nil { + return errors.Wrap(err, "error validating concurrency") + } + + log(ctx).Infof("All good.") + + return nil +} + +type concurrencyTest struct { + opt Options + st blob.Storage + prefix blob.ID + deadline time.Time + + mu sync.Mutex + blobData map[blob.ID][]byte + blobIDs []blob.ID + blobWritten map[blob.ID]bool +} + +func newConcurrencyTest(st blob.Storage, prefix blob.ID, opt Options) *concurrencyTest { + return &concurrencyTest{ + opt: opt, + st: st, + prefix: prefix, + deadline: clock.Now().Add(opt.ConcurrencyTestDuration), + + blobData: make(map[blob.ID][]byte), + blobWritten: make(map[blob.ID]bool), + } +} + +func (c *concurrencyTest) putBlobWorker(ctx context.Context, worker int) func() error { + return func() error { + for clock.Now().Before(c.deadline) { + blobLen := blobIDLength + rand.Intn(c.opt.MaxBlobLength-blobIDLength) //nolint:gosec + + data := make([]byte, blobLen) + if _, err := cryptorand.Read(data); err != nil { + return errors.Wrap(err, "unable to get randomness") + } + + id := c.prefix + blob.ID(fmt.Sprintf("%x", data[0:16])) + + c.mu.Lock() + c.blobData[id] = data + c.blobIDs = append(c.blobIDs, id) + c.mu.Unlock() + + // sleep for a short time so that readers can start getting the blob when it's still + // not written. + c.randomSleep() + + log(ctx).Debugf("PutBlob worker %v writing %v (%v bytes)", worker, id, len(data)) + + if err := c.st.PutBlob(ctx, id, gather.FromSlice(data)); err != nil { + return errors.Wrap(err, "error writing blob") + } + + c.mu.Lock() + c.blobWritten[id] = true + c.mu.Unlock() + + log(ctx).Debugf("PutBlob worker %v finished writing %v (%v bytes)", worker, id, len(data)) + } + + return nil + } +} + +func (c *concurrencyTest) randomSleep() { + time.Sleep(time.Duration(rand.Intn(int(100 * time.Millisecond)))) //nolint:gosec,gomnd +} + +func (c *concurrencyTest) pickBlob() (blob.ID, []byte, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if len(c.blobIDs) == 0 { + return "", nil, false + } + + id := c.blobIDs[rand.Intn(len(c.blobIDs))] // nolint:gosec + + return id, c.blobData[id], c.blobWritten[id] +} + +func (c *concurrencyTest) getBlobWorker(ctx context.Context, worker int) func() error { + return func() error { + for clock.Now().Before(c.deadline) { + c.randomSleep() + + blobID, blobData, fullyWritten := c.pickBlob() + if blobID == "" { + continue + } + + log(ctx).Debugf("GetBlob worker %v reading %v", worker, blobID) + + v, err := c.st.GetBlob(ctx, blobID, 0, -1) + if err != nil { + if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten { + return errors.Wrapf(err, "unexpected error when reading %v", blobID) + } + + log(ctx).Debugf("GetBlob worker %v - valid error when reading %v", worker, blobID) + + continue + } + + if !bytes.Equal(v, blobData) { + return errors.Wrapf(err, "invalid data read for %v", blobID) + } + + log(ctx).Debugf("GetBlob worker %v - valid data read %v", worker, blobID) + } + + return nil + } +} + +func (c *concurrencyTest) getMetadataWorker(ctx context.Context, worker int) func() error { + return func() error { + for clock.Now().Before(c.deadline) { + c.randomSleep() + + blobID, blobData, fullyWritten := c.pickBlob() + if blobID == "" { + continue + } + + log(ctx).Debugf("GetMetadata worker %v: %v", worker, blobID) + + bm, err := 
c.st.GetMetadata(ctx, blobID) + if err != nil { + if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten { + return errors.Wrapf(err, "unexpected error when reading %v", blobID) + } + + log(ctx).Debugf("GetMetadata worker %v - valid error when reading %v", worker, blobID) + + continue + } + + if bm.Length != int64(len(blobData)) { + return errors.Wrapf(err, "unexpected partial read - invalid length read for %v: %v wanted %v", blobID, bm.Length, len(blobData)) + } + + log(ctx).Debugf("GetMetadata worker %v - valid data read %v", worker, blobID) + } + + return nil + } +} + +func (c *concurrencyTest) listBlobWorker(ctx context.Context, worker int) func() error { + return func() error { + return nil + } +} + +func (c *concurrencyTest) run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + for worker := 0; worker < c.opt.NumPutBlobWorkers; worker++ { + eg.Go(c.putBlobWorker(ctx, worker)) + } + + for worker := 0; worker < c.opt.NumGetBlobWorkers; worker++ { + eg.Go(c.getBlobWorker(ctx, worker)) + } + + for worker := 0; worker < c.opt.NumGetMetadataWorkers; worker++ { + eg.Go(c.getMetadataWorker(ctx, worker)) + } + + for worker := 0; worker < c.opt.NumListBlobsWorkers; worker++ { + eg.Go(c.listBlobWorker(ctx, worker)) + } + + return errors.Wrap(eg.Wait(), "encountered errors") +} + +func cleanupAllBlobs(ctx context.Context, st blob.Storage, prefix blob.ID) { + log(ctx).Infof("Cleaning up temporary data...") + + if err := st.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { + return errors.Wrapf(st.DeleteBlob(ctx, bm.BlobID), "error deleting blob %v", bm.BlobID) + }); err != nil { + log(ctx).Debugf("error cleaning up") + } +} + +func verifyBlobCount(ctx context.Context, st blob.Storage, prefix blob.ID, want int) error { + got, err := blob.ListAllBlobs(ctx, st, prefix) + if err != nil { + return errors.Wrap(err, "error listing blobs") + } + + if len(got) != want { + return errors.Errorf("unexpected number of blobs returned for prefix %v: %v, wanted %v", prefix, len(got), want) + } + + return nil +} diff --git a/internal/providervalidation/providervalidation_test.go b/internal/providervalidation/providervalidation_test.go new file mode 100644 index 000000000..a0a226989 --- /dev/null +++ b/internal/providervalidation/providervalidation_test.go @@ -0,0 +1,18 @@ +package providervalidation_test + +import ( + "testing" + "time" + + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/providervalidation" + "github.com/kopia/kopia/internal/testlogging" +) + +func TestProviderValidation(t *testing.T) { + ctx := testlogging.Context(t) + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil) + opt := providervalidation.DefaultOptions + opt.ConcurrencyTestDuration = 15 * time.Second + providervalidation.ValidateProvider(ctx, st, opt) +} diff --git a/internal/retry/retry.go b/internal/retry/retry.go index d04139df8..3025b4cc1 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -53,12 +53,16 @@ func PeriodicallyNoValue(ctx context.Context, interval time.Duration, count int, func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc, initial, max time.Duration, count int, factor float64) (interface{}, error) { sleepAmount := initial + var lastError error + for i := 0; i < count; i++ { v, err := attempt() if err == nil { return v, nil } + lastError = err + if !isRetriableError(err) { return v, err } @@ -72,7 +76,7 @@ func internalRetry(ctx context.Context, desc string, attempt 
AttemptFunc, isRetr } } - return nil, errors.Errorf("unable to complete %v despite %v retries", desc, count) + return nil, errors.Errorf("unable to complete %v despite %v retries, last error: %v", desc, count, lastError) } // WithExponentialBackoffNoValue is a shorthand for WithExponentialBackoff except the diff --git a/repo/blob/azure/azure_storage_test.go b/repo/blob/azure/azure_storage_test.go index d658c4158..c5cc90f05 100644 --- a/repo/blob/azure/azure_storage_test.go +++ b/repo/blob/azure/azure_storage_test.go @@ -14,6 +14,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob/azure" @@ -119,6 +120,7 @@ func TestAzureStorage(t *testing.T) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) } func TestAzureStorageSASToken(t *testing.T) { @@ -147,6 +149,7 @@ func TestAzureStorageSASToken(t *testing.T) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) } func TestAzureStorageInvalidBlob(t *testing.T) { diff --git a/repo/blob/b2/b2_storage_test.go b/repo/blob/b2/b2_storage_test.go index c471f92e2..76499d0e4 100644 --- a/repo/blob/b2/b2_storage_test.go +++ b/repo/blob/b2/b2_storage_test.go @@ -11,6 +11,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -79,6 +80,7 @@ func TestB2Storage(t *testing.T) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) } func TestB2StorageInvalidBlob(t *testing.T) { diff --git a/repo/blob/filesystem/filesystem_storage_test.go b/repo/blob/filesystem/filesystem_storage_test.go index 8ac9d02b7..48fa7f823 100644 --- a/repo/blob/filesystem/filesystem_storage_test.go +++ b/repo/blob/filesystem/filesystem_storage_test.go @@ -6,8 +6,11 @@ "testing" "time" + "github.com/stretchr/testify/require" + "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -42,6 +45,7 @@ func TestFileStorage(t *testing.T) { blobtesting.VerifyStorage(ctx, t, r) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, r) + require.NoError(t, providervalidation.ValidateProvider(ctx, r, blobtesting.TestValidationOptions)) if err := r.Close(ctx); err != nil { t.Fatalf("err: %v", err) diff --git a/repo/blob/gcs/gcs_storage_test.go b/repo/blob/gcs/gcs_storage_test.go index 7416f6241..1d9954d7d 100644 --- a/repo/blob/gcs/gcs_storage_test.go +++ b/repo/blob/gcs/gcs_storage_test.go @@ -12,6 +12,7 @@ "github.com/stretchr/testify/require" "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" 
"github.com/kopia/kopia/repo/blob/gcs" @@ -44,6 +45,7 @@ func TestGCSStorage(t *testing.T) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) } func TestGCSStorageInvalid(t *testing.T) { diff --git a/repo/blob/rclone/rclone_storage.go b/repo/blob/rclone/rclone_storage.go index a14591cbe..f28e8b125 100644 --- a/repo/blob/rclone/rclone_storage.go +++ b/repo/blob/rclone/rclone_storage.go @@ -111,7 +111,7 @@ func (r *rcloneStorage) DisplayName() string { return "RClone " + r.Options.RemotePath } -func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanner) { +func (r *rcloneStorage) processStderrStatus(ctx context.Context, statsMarker string, s *bufio.Scanner) { for s.Scan() { l := s.Text() @@ -119,8 +119,8 @@ func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanne log(ctx).Debugf("[RCLONE] %v", l) } - if strings.HasPrefix(l, "Transferred:") && strings.HasSuffix(l, "%") { - if strings.HasSuffix(l, "100%") { + if strings.Contains(l, statsMarker) { + if strings.Contains(l, " 100%,") { atomic.StoreInt32(r.allTransfersComplete, 1) } else { atomic.StoreInt32(r.allTransfersComplete, 0) @@ -129,7 +129,7 @@ func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanne } } -func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, startupTimeout time.Duration) (string, error) { +func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, statsMarker string, startupTimeout time.Duration) (string, error) { rcloneAddressChan := make(chan string) rcloneErrChan := make(chan error) @@ -159,7 +159,7 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c if p := strings.Index(l, "https://"); p >= 0 { rcloneAddressChan <- l[p:] - go r.processStderrStatus(ctx, s) + go r.processStderrStatus(ctx, statsMarker, s) return } @@ -245,6 +245,8 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { rcloneExe = opt.RCloneExe } + statsMarker := "STATS:KOPIA" + arguments := append([]string{ "-v", "serve", "webdav", opt.RemotePath, @@ -253,6 +255,8 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { "--key", temporaryKeyPath, "--htpasswd", temporaryHtpassword, "--stats", "1s", + "--stats-one-line", + "--stats-one-line-date-format=" + statsMarker, }, opt.RCloneArgs...) 
if opt.EmbeddedConfig != "" { @@ -274,7 +278,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { startupTimeout = time.Duration(opt.StartupTimeout) * time.Second } - rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, startupTimeout) + rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, statsMarker, startupTimeout) if err != nil { return nil, errors.Wrap(err, "unable to start rclone") } diff --git a/repo/blob/s3/s3_storage_test.go b/repo/blob/s3/s3_storage_test.go index 6fbba5e2b..fa5a6306b 100644 --- a/repo/blob/s3/s3_storage_test.go +++ b/repo/blob/s3/s3_storage_test.go @@ -23,6 +23,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -321,6 +322,7 @@ func testStorage(t *testing.T, options *Options) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) } func TestCustomTransportNoSSLVerify(t *testing.T) { diff --git a/repo/blob/sftp/sftp_storage_test.go b/repo/blob/sftp/sftp_storage_test.go index 17211b455..90cb49ef6 100644 --- a/repo/blob/sftp/sftp_storage_test.go +++ b/repo/blob/sftp/sftp_storage_test.go @@ -19,6 +19,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -187,6 +188,7 @@ func TestSFTPStorageValid(t *testing.T) { blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) // delete everything again deleteBlobs(ctx, t, st) diff --git a/repo/blob/webdav/webdav_storage.go b/repo/blob/webdav/webdav_storage.go index ee0bce0b6..f166a0175 100644 --- a/repo/blob/webdav/webdav_storage.go +++ b/repo/blob/webdav/webdav_storage.go @@ -133,7 +133,7 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st // An error above may indicate that the directory doesn't exist. // Attempt to create required directories and try again, if successful. 
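+	// A top-level blob has an empty dirPath; skip MkdirAll in that case to avoid a panic.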
- if !mkdirAttempted { + if !mkdirAttempted && dirPath != "" { mkdirAttempted = true if mkdirErr := d.cli.MkdirAll(dirPath, defaultDirPerm); mkdirErr == nil { diff --git a/repo/blob/webdav/webdav_storage_test.go b/repo/blob/webdav/webdav_storage_test.go index c208c5e9b..25d30b1ee 100644 --- a/repo/blob/webdav/webdav_storage_test.go +++ b/repo/blob/webdav/webdav_storage_test.go @@ -9,9 +9,11 @@ "os" "testing" + "github.com/stretchr/testify/require" "golang.org/x/net/webdav" "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -164,6 +166,7 @@ func verifyWebDAVStorage(t *testing.T, url, username, password string, shardSpec blobtesting.VerifyStorage(ctx, t, st) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) if err := st.Close(ctx); err != nil { t.Fatalf("err: %v", err)
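
For reference, a hypothetical invocation of the new command against an already-connected repository,
using the flags registered in cli/command_repository_validate_provider.go above (the values shown are
illustrative only, not recommendations), might look like:

    $ kopia repository validate-provider \
        --concurrency-test-duration=1m \
        --put-blob-workers=5 \
        --get-blob-workers=5 \
        --get-metadata-workers=5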