diff --git a/blob/gcs/gcs_storage.go b/blob/gcs/gcs_storage.go index 4370308cb..0820e4827 100644 --- a/blob/gcs/gcs_storage.go +++ b/blob/gcs/gcs_storage.go @@ -15,12 +15,16 @@ "runtime" "time" + "github.com/efarrer/iothrottler" + "github.com/kopia/kopia/blob" "github.com/skratchdot/open-golang/open" "golang.org/x/oauth2" "golang.org/x/oauth2/google" + "github.com/kopia/kopia/internal/throttle" "google.golang.org/api/googleapi" + gcsclient "google.golang.org/api/storage/v1" ) @@ -35,6 +39,9 @@ type gcsStorage struct { Options objectsService *gcsclient.ObjectsService + + downloadThrottler *iothrottler.IOThrottlerPool + uploadThrottler *iothrottler.IOThrottlerPool } func (gcs *gcsStorage) BlockSize(b string) (int64, error) { @@ -193,6 +200,20 @@ func (gcs *gcsStorage) String() string { return fmt.Sprintf("gcs://%v/%v", gcs.BucketName, gcs.Prefix) } +func (gcs *gcsStorage) SetThrottle(downloadBytesPerSecond, uploadBytesPerSecond int) error { + gcs.downloadThrottler.SetBandwidth(toBandwidth(downloadBytesPerSecond)) + gcs.uploadThrottler.SetBandwidth(toBandwidth(uploadBytesPerSecond)) + return nil +} + +func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth { + if bytesPerSecond <= 0 { + return iothrottler.Unlimited + } + + return iothrottler.Bandwidth(bytesPerSecond) * iothrottler.BytesPerSecond +} + func tokenFromFile(file string) (*oauth2.Token, error) { f, err := os.Open(file) if err != nil { @@ -222,7 +243,9 @@ func saveToken(file string, token *oauth2.Token) { // but this can be disabled by setting IgnoreDefaultCredentials to true. func New(ctx context.Context, options *Options) (blob.Storage, error) { gcs := &gcsStorage{ - Options: *options, + Options: *options, + downloadThrottler: iothrottler.NewIOThrottlerPool(iothrottler.Unlimited), + uploadThrottler: iothrottler.NewIOThrottlerPool(iothrottler.Unlimited), } if gcs.BucketName == "" { @@ -240,6 +263,13 @@ func New(ctx context.Context, options *Options) (blob.Storage, error) { var client *http.Client var err error + ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{ + Transport: throttle.NewRoundTripper( + http.DefaultTransport, + gcs.downloadThrottler, + gcs.uploadThrottler), + }) + if !gcs.IgnoreDefaultCredentials { client, _ = google.DefaultClient(ctx, scope) } @@ -276,6 +306,7 @@ func New(ctx context.Context, options *Options) (blob.Storage, error) { saveToken(gcs.TokenCacheFile, token) } } + client = config.Client(ctx, token) } @@ -413,3 +444,4 @@ func(ctx context.Context, o interface{}) (blob.Storage, error) { } var _ blob.ConnectionInfoProvider = &gcsStorage{} +var _ blob.Throttler = &gcsStorage{} diff --git a/blob/storage.go b/blob/storage.go index fd4ba1a5d..9dd5b9aa2 100644 --- a/blob/storage.go +++ b/blob/storage.go @@ -38,3 +38,8 @@ type BlockMetadata struct { TimeStamp time.Time Error error } + +// Throttler is an interface optionally implemented by Storage that sets the upload throttle. +type Throttler interface { + SetThrottle(downloadBytesPerSecond, uploadBytesPerSecond int) error +} diff --git a/cmd/kopia/config.go b/cmd/kopia/config.go index f6673d6bc..8ba7a3065 100644 --- a/cmd/kopia/config.go +++ b/cmd/kopia/config.go @@ -33,18 +33,11 @@ passwordFile = app.Flag("passwordfile", "Read vault password from a file.").PlaceHolder("FILENAME").Envar("KOPIA_PASSWORD_FILE").ExistingFile() key = app.Flag("key", "Specify vault master key (hexadecimal).").Envar("KOPIA_KEY").Short('k').String() keyFile = app.Flag("keyfile", "Read vault master key from file.").PlaceHolder("FILENAME").Envar("KOPIA_KEY_FILE").ExistingFile() + + maxDownloadSpeed = app.Flag("max-download-speed", "Limit the download speed.").PlaceHolder("BYTES_PER_SEC").Int() + maxUploadSpeed = app.Flag("max-upload-speed", "Limit the upload speed.").PlaceHolder("BYTES_PER_SEC").Int() ) -func mustLoadLocalConfig() *config.LocalConfig { - lc, err := loadLocalConfig() - failOnError(err) - return lc -} - -func loadLocalConfig() (*config.LocalConfig, error) { - return config.LoadFromFile(vaultConfigFileName()) -} - func failOnError(err error) { if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %v\n", err) @@ -53,7 +46,8 @@ func failOnError(err error) { } func getContext() context.Context { - return context.Background() + ctx := context.Background() + return ctx } func openConnection(options ...repo.RepositoryOption) (*kopia.Connection, error) { @@ -70,6 +64,14 @@ func connectionOptionsFromFlags(options ...repo.RepositoryOption) *kopia.Connect opts.TraceStorage = log.Printf } + if *maxUploadSpeed != 0 { + opts.MaxUploadSpeed = *maxUploadSpeed + } + + if *maxDownloadSpeed != 0 { + opts.MaxDownloadSpeed = *maxDownloadSpeed + } + return opts } diff --git a/connection.go b/connection.go index 1970bbb3f..0e5076ff0 100644 --- a/connection.go +++ b/connection.go @@ -4,6 +4,7 @@ "context" "errors" "fmt" + "log" "github.com/kopia/kopia/blob" "github.com/kopia/kopia/blob/caching" @@ -25,6 +26,9 @@ type ConnectionOptions struct { TraceStorage func(f string, args ...interface{}) RepositoryOptions []repo.RepositoryOption + + MaxDownloadSpeed int + MaxUploadSpeed int } // Close closes the underlying Vault and Repository. @@ -63,7 +67,7 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (* return nil, fmt.Errorf("invalid vault credentials: %v", err) } - rawVaultStorage, err := blob.NewStorage(ctx, lc.VaultConnection.ConnectionInfo) + rawVaultStorage, err := newStorageWithOptions(ctx, lc.VaultConnection.ConnectionInfo, options) if err != nil { return nil, fmt.Errorf("cannot open vault storage: %v", err) } @@ -86,7 +90,7 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (* if lc.RepoConnection == nil { repositoryStorage = rawVaultStorage } else { - repositoryStorage, err = blob.NewStorage(ctx, *lc.RepoConnection) + repositoryStorage, err = newStorageWithOptions(ctx, *lc.RepoConnection, options) if err != nil { vaultStorage.Close() return nil, err @@ -119,3 +123,21 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (* return &conn, nil } + +func newStorageWithOptions(ctx context.Context, cfg blob.ConnectionInfo, options *ConnectionOptions) (blob.Storage, error) { + s, err := blob.NewStorage(ctx, cfg) + if err != nil { + return nil, err + } + + if options.MaxUploadSpeed > 0 || options.MaxDownloadSpeed > 0 { + t, ok := s.(blob.Throttler) + if ok { + t.SetThrottle(options.MaxDownloadSpeed, options.MaxUploadSpeed) + } else { + log.Printf("Throttling not supported for '%v'.", cfg.Type) + } + } + + return s, nil +} diff --git a/fs/repofs/upload.go b/fs/repofs/upload.go index cbe556f46..8ec6b6a56 100644 --- a/fs/repofs/upload.go +++ b/fs/repofs/upload.go @@ -6,6 +6,7 @@ "hash/fnv" "io" "log" + "time" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" @@ -61,6 +62,7 @@ type Uploader struct { func (u *Uploader) uploadFileInternal(f fs.File, relativePath string, forceStored bool) (*dirEntry, uint64, error) { log.Printf("Uploading file %v", relativePath) + t0 := time.Now() file, err := f.Open() if err != nil { return nil, 0, fmt.Errorf("unable to open file: %v", err) @@ -89,10 +91,30 @@ func (u *Uploader) uploadFileInternal(f fs.File, relativePath string, forceStore de := newDirEntry(e2, r) de.FileSize = written + dt := time.Since(t0) + log.Printf("Uploaded file %v, %v bytes in %v. %v", relativePath, written, dt, bytesPerSecond(written, dt)) return de, metadataHash(&de.EntryMetadata), nil } +func bytesPerSecond(bytes int64, duration time.Duration) string { + if duration == 0 { + return "0 B/s" + } + + bps := float64(bytes) / duration.Seconds() + + if bps >= 700000 { + return fmt.Sprintf("%.2f MB/s", bps/1000000) + } + + if bps >= 700 { + return fmt.Sprintf("%.2f KB/s", bps/1000) + } + + return fmt.Sprintf("%.2f B/s", bps) +} + func newDirEntry(md *fs.EntryMetadata, oid repo.ObjectID) *dirEntry { return &dirEntry{ EntryMetadata: *md, @@ -104,7 +126,7 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) { bundleMetadata := b.Metadata() log.Printf("uploading bundle %v (%v files)", bundleMetadata.Name, len(b.files)) - defer log.Printf("finished uploading bundle") + t0 := time.Now() writer := u.repo.NewWriter( repo.WithDescription("BUNDLE:" + bundleMetadata.Name), @@ -115,6 +137,7 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) { var err error de := newDirEntry(bundleMetadata, repo.NullObjectID) + var totalBytes int64 for _, fileEntry := range b.files { file, err := fileEntry.Open() @@ -136,11 +159,14 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) { de.BundledChildren = append(de.BundledChildren, newDirEntry(fileMetadata, repo.NullObjectID)) uploadedFiles = append(uploadedFiles, &bundledFile{metadata: fileMetadata}) + totalBytes += written file.Close() } b.files = uploadedFiles de.ObjectID, err = writer.Result(true) + dt := time.Since(t0) + log.Printf("Uploaded bundle %v (%v files) %v bytes in %v. %v", bundleMetadata.Name, len(b.files), totalBytes, dt, bytesPerSecond(totalBytes, dt)) if err != nil { return nil, 0, err } diff --git a/internal/throttle/round_tripper.go b/internal/throttle/round_tripper.go new file mode 100644 index 000000000..c59a2deb3 --- /dev/null +++ b/internal/throttle/round_tripper.go @@ -0,0 +1,44 @@ +package throttle + +import ( + "io" + "net/http" +) + +type throttlerPool interface { + AddReader(io.ReadCloser) (io.ReadCloser, error) +} + +type throttlingRoundTripper struct { + base http.RoundTripper + downloadPool throttlerPool + uploadPool throttlerPool +} + +func (rt *throttlingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if req.Body != nil && rt.uploadPool != nil { + var err error + req.Body, err = rt.uploadPool.AddReader(req.Body) + if err != nil { + return nil, err + } + } + resp, err := rt.base.RoundTrip(req) + if resp != nil && resp.Body != nil && rt.downloadPool != nil { + resp.Body, err = rt.downloadPool.AddReader(resp.Body) + } + return resp, err +} + +// NewRoundTripper returns http.RoundTripper that throttles upload and downloads. +func NewRoundTripper(base http.RoundTripper, downloadPool throttlerPool, uploadPool throttlerPool) http.RoundTripper { + if base == nil { + base = http.DefaultTransport + } + + return &throttlingRoundTripper{ + base: base, + downloadPool: downloadPool, + uploadPool: uploadPool, + } +} diff --git a/internal/throttle/round_tripper_test.go b/internal/throttle/round_tripper_test.go new file mode 100644 index 000000000..7ef1dba31 --- /dev/null +++ b/internal/throttle/round_tripper_test.go @@ -0,0 +1,103 @@ +package throttle + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "testing" +) + +type baseRoundTripper struct { + responses map[*http.Request]*http.Response +} + +func (rt *baseRoundTripper) add(req *http.Request, resp *http.Response) (*http.Request, *http.Response) { + rt.responses[req] = resp + return req, resp +} + +func (rt *baseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp := rt.responses[req] + if resp != nil { + return resp, nil + } + + return nil, fmt.Errorf("error occurred") +} + +type fakePool struct { + readers []io.ReadCloser +} + +func (fp *fakePool) reset() { + fp.readers = nil +} + +func (fp *fakePool) AddReader(r io.ReadCloser) (io.ReadCloser, error) { + fp.readers = append(fp.readers, r) + return r, nil +} + +func TestRoundTripper(t *testing.T) { + downloadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1"))) + uploadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1"))) + + base := &baseRoundTripper{ + responses: make(map[*http.Request]*http.Response), + } + downloadPool := &fakePool{} + uploadPool := &fakePool{} + rt := NewRoundTripper(base, downloadPool, uploadPool) + + // Empty request (no request, no reponse) + uploadPool.reset() + downloadPool.reset() + req1, resp1 := base.add(&http.Request{}, &http.Response{}) + resp, err := rt.RoundTrip(req1) + if resp != resp1 || err != nil { + t.Errorf("invalid response or error: %v", err) + } + if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 0 { + t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers) + } + + // Upload request + uploadPool.reset() + downloadPool.reset() + req2, resp2 := base.add(&http.Request{ + Body: uploadBody, + }, &http.Response{}) + resp, err = rt.RoundTrip(req2) + if resp != resp2 || err != nil { + t.Errorf("invalid response or error: %v", err) + } + if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 1 { + t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers) + } + + // Download request + uploadPool.reset() + downloadPool.reset() + req3, resp3 := base.add(&http.Request{}, &http.Response{Body: downloadBody}) + resp, err = rt.RoundTrip(req3) + if resp != resp3 || err != nil { + t.Errorf("invalid response or error: %v", err) + } + if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 0 { + t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers) + } + + // Upload/Download request + uploadPool.reset() + downloadPool.reset() + req4, resp4 := base.add(&http.Request{Body: uploadBody}, &http.Response{Body: downloadBody}) + resp, err = rt.RoundTrip(req4) + if resp != resp4 || err != nil { + t.Errorf("invalid response or error: %v", err) + } + if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 1 { + t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers) + } +}