Added support for upload and download throttling for GCS.

This commit is contained in:
Jarek Kowalski
2016-09-19 19:03:35 -07:00
parent 5fea168603
commit 88e3366a41
7 changed files with 249 additions and 15 deletions

View File

@@ -15,12 +15,16 @@
"runtime"
"time"
"github.com/efarrer/iothrottler"
"github.com/kopia/kopia/blob"
"github.com/skratchdot/open-golang/open"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"github.com/kopia/kopia/internal/throttle"
"google.golang.org/api/googleapi"
gcsclient "google.golang.org/api/storage/v1"
)
@@ -35,6 +39,9 @@
// gcsStorage implements blob storage backed by Google Cloud Storage.
type gcsStorage struct {
	Options
	objectsService *gcsclient.ObjectsService
	// Bandwidth-limiting pools applied to all HTTP traffic issued by this
	// storage (wired into the HTTP transport in New); adjusted via SetThrottle.
	downloadThrottler *iothrottler.IOThrottlerPool
	uploadThrottler   *iothrottler.IOThrottlerPool
}
func (gcs *gcsStorage) BlockSize(b string) (int64, error) {
@@ -193,6 +200,20 @@ func (gcs *gcsStorage) String() string {
return fmt.Sprintf("gcs://%v/%v", gcs.BucketName, gcs.Prefix)
}
// SetThrottle adjusts the download and upload bandwidth limits for this
// storage. Non-positive values disable the corresponding limit.
func (gcs *gcsStorage) SetThrottle(downloadBytesPerSecond, uploadBytesPerSecond int) error {
	down := toBandwidth(downloadBytesPerSecond)
	up := toBandwidth(uploadBytesPerSecond)
	gcs.downloadThrottler.SetBandwidth(down)
	gcs.uploadThrottler.SetBandwidth(up)
	return nil
}
// toBandwidth converts a bytes-per-second limit into an iothrottler.Bandwidth.
// Zero and negative values mean "no limit".
func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {
	if bytesPerSecond > 0 {
		return iothrottler.Bandwidth(bytesPerSecond) * iothrottler.BytesPerSecond
	}
	return iothrottler.Unlimited
}
func tokenFromFile(file string) (*oauth2.Token, error) {
f, err := os.Open(file)
if err != nil {
@@ -222,7 +243,9 @@ func saveToken(file string, token *oauth2.Token) {
// but this can be disabled by setting IgnoreDefaultCredentials to true.
func New(ctx context.Context, options *Options) (blob.Storage, error) {
gcs := &gcsStorage{
Options: *options,
Options: *options,
downloadThrottler: iothrottler.NewIOThrottlerPool(iothrottler.Unlimited),
uploadThrottler: iothrottler.NewIOThrottlerPool(iothrottler.Unlimited),
}
if gcs.BucketName == "" {
@@ -240,6 +263,13 @@ func New(ctx context.Context, options *Options) (blob.Storage, error) {
var client *http.Client
var err error
ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{
Transport: throttle.NewRoundTripper(
http.DefaultTransport,
gcs.downloadThrottler,
gcs.uploadThrottler),
})
if !gcs.IgnoreDefaultCredentials {
client, _ = google.DefaultClient(ctx, scope)
}
@@ -276,6 +306,7 @@ func New(ctx context.Context, options *Options) (blob.Storage, error) {
saveToken(gcs.TokenCacheFile, token)
}
}
client = config.Client(ctx, token)
}
@@ -413,3 +444,4 @@ func(ctx context.Context, o interface{}) (blob.Storage, error) {
}
// Compile-time checks that gcsStorage implements the optional interfaces.
var _ blob.ConnectionInfoProvider = &gcsStorage{}
var _ blob.Throttler = &gcsStorage{}

View File

@@ -38,3 +38,8 @@ type BlockMetadata struct {
TimeStamp time.Time
Error error
}
// Throttler is an interface optionally implemented by Storage that sets
// both the download and upload bandwidth limits (in bytes per second).
type Throttler interface {
	SetThrottle(downloadBytesPerSecond, uploadBytesPerSecond int) error
}

View File

@@ -33,18 +33,11 @@
passwordFile = app.Flag("passwordfile", "Read vault password from a file.").PlaceHolder("FILENAME").Envar("KOPIA_PASSWORD_FILE").ExistingFile()
key = app.Flag("key", "Specify vault master key (hexadecimal).").Envar("KOPIA_KEY").Short('k').String()
keyFile = app.Flag("keyfile", "Read vault master key from file.").PlaceHolder("FILENAME").Envar("KOPIA_KEY_FILE").ExistingFile()
maxDownloadSpeed = app.Flag("max-download-speed", "Limit the download speed.").PlaceHolder("BYTES_PER_SEC").Int()
maxUploadSpeed = app.Flag("max-upload-speed", "Limit the upload speed.").PlaceHolder("BYTES_PER_SEC").Int()
)
// mustLoadLocalConfig loads the local vault configuration and terminates
// the process (via failOnError) if loading fails.
func mustLoadLocalConfig() *config.LocalConfig {
	cfg, err := loadLocalConfig()
	failOnError(err)
	return cfg
}
// loadLocalConfig reads the local configuration from the vault config file.
func loadLocalConfig() (*config.LocalConfig, error) {
	fileName := vaultConfigFileName()
	return config.LoadFromFile(fileName)
}
func failOnError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
@@ -53,7 +46,8 @@ func failOnError(err error) {
}
// getContext returns the root context used for command execution.
// (The displayed span contained two conflicting bodies from the diff
// rendering — a duplicate, unreachable return; collapsed to one.)
func getContext() context.Context {
	return context.Background()
}
func openConnection(options ...repo.RepositoryOption) (*kopia.Connection, error) {
@@ -70,6 +64,14 @@ func connectionOptionsFromFlags(options ...repo.RepositoryOption) *kopia.Connect
opts.TraceStorage = log.Printf
}
if *maxUploadSpeed != 0 {
opts.MaxUploadSpeed = *maxUploadSpeed
}
if *maxDownloadSpeed != 0 {
opts.MaxDownloadSpeed = *maxDownloadSpeed
}
return opts
}

View File

@@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
"log"
"github.com/kopia/kopia/blob"
"github.com/kopia/kopia/blob/caching"
@@ -25,6 +26,9 @@ type ConnectionOptions struct {
TraceStorage func(f string, args ...interface{})
RepositoryOptions []repo.RepositoryOption
MaxDownloadSpeed int
MaxUploadSpeed int
}
// Close closes the underlying Vault and Repository.
@@ -63,7 +67,7 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (*
return nil, fmt.Errorf("invalid vault credentials: %v", err)
}
rawVaultStorage, err := blob.NewStorage(ctx, lc.VaultConnection.ConnectionInfo)
rawVaultStorage, err := newStorageWithOptions(ctx, lc.VaultConnection.ConnectionInfo, options)
if err != nil {
return nil, fmt.Errorf("cannot open vault storage: %v", err)
}
@@ -86,7 +90,7 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (*
if lc.RepoConnection == nil {
repositoryStorage = rawVaultStorage
} else {
repositoryStorage, err = blob.NewStorage(ctx, *lc.RepoConnection)
repositoryStorage, err = newStorageWithOptions(ctx, *lc.RepoConnection, options)
if err != nil {
vaultStorage.Close()
return nil, err
@@ -119,3 +123,21 @@ func Open(ctx context.Context, configFile string, options *ConnectionOptions) (*
return &conn, nil
}
// newStorageWithOptions opens blob storage for the given connection info and,
// when the options request it, applies download/upload throttling.
//
// Fix: the original silently discarded the error returned by SetThrottle;
// when the user explicitly asked for throttling, a failure to apply it now
// fails the open instead of being ignored.
func newStorageWithOptions(ctx context.Context, cfg blob.ConnectionInfo, options *ConnectionOptions) (blob.Storage, error) {
	s, err := blob.NewStorage(ctx, cfg)
	if err != nil {
		return nil, err
	}

	if options.MaxUploadSpeed > 0 || options.MaxDownloadSpeed > 0 {
		if t, ok := s.(blob.Throttler); ok {
			if err := t.SetThrottle(options.MaxDownloadSpeed, options.MaxUploadSpeed); err != nil {
				s.Close()
				return nil, fmt.Errorf("cannot set throttle on '%v': %v", cfg.Type, err)
			}
		} else {
			log.Printf("Throttling not supported for '%v'.", cfg.Type)
		}
	}

	return s, nil
}

View File

@@ -6,6 +6,7 @@
"hash/fnv"
"io"
"log"
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
@@ -61,6 +62,7 @@ type Uploader struct {
func (u *Uploader) uploadFileInternal(f fs.File, relativePath string, forceStored bool) (*dirEntry, uint64, error) {
log.Printf("Uploading file %v", relativePath)
t0 := time.Now()
file, err := f.Open()
if err != nil {
return nil, 0, fmt.Errorf("unable to open file: %v", err)
@@ -89,10 +91,30 @@ func (u *Uploader) uploadFileInternal(f fs.File, relativePath string, forceStore
de := newDirEntry(e2, r)
de.FileSize = written
dt := time.Since(t0)
log.Printf("Uploaded file %v, %v bytes in %v. %v", relativePath, written, dt, bytesPerSecond(written, dt))
return de, metadataHash(&de.EntryMetadata), nil
}
// bytesPerSecond formats an average transfer rate as a human-readable
// string (B/s, KB/s or MB/s). A zero duration yields "0 B/s" so we never
// divide by zero.
func bytesPerSecond(bytes int64, duration time.Duration) string {
	if duration == 0 {
		return "0 B/s"
	}

	rate := float64(bytes) / duration.Seconds()
	switch {
	case rate >= 700000:
		return fmt.Sprintf("%.2f MB/s", rate/1000000)
	case rate >= 700:
		return fmt.Sprintf("%.2f KB/s", rate/1000)
	default:
		return fmt.Sprintf("%.2f B/s", rate)
	}
}
func newDirEntry(md *fs.EntryMetadata, oid repo.ObjectID) *dirEntry {
return &dirEntry{
EntryMetadata: *md,
@@ -104,7 +126,7 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) {
bundleMetadata := b.Metadata()
log.Printf("uploading bundle %v (%v files)", bundleMetadata.Name, len(b.files))
defer log.Printf("finished uploading bundle")
t0 := time.Now()
writer := u.repo.NewWriter(
repo.WithDescription("BUNDLE:" + bundleMetadata.Name),
@@ -115,6 +137,7 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) {
var err error
de := newDirEntry(bundleMetadata, repo.NullObjectID)
var totalBytes int64
for _, fileEntry := range b.files {
file, err := fileEntry.Open()
@@ -136,11 +159,14 @@ func (u *Uploader) uploadBundleInternal(b *bundle) (*dirEntry, uint64, error) {
de.BundledChildren = append(de.BundledChildren, newDirEntry(fileMetadata, repo.NullObjectID))
uploadedFiles = append(uploadedFiles, &bundledFile{metadata: fileMetadata})
totalBytes += written
file.Close()
}
b.files = uploadedFiles
de.ObjectID, err = writer.Result(true)
dt := time.Since(t0)
log.Printf("Uploaded bundle %v (%v files) %v bytes in %v. %v", bundleMetadata.Name, len(b.files), totalBytes, dt, bytesPerSecond(totalBytes, dt))
if err != nil {
return nil, 0, err
}

View File

@@ -0,0 +1,44 @@
package throttle
import (
"io"
"net/http"
)
// throttlerPool is the minimal interface the round tripper needs from a
// bandwidth-limiting pool (satisfied by e.g. *iothrottler.IOThrottlerPool):
// AddReader wraps a ReadCloser so reads through it are throttled.
type throttlerPool interface {
	AddReader(io.ReadCloser) (io.ReadCloser, error)
}
// throttlingRoundTripper wraps a base http.RoundTripper, throttling request
// bodies through uploadPool and response bodies through downloadPool.
// Either pool may be nil, which disables throttling for that direction.
type throttlingRoundTripper struct {
	base         http.RoundTripper
	downloadPool throttlerPool
	uploadPool   throttlerPool
}
// RoundTrip executes the request via the base transport, wrapping the request
// body in the upload pool and the response body in the download pool so both
// directions are bandwidth-limited.
//
// Fixes vs. original: close req.Body when the upload pool rejects it (the
// http.RoundTripper contract requires the body to be closed even on error);
// close resp.Body when the download pool rejects it (avoids leaking the
// connection); do not touch the response when the base transport errored.
func (rt *throttlingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Body != nil && rt.uploadPool != nil {
		throttled, err := rt.uploadPool.AddReader(req.Body)
		if err != nil {
			// Request will not be sent; we are responsible for the body.
			req.Body.Close()
			return nil, err
		}
		req.Body = throttled
	}

	resp, err := rt.base.RoundTrip(req)
	if err != nil {
		return resp, err
	}

	if resp.Body != nil && rt.downloadPool != nil {
		throttled, err := rt.downloadPool.AddReader(resp.Body)
		if err != nil {
			// Avoid leaking the underlying connection.
			resp.Body.Close()
			return nil, err
		}
		resp.Body = throttled
	}
	return resp, nil
}
// NewRoundTripper returns an http.RoundTripper that throttles uploads and
// downloads through the provided pools. A nil base falls back to
// http.DefaultTransport.
func NewRoundTripper(base http.RoundTripper, downloadPool, uploadPool throttlerPool) http.RoundTripper {
	rt := &throttlingRoundTripper{
		base:         base,
		downloadPool: downloadPool,
		uploadPool:   uploadPool,
	}
	if rt.base == nil {
		rt.base = http.DefaultTransport
	}
	return rt
}

View File

@@ -0,0 +1,103 @@
package throttle
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"
)
// baseRoundTripper is a fake http.RoundTripper for tests that returns
// canned responses keyed by request pointer identity.
type baseRoundTripper struct {
	responses map[*http.Request]*http.Response
}
// add registers resp as the canned response for req and echoes both back,
// so call sites can register and capture in one statement.
func (rt *baseRoundTripper) add(req *http.Request, resp *http.Response) (*http.Request, *http.Response) {
	rt.responses[req] = resp
	return req, resp
}
// RoundTrip returns the canned response registered for req, or an error
// when none was registered.
func (rt *baseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if resp := rt.responses[req]; resp != nil {
		return resp, nil
	}
	return nil, fmt.Errorf("error occurred")
}
// fakePool implements throttlerPool for tests; it records every reader
// passed to AddReader without actually throttling anything.
type fakePool struct {
	readers []io.ReadCloser
}
// reset clears the recorded readers before the next test case.
func (fp *fakePool) reset() {
	fp.readers = nil
}
// AddReader records r and returns it unchanged, never failing.
func (fp *fakePool) AddReader(r io.ReadCloser) (io.ReadCloser, error) {
	fp.readers = append(fp.readers, r)
	return r, nil
}
// TestRoundTripper verifies that the throttling round tripper routes request
// bodies through the upload pool and response bodies through the download
// pool, and leaves body-less requests/responses untouched.
func TestRoundTripper(t *testing.T) {
	downloadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1")))
	uploadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1")))
	base := &baseRoundTripper{
		responses: make(map[*http.Request]*http.Response),
	}
	downloadPool := &fakePool{}
	uploadPool := &fakePool{}
	rt := NewRoundTripper(base, downloadPool, uploadPool)
	// Empty request (no request body, no response body) - neither pool used.
	uploadPool.reset()
	downloadPool.reset()
	req1, resp1 := base.add(&http.Request{}, &http.Response{})
	resp, err := rt.RoundTrip(req1)
	if resp != resp1 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 0 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}
	// Upload request - only the upload pool should see a reader.
	uploadPool.reset()
	downloadPool.reset()
	req2, resp2 := base.add(&http.Request{
		Body: uploadBody,
	}, &http.Response{})
	resp, err = rt.RoundTrip(req2)
	if resp != resp2 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 1 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}
	// Download request - only the download pool should see a reader.
	uploadPool.reset()
	downloadPool.reset()
	req3, resp3 := base.add(&http.Request{}, &http.Response{Body: downloadBody})
	resp, err = rt.RoundTrip(req3)
	if resp != resp3 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 0 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}
	// Upload/Download request - both pools should see one reader each.
	uploadPool.reset()
	downloadPool.reset()
	req4, resp4 := base.add(&http.Request{Body: uploadBody}, &http.Response{Body: downloadBody})
	resp, err = rt.RoundTrip(req4)
	if resp != resp4 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 1 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}
}