mirror of
https://github.com/kopia/kopia.git
synced 2025-12-23 22:57:50 -05:00
fix(providers): fixed list/get caching with rclone providers (#3284)
Added improved providervalidation logic which tests for read-after-write property between connections. The new test was failing before the change and is now passing for Google Drive, OneDrive and DropBox.
This commit is contained in:
@@ -19,6 +19,7 @@ func (c *commandRepositoryValidateProvider) setup(svc advancedAppServices, paren
|
||||
|
||||
c.opt = providervalidation.DefaultOptions
|
||||
|
||||
cmd.Flag("num-storage-connections", "Number of storage connections").IntVar(&c.opt.NumEquivalentStorageConnections)
|
||||
cmd.Flag("concurrency-test-duration", "Duration of concurrency test").DurationVar(&c.opt.ConcurrencyTestDuration)
|
||||
cmd.Flag("put-blob-workers", "Number of PutBlob workers").IntVar(&c.opt.NumPutBlobWorkers)
|
||||
cmd.Flag("get-blob-workers", "Number of GetBlob workers").IntVar(&c.opt.NumGetBlobWorkers)
|
||||
|
||||
@@ -231,11 +231,12 @@ func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.St
|
||||
//
|
||||
//nolint:gomnd
|
||||
var TestValidationOptions = providervalidation.Options{
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 15 * time.Second,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 15 * time.Second,
|
||||
NumEquivalentStorageConnections: 5,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
}
|
||||
|
||||
@@ -12,12 +12,15 @@
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/multierr"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
|
||||
loggingwrapper "github.com/kopia/kopia/repo/blob/logging"
|
||||
)
|
||||
|
||||
// Options provides options for provider validation.
|
||||
@@ -25,6 +28,8 @@ type Options struct {
|
||||
MaxClockDrift time.Duration
|
||||
ConcurrencyTestDuration time.Duration
|
||||
|
||||
NumEquivalentStorageConnections int
|
||||
|
||||
NumPutBlobWorkers int
|
||||
NumGetBlobWorkers int
|
||||
NumGetMetadataWorkers int
|
||||
@@ -36,35 +41,92 @@ type Options struct {
|
||||
//
|
||||
//nolint:gomnd,gochecknoglobals
|
||||
var DefaultOptions = Options{
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 30 * time.Second,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
MaxClockDrift: 3 * time.Minute,
|
||||
ConcurrencyTestDuration: 30 * time.Second,
|
||||
NumEquivalentStorageConnections: 5,
|
||||
NumPutBlobWorkers: 3,
|
||||
NumGetBlobWorkers: 3,
|
||||
NumGetMetadataWorkers: 3,
|
||||
NumListBlobsWorkers: 3,
|
||||
MaxBlobLength: 10e6,
|
||||
}
|
||||
|
||||
var log = logging.Module("providervalidation")
|
||||
|
||||
// equivalentBlobStorageConnections is a slice of different instances of the same blob storage provider
|
||||
// connecting to the same underlying storage.
|
||||
type equivalentBlobStorageConnections []blob.Storage
|
||||
|
||||
func (st equivalentBlobStorageConnections) pickOne() blob.Storage {
|
||||
return st[rand.Intn(len(st))] //nolint:gosec
|
||||
}
|
||||
|
||||
// closeAdditional closes all but the first connection to the underlying storage.
|
||||
func (st equivalentBlobStorageConnections) closeAdditional(ctx context.Context) error {
|
||||
var err error
|
||||
|
||||
for i := 1; i < len(st); i++ {
|
||||
err = multierr.Combine(err, st[i].Close(ctx))
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "error closing additional connections")
|
||||
}
|
||||
|
||||
// openEquivalentStorageConnections creates n-1 additional connections to the same underlying storage
|
||||
// and returns a slice of all connections.
|
||||
func openEquivalentStorageConnections(ctx context.Context, st blob.Storage, n int) (equivalentBlobStorageConnections, error) {
|
||||
result := equivalentBlobStorageConnections{st}
|
||||
ci := st.ConnectionInfo()
|
||||
|
||||
log(ctx).Infof("Opening %v equivalent storage connections...", n-1)
|
||||
|
||||
for i := 1; i < n; i++ {
|
||||
c, err := blob.NewStorage(ctx, ci, false)
|
||||
if err != nil {
|
||||
if cerr := result.closeAdditional(ctx); cerr != nil {
|
||||
log(ctx).Warn("unable to close storage connection", "err", cerr)
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "unable to open storage connection")
|
||||
}
|
||||
|
||||
log(ctx).Debugw("opened equivalent storage connection", "connectionID", i)
|
||||
|
||||
result = append(result, loggingwrapper.NewWrapper(c, log(ctx), fmt.Sprintf("[STORAGE-%v] ", i)))
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ValidateProvider runs a series of tests against provided storage to validate that
|
||||
// it can be used with Kopia.
|
||||
//
|
||||
//nolint:gomnd,funlen,gocyclo,cyclop
|
||||
func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
func ValidateProvider(ctx context.Context, st0 blob.Storage, opt Options) error {
|
||||
if os.Getenv("KOPIA_SKIP_PROVIDER_VALIDATION") != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
st, err := openEquivalentStorageConnections(ctx, st0, opt.NumEquivalentStorageConnections)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to open additional storage connections")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if cerr := st.closeAdditional(ctx); cerr != nil {
|
||||
log(ctx).Warn("unable to close additional connections", "err", cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
uberPrefix := blob.ID("z" + uuid.NewString())
|
||||
defer cleanupAllBlobs(ctx, st, uberPrefix)
|
||||
defer cleanupAllBlobs(ctx, st[0], uberPrefix)
|
||||
|
||||
prefix1 := uberPrefix + "a"
|
||||
prefix2 := uberPrefix + "b"
|
||||
|
||||
log(ctx).Infof("Validating storage capacity and usage")
|
||||
|
||||
c, err := st.GetCapacity(ctx)
|
||||
c, err := st.pickOne().GetCapacity(ctx)
|
||||
|
||||
switch {
|
||||
case errors.Is(err, blob.ErrNotAVolume):
|
||||
@@ -77,7 +139,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
|
||||
log(ctx).Infof("Validating blob list responses")
|
||||
|
||||
if err := verifyBlobCount(ctx, st, uberPrefix, 0); err != nil {
|
||||
if err := verifyBlobCount(ctx, st.pickOne(), uberPrefix, 0); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count")
|
||||
}
|
||||
|
||||
@@ -87,17 +149,17 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
defer out.Close()
|
||||
|
||||
// read non-existent full blob
|
||||
if err := st.GetBlob(ctx, prefix1+"1", 0, -1, &out); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
if err := st.pickOne().GetBlob(ctx, prefix1+"1", 0, -1, &out); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when reading non-existent blob: %v", err)
|
||||
}
|
||||
|
||||
// read non-existent partial blob
|
||||
if err := st.GetBlob(ctx, prefix1+"1", 0, 5, &out); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
if err := st.pickOne().GetBlob(ctx, prefix1+"1", 0, 5, &out); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when reading non-existent partial blob: %v", err)
|
||||
}
|
||||
|
||||
// get metadata for non-existent blob
|
||||
if _, err := st.GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
if _, err := st.pickOne().GetMetadata(ctx, prefix1+"1"); !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return errors.Errorf("got unexpected error when getting metadata for non-existent blob: %v", err)
|
||||
}
|
||||
|
||||
@@ -106,13 +168,13 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
log(ctx).Infof("Writing blob (%v bytes)", len(blobData))
|
||||
|
||||
// write blob
|
||||
if err := st.PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData), blob.PutOptions{}); err != nil {
|
||||
if err := st.pickOne().PutBlob(ctx, prefix1+"1", gather.FromSlice(blobData), blob.PutOptions{}); err != nil {
|
||||
return errors.Wrap(err, "error writing blob #1")
|
||||
}
|
||||
|
||||
log(ctx).Infof("Validating conditional creates...")
|
||||
|
||||
err2 := st.PutBlob(ctx, prefix1+"1", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true})
|
||||
err2 := st.pickOne().PutBlob(ctx, prefix1+"1", gather.FromSlice([]byte{99}), blob.PutOptions{DoNotRecreate: true})
|
||||
|
||||
switch {
|
||||
case errors.Is(err2, blob.ErrUnsupportedPutBlobOption):
|
||||
@@ -126,15 +188,15 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
|
||||
log(ctx).Infof("Validating list responses...")
|
||||
|
||||
if err := verifyBlobCount(ctx, st, uberPrefix, 1); err != nil {
|
||||
if err := verifyBlobCount(ctx, st.pickOne(), uberPrefix, 1); err != nil {
|
||||
return errors.Wrap(err, "invalid uber blob count")
|
||||
}
|
||||
|
||||
if err := verifyBlobCount(ctx, st, prefix1, 1); err != nil {
|
||||
if err := verifyBlobCount(ctx, st.pickOne(), prefix1, 1); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count with prefix 1")
|
||||
}
|
||||
|
||||
if err := verifyBlobCount(ctx, st, prefix2, 0); err != nil {
|
||||
if err := verifyBlobCount(ctx, st.pickOne(), prefix2, 0); err != nil {
|
||||
return errors.Wrap(err, "invalid blob count with prefix 2")
|
||||
}
|
||||
|
||||
@@ -152,7 +214,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
}
|
||||
|
||||
for _, tc := range partialBlobCases {
|
||||
err := st.GetBlob(ctx, prefix1+"1", tc.offset, tc.length, &out)
|
||||
err := st.pickOne().GetBlob(ctx, prefix1+"1", tc.offset, tc.length, &out)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "got unexpected error when reading partial blob @%v+%v", tc.offset, tc.length)
|
||||
}
|
||||
@@ -165,7 +227,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
log(ctx).Infof("Validating full reads...")
|
||||
|
||||
// read full blob
|
||||
err2 = st.GetBlob(ctx, prefix1+"1", 0, -1, &out)
|
||||
err2 = st.pickOne().GetBlob(ctx, prefix1+"1", 0, -1, &out)
|
||||
if err2 != nil {
|
||||
return errors.Wrap(err2, "got unexpected error when reading partial blob")
|
||||
}
|
||||
@@ -177,7 +239,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
log(ctx).Infof("Validating metadata...")
|
||||
|
||||
// get metadata for non-existent blob
|
||||
bm, err2 := st.GetMetadata(ctx, prefix1+"1")
|
||||
bm, err2 := st.pickOne().GetMetadata(ctx, prefix1+"1")
|
||||
if err2 != nil {
|
||||
return errors.Wrap(err2, "got unexpected error when getting metadata for blob")
|
||||
}
|
||||
@@ -216,7 +278,7 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
|
||||
|
||||
type concurrencyTest struct {
|
||||
opt Options
|
||||
st blob.Storage
|
||||
st equivalentBlobStorageConnections
|
||||
prefix blob.ID
|
||||
deadline time.Time
|
||||
|
||||
@@ -229,7 +291,7 @@ type concurrencyTest struct {
|
||||
blobWritten map[blob.ID]bool
|
||||
}
|
||||
|
||||
func newConcurrencyTest(st blob.Storage, prefix blob.ID, opt Options) *concurrencyTest {
|
||||
func newConcurrencyTest(st []blob.Storage, prefix blob.ID, opt Options) *concurrencyTest {
|
||||
return &concurrencyTest{
|
||||
opt: opt,
|
||||
st: st,
|
||||
@@ -271,7 +333,7 @@ func (c *concurrencyTest) putBlobWorker(ctx context.Context, worker int) func()
|
||||
|
||||
log(ctx).Debugf("PutBlob worker %v writing %v (%v bytes)", worker, id, len(data))
|
||||
|
||||
if err := c.st.PutBlob(ctx, id, gather.FromSlice(data), blob.PutOptions{}); err != nil {
|
||||
if err := c.st.pickOne().PutBlob(ctx, id, gather.FromSlice(data), blob.PutOptions{}); err != nil {
|
||||
return errors.Wrap(err, "error writing blob")
|
||||
}
|
||||
|
||||
@@ -321,7 +383,7 @@ func (c *concurrencyTest) getBlobWorker(ctx context.Context, worker int) func()
|
||||
|
||||
log(ctx).Debugf("GetBlob worker %v reading %v", worker, blobID)
|
||||
|
||||
err := c.st.GetBlob(ctx, blobID, 0, -1, &out)
|
||||
err := c.st.pickOne().GetBlob(ctx, blobID, 0, -1, &out)
|
||||
if err != nil {
|
||||
if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten {
|
||||
return errors.Wrapf(err, "unexpected error when reading %v", blobID)
|
||||
@@ -360,7 +422,7 @@ func (c *concurrencyTest) getMetadataWorker(ctx context.Context, worker int) fun
|
||||
|
||||
log(ctx).Debugf("GetMetadata worker %v: %v", worker, blobID)
|
||||
|
||||
bm, err := c.st.GetMetadata(ctx, blobID)
|
||||
bm, err := c.st.pickOne().GetMetadata(ctx, blobID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, blob.ErrBlobNotFound) || fullyWritten {
|
||||
return errors.Wrapf(err, "unexpected error when reading %v", blobID)
|
||||
|
||||
@@ -9,13 +9,17 @@
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob/filesystem"
|
||||
)
|
||||
|
||||
func TestProviderValidation(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
m := blobtesting.DataMap{}
|
||||
st := blobtesting.NewMapStorage(m, nil, nil)
|
||||
opt := providervalidation.DefaultOptions
|
||||
st, err := filesystem.New(ctx, &filesystem.Options{
|
||||
Path: t.TempDir(),
|
||||
}, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
opt := blobtesting.TestValidationOptions
|
||||
opt.ConcurrencyTestDuration = 3 * time.Second
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, opt))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package logging
|
||||
package logging_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -10,6 +10,7 @@
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/logging"
|
||||
)
|
||||
|
||||
func TestLoggingStorage(t *testing.T) {
|
||||
@@ -30,7 +31,7 @@ func TestLoggingStorage(t *testing.T) {
|
||||
kt := map[blob.ID]time.Time{}
|
||||
underlying := blobtesting.NewMapStorage(data, kt, nil)
|
||||
|
||||
st := NewWrapper(underlying, testlogging.Printf(myOutput, ""), myPrefix)
|
||||
st := logging.NewWrapper(underlying, testlogging.Printf(myOutput, ""), myPrefix)
|
||||
if st == nil {
|
||||
t.Fatalf("unexpected result: %v", st)
|
||||
}
|
||||
|
||||
@@ -3,15 +3,16 @@
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/foomo/htpasswd"
|
||||
@@ -44,18 +45,39 @@ type rcloneStorage struct {
|
||||
cmd *exec.Cmd // running rclone
|
||||
temporaryDir string
|
||||
|
||||
allTransfersComplete atomic.Bool // set to true when rclone process emits "Transferred:*100%"
|
||||
hasWrites atomic.Bool // set to true when we had any writes
|
||||
remoteControlHTTPClient *http.Client
|
||||
remoteControlAddr string
|
||||
remoteControlUsername string
|
||||
remoteControlPassword string
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
err := r.Storage.PutBlob(ctx, b, data, opts)
|
||||
if err == nil {
|
||||
r.hasWrites.Store(true)
|
||||
return nil
|
||||
func (r *rcloneStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
|
||||
// flushing dir cache before listing blobs
|
||||
if err := r.forgetVFS(ctx); err != nil {
|
||||
return errors.Wrap(err, "error flushing dir cache")
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "error writing blob using WebDAV")
|
||||
return r.Storage.ListBlobs(ctx, blobIDPrefix, cb) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
|
||||
// flushing dir cache before reading blob
|
||||
if err := r.forgetVFS(ctx); err != nil {
|
||||
return blob.Metadata{}, errors.Wrap(err, "error flushing dir cache")
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return r.Storage.GetMetadata(ctx, b)
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
|
||||
// flushing dir cache before reading blob
|
||||
if err := r.forgetVFS(ctx); err != nil {
|
||||
return errors.Wrap(err, "error flushing dir cache")
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return r.Storage.GetBlob(ctx, b, offset, length, output)
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) ConnectionInfo() blob.ConnectionInfo {
|
||||
@@ -65,22 +87,6 @@ func (r *rcloneStorage) ConnectionInfo() blob.ConnectionInfo {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) waitForTransfersToEnd(ctx context.Context) {
|
||||
if !r.hasWrites.Load() {
|
||||
log(ctx).Debugf("no writes in this session, no need to wait")
|
||||
return
|
||||
}
|
||||
|
||||
log(ctx).Debugf("waiting for background rclone transfers to complete...")
|
||||
|
||||
for !r.allTransfersComplete.Load() {
|
||||
log(ctx).Debugf("still waiting for background rclone transfers to complete...")
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
log(ctx).Debugf("all background rclone transfers have completed.")
|
||||
}
|
||||
|
||||
// Kill kills the rclone process. Used for testing.
|
||||
func (r *rcloneStorage) Kill() {
|
||||
// this will kill rclone process if any
|
||||
@@ -91,10 +97,6 @@ func (r *rcloneStorage) Kill() {
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) Close(ctx context.Context) error {
|
||||
if !r.Options.NoWaitForTransfers {
|
||||
r.waitForTransfersToEnd(ctx)
|
||||
}
|
||||
|
||||
if r.Storage != nil {
|
||||
if err := r.Storage.Close(ctx); err != nil {
|
||||
return errors.Wrap(err, "error closing webdav connection")
|
||||
@@ -121,26 +123,60 @@ func (r *rcloneStorage) DisplayName() string {
|
||||
return "RClone " + r.Options.RemotePath
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) processStderrStatus(ctx context.Context, statsMarker string, s *bufio.Scanner) {
|
||||
func (r *rcloneStorage) processStderrStatus(ctx context.Context, s *bufio.Scanner) {
|
||||
for s.Scan() {
|
||||
l := s.Text()
|
||||
|
||||
if r.Debug {
|
||||
log(ctx).Debugf("[RCLONE] %v", l)
|
||||
}
|
||||
|
||||
if strings.Contains(l, statsMarker) {
|
||||
if strings.Contains(l, " 100%,") || strings.Contains(l, ", -,") {
|
||||
r.allTransfersComplete.Store(true)
|
||||
} else {
|
||||
r.allTransfersComplete.Store(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, statsMarker string, startupTimeout time.Duration) (string, error) {
|
||||
rcloneAddressChan := make(chan string)
|
||||
func (r *rcloneStorage) remoteControl(ctx context.Context, path string, input, output any) error {
|
||||
var reqBuf bytes.Buffer
|
||||
|
||||
if err := json.NewEncoder(&reqBuf).Encode(input); err != nil {
|
||||
return errors.Wrap(err, "unable to serialize input")
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.remoteControlAddr+path, &reqBuf)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to create request")
|
||||
}
|
||||
|
||||
req.SetBasicAuth(r.remoteControlUsername, r.remoteControlPassword)
|
||||
|
||||
resp, err := r.remoteControlHTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RC error")
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.Errorf("RC error: %v", resp.Status)
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&output); err != nil {
|
||||
return errors.Errorf("error decoding response: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) forgetVFS(ctx context.Context) error {
|
||||
out := map[string]any{}
|
||||
return r.remoteControl(ctx, "vfs/forget", map[string]string{}, &out)
|
||||
}
|
||||
|
||||
type rcloneURLs struct {
|
||||
webdavAddr string
|
||||
remoteControlAddr string
|
||||
}
|
||||
|
||||
func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c *exec.Cmd, startupTimeout time.Duration) (rcloneURLs, error) {
|
||||
rcloneAddressChan := make(chan rcloneURLs)
|
||||
rcloneErrChan := make(chan error)
|
||||
|
||||
log(ctx).Debugf("starting %v", c.Path)
|
||||
@@ -162,17 +198,28 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c
|
||||
|
||||
var lastOutput string
|
||||
|
||||
serverRegexp := regexp.MustCompile(`(?i)WebDav Server started on \[?(https://.+:\d{1,5}/)\]?`)
|
||||
webdavServerRegexp := regexp.MustCompile(`(?i)WebDav Server started on \[?(https://.+:\d{1,5}/)\]?`)
|
||||
remoteControlRegexp := regexp.MustCompile(`(?i)Serving remote control on \[?(https://.+:\d{1,5}/)\]?`)
|
||||
|
||||
var u rcloneURLs
|
||||
|
||||
for s.Scan() {
|
||||
l := s.Text()
|
||||
lastOutput = l
|
||||
|
||||
params := serverRegexp.FindStringSubmatch(l)
|
||||
if params != nil {
|
||||
rcloneAddressChan <- params[1]
|
||||
if p := webdavServerRegexp.FindStringSubmatch(l); p != nil {
|
||||
u.webdavAddr = p[1]
|
||||
}
|
||||
|
||||
go r.processStderrStatus(ctx, statsMarker, s)
|
||||
if p := remoteControlRegexp.FindStringSubmatch(l); p != nil {
|
||||
u.remoteControlAddr = p[1]
|
||||
}
|
||||
|
||||
if u.webdavAddr != "" && u.remoteControlAddr != "" {
|
||||
// return to caller when we've detected both WebDav and remote control addresses.
|
||||
rcloneAddressChan <- u
|
||||
|
||||
go r.processStderrStatus(ctx, s)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -190,10 +237,10 @@ func (r *rcloneStorage) runRCloneAndWaitForServerAddress(ctx context.Context, c
|
||||
return addr, nil
|
||||
|
||||
case err := <-rcloneErrChan:
|
||||
return "", err
|
||||
return rcloneURLs{}, err
|
||||
|
||||
case <-time.After(startupTimeout):
|
||||
return "", errors.Errorf("timed out waiting for rclone to start")
|
||||
return rcloneURLs{}, errors.Errorf("timed out waiting for rclone to start")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,6 +325,11 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error)
|
||||
// arguments.
|
||||
arguments = append(arguments,
|
||||
"--addr", "127.0.0.1:0", // allocate random port,
|
||||
"--rc",
|
||||
"--rc-addr", "127.0.0.1:0", // allocate random remote control port
|
||||
"--rc-cert", temporaryCertPath,
|
||||
"--rc-key", temporaryKeyPath,
|
||||
"--rc-htpasswd", temporaryHtpassword,
|
||||
"--cert", temporaryCertPath,
|
||||
"--key", temporaryKeyPath,
|
||||
"--htpasswd", temporaryHtpassword,
|
||||
@@ -298,20 +350,31 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error)
|
||||
startupTimeout = time.Duration(opt.StartupTimeout) * time.Second
|
||||
}
|
||||
|
||||
rcloneAddr, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, statsMarker, startupTimeout)
|
||||
rcloneUrls, err := r.runRCloneAndWaitForServerAddress(ctx, r.cmd, startupTimeout)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to start rclone")
|
||||
}
|
||||
|
||||
log(ctx).Debugf("detected webdav address: %v", rcloneAddr)
|
||||
log(ctx).Debugf("detected webdav address: %v RC: %v", rcloneUrls.webdavAddr, rcloneUrls.remoteControlAddr)
|
||||
|
||||
fingerprintBytes := sha256.Sum256(cert.Raw)
|
||||
fingerprintHexString := hex.EncodeToString(fingerprintBytes[:])
|
||||
|
||||
var cli http.Client
|
||||
cli.Transport = &http.Transport{
|
||||
TLSClientConfig: tlsutil.TLSConfigTrustingSingleCertificate(fingerprintHexString),
|
||||
}
|
||||
|
||||
r.remoteControlHTTPClient = &cli
|
||||
r.remoteControlUsername = webdavUsername
|
||||
r.remoteControlPassword = webdavPassword
|
||||
r.remoteControlAddr = rcloneUrls.remoteControlAddr
|
||||
|
||||
wst, err := webdav.New(ctx, &webdav.Options{
|
||||
URL: rcloneAddr,
|
||||
URL: rcloneUrls.webdavAddr,
|
||||
Username: webdavUsername,
|
||||
Password: webdavPassword,
|
||||
TrustedServerCertificateFingerprint: hex.EncodeToString(fingerprintBytes[:]),
|
||||
TrustedServerCertificateFingerprint: fingerprintHexString,
|
||||
AtomicWrites: opt.AtomicWrites,
|
||||
Options: opt.Options,
|
||||
}, isCreate)
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -235,14 +236,10 @@ func TestRCloneProviders(t *testing.T) {
|
||||
}
|
||||
|
||||
t.Run("Cleanup-"+name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cleanupOldData(t, rcloneExe, rp)
|
||||
})
|
||||
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
// we are using shared storage, append a guid so that tests don't collide
|
||||
@@ -258,6 +255,7 @@ func TestRCloneProviders(t *testing.T) {
|
||||
blob.PutOptions{})
|
||||
|
||||
blobtesting.AssertConnectionInfoRoundTrips(ctx, t, st)
|
||||
require.NoError(t, providervalidation.ValidateProvider(ctx, st, blobtesting.TestValidationOptions))
|
||||
|
||||
// write a bunch of tiny blobs massively in parallel
|
||||
// and kill rclone immediately after to ensure all writes are synchronous
|
||||
|
||||
Reference in New Issue
Block a user