diff --git a/cli/command_repository_validate_provider.go b/cli/command_repository_validate_provider.go index dd0f8b4fc..0cd396131 100644 --- a/cli/command_repository_validate_provider.go +++ b/cli/command_repository_validate_provider.go @@ -19,6 +19,7 @@ func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, paren c.opt = providervalidation.DefaultOptions + cmd.Flag("num-storage-connections", "Number of storage connections").IntVar(&c.opt.NumEquivalentStorageConnections) cmd.Flag("concurrency-test-duration", "Duration of concurrency test").DurationVar(&c.opt.ConcurrencyTestDuration) cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers) cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers) diff --git a/internal/blobtesting/verify.go b/internal/blobtesting/verify.go index 87d44b560..7cf334f41 100644 --- a/internal/blobtesting/verify.go +++ b/internal/blobtesting/verify.go @@ -231,11 +231,12 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St // //nolint:gomnd var TestValidationOptions = providervalidation.Options{ - MaxClockDrift: 3 * time.Minute, - ConcurrencyTestDuration: 15 * time.Second, - NumPutBlobWorkers: 3, - NumGetBlobWorkers: 3, - NumGetMetadataWorkers: 3, - NumListBlobsWorkers: 3, - MaxBlobLength: 10e6, + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 15 * time.Second, + NumEquivalentStorageConnections: 5, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, } diff --git a/internal/providervalidation/providervalidation.go b/internal/providervalidation/providervalidation.go index ed00f04b5..a311b8f98 100644 --- a/internal/providervalidation/providervalidation.go +++ b/internal/providervalidation/providervalidation.go @@ -12,12 +12,15 @@ "github.com/google/uuid" "github.com/pkg/errors" + "go.uber.org/multierr" "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" + + loggingwrapper "github.com/kopia/kopia/repo/blob/logging" ) // Options provides options for provider validation. @@ -25,6 +28,8 @@ type Options struct { MaxClockDrift time.Duration ConcurrencyTestDuration time.Duration + NumEquivalentStorageConnections int + NumPutBlobWorkers int NumGetBlobWorkers int NumGetMetadataWorkers int @@ -36,35 +41,92 @@ type Options struct { // //nolint:gomnd,gochecknoglobals var DefaultOptions = Options{ - MaxClockDrift: 3 * time.Minute, - ConcurrencyTestDuration: 30 * time.Second, - NumPutBlobWorkers: 3, - NumGetBlobWorkers: 3, - NumGetMetadataWorkers: 3, - NumListBlobsWorkers: 3, - MaxBlobLength: 10e6, + MaxClockDrift: 3 * time.Minute, + ConcurrencyTestDuration: 30 * time.Second, + NumEquivalentStorageConnections: 5, + NumPutBlobWorkers: 3, + NumGetBlobWorkers: 3, + NumGetMetadataWorkers: 3, + NumListBlobsWorkers: 3, + MaxBlobLength: 10e6, } var log = logging.Module("providervalidation") +// equivalentBlobStorageConnections is a slice of different instances of the same blob storage provider +// connecting to the same underlying storage. +type equivalentBlobStorageConnections []blob.Storage + +func (st equivalentBlobStorageConnections) pickOne() blob.Storage { + return st[rand.Intn(len(st))] //nolint:gosec +} + +// closeAdditional closes all but the first connection to the underlying storage. +func (st equivalentBlobStorageConnections) closeAdditional(ctx context.Context) error { + var err error + + for i := 1; i < len(st); i++ { + err = multierr.Combine(err, st[i].Close(ctx)) + } + + return errors.Wrap(err, "error closing additional connections") +} + +// openEquivalentStorageConnections creates n-1 additional connections to the same underlying storage +// and returns a slice of all connections. +func openEquivalentStorageConnections(ctx context.Context, st blob.Storage, n int) (equivalentBlobStorageConnections, error) { + result := equivalentBlobStorageConnections{st} + ci := st.ConnectionInfo() + + log(ctx).Infof("Opening %v equivalent storage connections...", n-1) + + for i := 1; i < n; i++ { + c, err := blob.NewStorage(ctx, ci, false) + if err != nil { + if cerr := result.closeAdditional(ctx); cerr != nil { + log(ctx).Warn("unable to close storage connection", "err", cerr) + } + + return nil, errors.Wrap(err, "unable to open storage connection") + } + + log(ctx).Debugw("opened equivalent storage connection", "connectionID", i) + + result = append(result, loggingwrapper.NewWrapper(c, log(ctx), fmt.Sprintf("[STORAGE-%v] ", i))) + } + + return result, nil +} + // ValidateProvider runs a series of tests against provided storage to validate that // it can be used with Kopia. // //nolint:gomnd,funlen,gocyclo,cyclop -func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { +func ValidateProvider(ctx context.Context, st0 blob.Storage, opt Options) error { if os.Getenv("KOPIA_SKIP_PROVIDER_VALIDATION") != "" { return nil } + st, err := openEquivalentStorageConnections(ctx, st0, opt.NumEquivalentStorageConnections) + if err != nil { + return errors.Wrap(err, "unable to open additional storage connections") + } + + defer func() { + if cerr := st.closeAdditional(ctx); cerr != nil { + log(ctx).Warn("unable to close additional connections", "err", cerr) + } + }() + uberPrefix := blob.ID("z" + uuid.NewString()) - defer cleanupAllBlobs(ctx, st, uberPrefix) + defer cleanupAllBlobs(ctx, st[0], uberPrefix) prefix1 := uberPrefix + "a" prefix2 := uberPrefix + "b" log(ctx).Infof("Validating storage capacity and usage") - c, err := st.GetCapacity(ctx) + c, err := st.pickOne().GetCapacity(ctx) switch { case errors.Is(err, blob.ErrNotAVolume): @@ -77,7 +139,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { log(ctx).Infof("Validating blob list responses") - if err := verifyBlobCount(ctx, st, uberPrefix, 0); err != nil { + if err := verifyBlobCount(ctx, st.pickOne(), uberPrefix, 0); err != nil { return errors.Wrap(err, "invalid blob count") } @@ -87,17 +149,17 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { defer out.Close() // read non-existent full blob - if err := st.GetBlob(ctx, prefix1+"1", 0, -1, &out); !errors.Is(err, blob.ErrBlobNotFound) { + if err := st.pickOne().GetBlob(ctx, prefix1+"1", 0, -1, &out); !errors.Is(err, blob.ErrBlobNotFound) { return errors.Errorf("got unexpected error when reading non-existent blob: %v", err) } // read non-existent partial blob - if err := st.GetBlob(ctx, prefix1+"1", 0, 5, &out); !errors.Is(err, blob.ErrBlobNotFound) { + if err := st.pickOne().GetBlob(ctx, prefix1+"1", 0, 5, &out); !errors.Is(err, blob.ErrBlobNotFound) { return errors.Errorf("got unexpected error when reading non-existent partial blob: %v", err) } // get metadata for non-existent blob - if _, err := st.GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) { + if _, err := st.pickOne().GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) { return errors.Errorf("got unexpected error when getting metadata for non-existent blob: %v", err) } @@ -106,13 +168,13 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { log(ctx).Infof("Writing blob (%v bytes)", len(blobData)) // write blob - if err := st.PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData), blob.PutOptions{}); err != nil { + if err := st.pickOne().PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData), blob.PutOptions{}); err != nil { return errors.Wrap(err, "error writing blob #1") } log(ctx).Infof("Validating conditional creates...") - err2 := st.PutBlob(ctx, prefix1+"1", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true}) + err2 := st.pickOne().PutBlob(ctx, prefix1+"1", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true}) switch { case errors.Is(err2, blob.ErrUnsupportedPutBlobOption): @@ -126,15 +188,15 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { log(ctx).Infof("Validating list responses...") - if err := verifyBlobCount(ctx, st, uberPrefix, 1); err != nil { + if err := verifyBlobCount(ctx, st.pickOne(), uberPrefix, 1); err != nil { return errors.Wrap(err, "invalid uber blob count") } - if err := verifyBlobCount(ctx, st, prefix1, 1); err != nil { + if err := verifyBlobCount(ctx, st.pickOne(), prefix1, 1); err != nil { return errors.Wrap(err, "invalid blob count with prefix 1") } - if err := verifyBlobCount(ctx, st, prefix2, 0); err != nil { + if err := verifyBlobCount(ctx, st.pickOne(), prefix2, 0); err != nil { return errors.Wrap(err, "invalid blob count with prefix 2") } @@ -152,7 +214,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { } for _, tc := range partialBlobCases { - err := st.GetBlob(ctx, prefix1+"1", tc.offset, tc.length, &out) + err := st.pickOne().GetBlob(ctx, prefix1+"1", tc.offset, tc.length, &out) if err != nil { return errors.Wrapf(err, "got unexpected error when reading partial blob @%v+%v", tc.offset, tc.length) } @@ -165,7 +227,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { log(ctx).Infof("Validating full reads...") // read full blob - err2 = st.GetBlob(ctx, prefix1+"1", 0, -1, &out) + err2 = st.pickOne().GetBlob(ctx, prefix1+"1", 0, -1, &out) if err2 != nil { return errors.Wrap(err2, "got unexpected error when reading partial blob") } @@ -177,7 +239,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { log(ctx).Infof("Validating metadata...") // get metadata for non-existent blob - bm, err2 := st.GetMetadata(ctx, prefix1+"1") + bm, err2 := st.pickOne().GetMetadata(ctx, prefix1+"1") if err2 != nil { return errors.Wrap(err2, "got unexpected error when getting metadata for blob") } @@ -216,7 +278,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { type concurrencyTest struct { opt Options - st blob.Storage + st equivalentBlobStorageConnections prefix blob.ID deadline time.Time @@ -229,7 +291,7 @@ type concurrencyTest struct { blobWritten map[blob.ID]bool } -func newConcurrencyTest(st blob.Storage, prefix blob.ID, opt Options) *concurrencyTest { +func newConcurrencyTest(st []blob.Storage, prefix blob.ID, opt Options) *concurrencyTest { return &concurrencyTest{ opt: opt, st: st, @@ -271,7 +333,7 @@ func (c *concurrencyTest) putBlobWorker(ctx context.Context, worker int) func() log(ctx).Debugf("PutBlob worker %v writing %v (%v bytes)", worker, id, len(data)) - if err := c.st.PutBlob(ctx, id, gather.FromSlice(data), blob.PutOptions{}); err != nil { + if err := c.st.pickOne().PutBlob(ctx, id, gather.FromSlice(data), blob.PutOptions{}); err != nil { return errors.Wrap(err, "error writing blob") } @@ -321,7 +383,7 @@ func (c *concurrencyTest) getBlobWorker(ctx context.Context, worker int) func() log(ctx).Debugf("GetBlob worker %v reading %v", worker, blobID) - err := c.st.GetBlob(ctx, blobID, 0, -1, &out) + err := c.st.pickOne().GetBlob(ctx, blobID, 0, -1, &out) if err != nil { if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten { return errors.Wrapf(err, "unexpected error when reading %v", blobID) @@ -360,7 +422,7 @@ func (c *concurrencyTest) getMetadataWorker(ctx context.Context, worker int) fun log(ctx).Debugf("GetMetadata worker %v: %v", worker, blobID) - bm, err := c.st.GetMetadata(ctx, blobID) + bm, err := c.st.pickOne().GetMetadata(ctx, blobID) if err != nil { if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten { return errors.Wrapf(err, "unexpected error when reading %v", blobID) diff --git a/internal/providervalidation/providervalidation_test.go b/internal/providervalidation/providervalidation_test.go index 0be0c7ba6..9744a9703 100644 --- a/internal/providervalidation/providervalidation_test.go +++ b/internal/providervalidation/providervalidation_test.go @@ -9,13 +9,17 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob/filesystem" ) func TestProviderValidation(t *testing.T) { ctx := testlogging.Context(t) - m := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(m, nil, nil) - opt := providervalidation.DefaultOptions + st, err := filesystem.New(ctx, &filesystem.Options{ + Path: t.TempDir(), + }, false) + require.NoError(t, err) + + opt := blobtesting.TestValidationOptions opt.ConcurrencyTestDuration = 3 * time.Second require.NoError(t, providervalidation.ValidateProvider(ctx, st, opt)) } diff --git a/repo/blob/logging/logging_storage_test.go b/repo/blob/logging/logging_storage_test.go index 382088382..02be26c52 100644 --- a/repo/blob/logging/logging_storage_test.go +++ b/repo/blob/logging/logging_storage_test.go @@ -1,4 +1,4 @@ -package logging +package logging_test import ( "fmt" @@ -10,6 +10,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/logging" ) func TestLoggingStorage(t *testing.T) { @@ -30,7 +31,7 @@ func TestLoggingStorage(t *testing.T) { kt := map[blob.ID]time.Time{} underlying := blobtesting.NewMapStorage(data, kt, nil) - st := NewWrapper(underlying, testlogging.Printf(myOutput, ""), myPrefix) + st := logging.NewWrapper(underlying, testlogging.Printf(myOutput, ""), myPrefix) if st == nil { t.Fatalf("unexpected result: %v", st) } diff --git a/repo/blob/rclone/rclone_storage.go b/repo/blob/rclone/rclone_storage.go index 0d6e1a5c2..000229012 100644 --- a/repo/blob/rclone/rclone_storage.go +++ b/repo/blob/rclone/rclone_storage.go @@ -3,15 +3,16 @@ import ( "bufio" + "bytes" "context" "crypto/sha256" "encoding/hex" + "encoding/json" + "net/http" "os" "os/exec" "path/filepath" "regexp" - "strings" - "sync/atomic" "time" "github.com/foomo/htpasswd" @@ -44,18 +45,39 @@ type rcloneStorage struct { cmd *exec.Cmd // running rclone temporaryDir string - allTransfersComplete atomic.Bool // set to true when rclone process emits "Transferred:*100%" - hasWrites atomic.Bool // set to true when we had any writes + remoteControlHTTPClient *http.Client + remoteControlAddr string + remoteControlUsername string + remoteControlPassword string } -func (r *rcloneStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error { - err := r.Storage.PutBlob(ctx, b, data, opts) - if err == nil { - r.hasWrites.Store(true) - return nil +func (r *rcloneStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error { + // flushing dir cache before listing blobs + if err := r.forgetVFS(ctx); err != nil { + return errors.Wrap(err, "error flushing dir cache") } - return errors.Wrap(err, "error writing blob using WebDAV") + return r.Storage.ListBlobs(ctx, blobIDPrefix, cb) //nolint:wrapcheck +} + +func (r *rcloneStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) { + // flushing dir cache before reading blob + if err := r.forgetVFS(ctx); err != nil { + return blob.Metadata{}, errors.Wrap(err, "error flushing dir cache") + } + + //nolint:wrapcheck + return r.Storage.GetMetadata(ctx, b) +} + +func (r *rcloneStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error { + // flushing dir cache before reading blob + if err := r.forgetVFS(ctx); err != nil { + return errors.Wrap(err, "error flushing dir cache") + } + + //nolint:wrapcheck + return r.Storage.GetBlob(ctx, b, offset, length, output) } func (r *rcloneStorage) ConnectionInfo() blob.ConnectionInfo { @@ -65,22 +87,6 @@ func (r *rcloneStorage) ConnectionInfo() blob.ConnectionInfo { } } -func (r *rcloneStorage) waitForTransfersToEnd(ctx context.Context) { - if !r.hasWrites.Load() { - log(ctx).Debugf("no writes in this session, no need to wait") - return - } - - log(ctx).Debugf("waiting for background rclone transfers to complete...") - - for !r.allTransfersComplete.Load() { - log(ctx).Debugf("still waiting for background rclone transfers to complete...") - time.Sleep(1 * time.Second) - } - - log(ctx).Debugf("all background rclone transfers have completed.") -} - // Kill kills the rclone process. Used for testing. func (r *rcloneStorage) Kill() { // this will kill rclone process if any @@ -91,10 +97,6 @@ func (r *rcloneStorage) Kill() { } func (r *rcloneStorage) Close(ctx context.Context) error { - if !r.Options.NoWaitForTransfers { - r.waitForTransfersToEnd(ctx) - } - if r.Storage != nil { if err := r.Storage.Close(ctx); err != nil { return errors.Wrap(err, "error closing webdav connection") @@ -121,26 +123,60 @@ func (r *rcloneStorage) DisplayName() string { return "RClone " + r.Options.RemotePath } -func (r *rcloneStorage) processStderrStatus(ctx context.Context, statsMarker string, s *bufio.Scanner) { +func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanner) { for s.Scan() { l := s.Text() if r.Debug { log(ctx).Debugf("[RCLONE] %v", l) } - - if strings.Contains(l, statsMarker) { - if strings.Contains(l, " 100%,") || strings.Contains(l, ", -,") { - r.allTransfersComplete.Store(true) - } else { - r.allTransfersComplete.Store(false) - } - } } } -func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, statsMarker string, startupTimeout time.Duration) (string, error) { - rcloneAddressChan := make(chan string) +func (r *rcloneStorage) remoteControl(ctx context.Context, path string, input, output any) error { + var reqBuf bytes.Buffer + + if err := json.NewEncoder(&reqBuf).Encode(input); err != nil { + return errors.Wrap(err, "unable to serialize input") + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.remoteControlAddr+path, &reqBuf) + if err != nil { + return errors.Wrap(err, "unable to create request") + } + + req.SetBasicAuth(r.remoteControlUsername, r.remoteControlPassword) + + resp, err := r.remoteControlHTTPClient.Do(req) + if err != nil { + return errors.Wrap(err, "RC error") + } + + defer resp.Body.Close() //nolint:errcheck + + if resp.StatusCode != http.StatusOK { + return errors.Errorf("RC error: %v", resp.Status) + } + + if err := json.NewDecoder(resp.Body).Decode(&output); err != nil { + return errors.Errorf("error decoding response: %v", err) + } + + return nil +} + +func (r *rcloneStorage) forgetVFS(ctx context.Context) error { + out := map[string]any{} + return r.remoteControl(ctx, "vfs/forget", map[string]string{}, &out) +} + +type rcloneURLs struct { + webdavAddr string + remoteControlAddr string +} + +func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, startupTimeout time.Duration) (rcloneURLs, error) { + rcloneAddressChan := make(chan rcloneURLs) rcloneErrChan := make(chan error) log(ctx).Debugf("starting %v", c.Path) @@ -162,17 +198,28 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c var lastOutput string - serverRegexp := regexp.MustCompile(`(?i)WebDav Server started on \[?(https://.+:\d{1,5}/)\]?`) + webdavServerRegexp := regexp.MustCompile(`(?i)WebDav Server started on \[?(https://.+:\d{1,5}/)\]?`) + remoteControlRegexp := regexp.MustCompile(`(?i)Serving remote control on \[?(https://.+:\d{1,5}/)\]?`) + + var u rcloneURLs for s.Scan() { l := s.Text() lastOutput = l - params := serverRegexp.FindStringSubmatch(l) - if params != nil { - rcloneAddressChan <- params[1] + if p := webdavServerRegexp.FindStringSubmatch(l); p != nil { + u.webdavAddr = p[1] + } - go r.processStderrStatus(ctx, statsMarker, s) + if p := remoteControlRegexp.FindStringSubmatch(l); p != nil { + u.remoteControlAddr = p[1] + } + + if u.webdavAddr != "" && u.remoteControlAddr != "" { + // return to caller when we've detected both WebDav and remote control addresses. + rcloneAddressChan <- u + + go r.processStderrStatus(ctx, s) return } @@ -190,10 +237,10 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c return addr, nil case err := <-rcloneErrChan: - return "", err + return rcloneURLs{}, err case <-time.After(startupTimeout): - return "", errors.Errorf("timed out waiting for rclone to start") + return rcloneURLs{}, errors.Errorf("timed out waiting for rclone to start") } } @@ -278,6 +325,11 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error) // arguments. arguments = append(arguments, "--addr", "127.0.0.1:0", // allocate random port, + "--rc", + "--rc-addr", "127.0.0.1:0", // allocate random remote control port + "--rc-cert", temporaryCertPath, + "--rc-key", temporaryKeyPath, + "--rc-htpasswd", temporaryHtpassword, "--cert", temporaryCertPath, "--key", temporaryKeyPath, "--htpasswd", temporaryHtpassword, @@ -298,20 +350,31 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error) startupTimeout = time.Duration(opt.StartupTimeout) * time.Second } - rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, statsMarker, startupTimeout) + rcloneUrls, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, startupTimeout) if err != nil { return nil, errors.Wrap(err, "unable to start rclone") } - log(ctx).Debugf("detected webdav address: %v", rcloneAddr) + log(ctx).Debugf("detected webdav address: %v RC: %v", rcloneUrls.webdavAddr, rcloneUrls.remoteControlAddr) fingerprintBytes := sha256.Sum256(cert.Raw) + fingerprintHexString := hex.EncodeToString(fingerprintBytes[:]) + + var cli http.Client + cli.Transport = &http.Transport{ + TLSClientConfig: tlsutil.TLSConfigTrustingSingleCertificate(fingerprintHexString), + } + + r.remoteControlHTTPClient = &cli + r.remoteControlUsername = webdavUsername + r.remoteControlPassword = webdavPassword + r.remoteControlAddr = rcloneUrls.remoteControlAddr wst, err := webdav.New(ctx, &webdav.Options{ - URL: rcloneAddr, + URL: rcloneUrls.webdavAddr, Username: webdavUsername, Password: webdavPassword, - TrustedServerCertificateFingerprint: hex.EncodeToString(fingerprintBytes[:]), + TrustedServerCertificateFingerprint: fingerprintHexString, AtomicWrites: opt.AtomicWrites, Options: opt.Options, }, isCreate) diff --git a/repo/blob/rclone/rclone_storage_test.go b/repo/blob/rclone/rclone_storage_test.go index 2d335d02d..e976008c6 100644 --- a/repo/blob/rclone/rclone_storage_test.go +++ b/repo/blob/rclone/rclone_storage_test.go @@ -21,6 +21,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/providervalidation" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -235,14 +236,10 @@ func TestRCloneProviders(t *testing.T) { } t.Run("Cleanup-"+name, func(t *testing.T) { - t.Parallel() - cleanupOldData(t, rcloneExe, rp) }) t.Run(name, func(t *testing.T) { - t.Parallel() - ctx := testlogging.Context(t) // we are using shared storage, append a guid so that tests don't collide @@ -258,6 +255,7 @@ func TestRCloneProviders(t *testing.T) { blob.PutOptions{}) blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st) + require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions)) // write a bunch of tiny blobs massively in parallel // and kill rclone immediately after to ensure all writes are synchronous