diff --git a/cli/s3cli/s3cli.go b/cli/s3cli/s3cli.go new file mode 100644 index 000000000..955a412c6 --- /dev/null +++ b/cli/s3cli/s3cli.go @@ -0,0 +1,34 @@ +package s3cli + +import ( + "context" + + "github.com/kopia/kopia/cli" + "github.com/kopia/kopia/storage" + "github.com/kopia/kopia/storage/s3" + "gopkg.in/alecthomas/kingpin.v2" +) + +var options s3.Options + +func connect(ctx context.Context) (storage.Storage, error) { + return s3.New(ctx, &options) +} + +func init() { + cli.RegisterStorageConnectFlags( + "s3", + "an S3 bucket", + func(cmd *kingpin.CmdClause) { + cmd.Flag("bucket", "Name of the S3 bucket").Required().StringVar(&options.BucketName) + cmd.Flag("endpoint", "Endpoint to use").Default("s3.amazonaws.com").StringVar(&options.Endpoint) + cmd.Flag("access-key", "Access key ID (overrides AWS_ACCESS_KEY_ID environment variable)").Required().Envar("AWS_ACCESS_KEY_ID").StringVar(&options.AccessKeyID) + cmd.Flag("secret-access-key", "Secret access key (overrides AWS_SECRET_ACCESS_KEY environment variable)").Required().Envar("AWS_SECRET_ACCESS_KEY").StringVar(&options.SecretAccessKey) + cmd.Flag("prefix", "Prefix to use for objects in the bucket").StringVar(&options.Prefix) + cmd.Flag("disable-tls", "Disable TLS security (HTTPS)").BoolVar(&options.DoNotUseTLS) + cmd.Flag("max-download-speed", "Limit the download speed.").PlaceHolder("BYTES_PER_SEC").IntVar(&options.MaxDownloadSpeedBytesPerSecond) + cmd.Flag("max-upload-speed", "Limit the upload speed.").PlaceHolder("BYTES_PER_SEC").IntVar(&options.MaxUploadSpeedBytesPerSecond) + + }, + connect) +} diff --git a/main.go b/main.go index 5f9fe93ef..001bca2f5 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ _ "github.com/kopia/kopia/cli/filesystemcli" _ "github.com/kopia/kopia/cli/gcscli" + _ "github.com/kopia/kopia/cli/s3cli" _ "github.com/kopia/kopia/cli/webdavcli" ) diff --git a/storage/s3/s3_options.go b/storage/s3/s3_options.go new file mode 100644 index 000000000..c2d675953 --- /dev/null +++ 
b/storage/s3/s3_options.go @@ -0,0 +1,20 @@ +package s3 + +// Options defines options for S3-based storage. +type Options struct { + // BucketName is the name of the bucket where data is stored. + BucketName string `json:"bucket"` + + // Prefix specifies additional string to prepend to all objects. + Prefix string `json:"prefix,omitempty"` + + Endpoint string `json:"endpoint"` + DoNotUseTLS bool `json:"doNotUseTLS,omitempty"` + + AccessKeyID string `json:"accessKeyID"` + SecretAccessKey string `json:"secretAccessKey" kopia:"sensitive"` + + MaxUploadSpeedBytesPerSecond int `json:"maxUploadSpeedBytesPerSecond,omitempty"` + + MaxDownloadSpeedBytesPerSecond int `json:"maxDownloadSpeedBytesPerSecond,omitempty"` +} diff --git a/storage/s3/s3_storage.go b/storage/s3/s3_storage.go new file mode 100644 index 000000000..05c42d634 --- /dev/null +++ b/storage/s3/s3_storage.go @@ -0,0 +1,244 @@ +package s3 + +import ( + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + + "github.com/efarrer/iothrottler" + "github.com/kopia/kopia/internal/retry" + "github.com/kopia/kopia/storage" + "github.com/minio/minio-go" +) + +const ( + s3storageType = "s3" +) + +type s3Storage struct { + Options + + ctx context.Context + + cli *minio.Client + + downloadThrottler *iothrottler.IOThrottlerPool + uploadThrottler *iothrottler.IOThrottlerPool +} + +func (s *s3Storage) BlockSize(b string) (int64, error) { + attempt := func() (interface{}, error) { + oi, err := s.cli.StatObject(s.BucketName, s.getObjectNameString(b), minio.StatObjectOptions{}) + if err != nil { + return 0, err + } + + return oi.Size, nil + } + + v, err := exponentialBackoff(fmt.Sprintf("BlockSize(%q)", b), attempt) + if err != nil { + return 0, translateError(err) + } + + return v.(int64), nil +} + +func (s *s3Storage) GetBlock(b string, offset, length int64) ([]byte, error) { + attempt := func() (interface{}, error) { + var opt minio.GetObjectOptions + if length > 0 { + opt.SetRange(offset, offset+length-1) + } + + o, err := 
s.cli.GetObject(s.BucketName, s.getObjectNameString(b), opt) + if err != nil { + return 0, err + } + + defer o.Close() + throttled, err := s.downloadThrottler.AddReader(o) + if err != nil { + return nil, err + } + + return ioutil.ReadAll(throttled) + } + + v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q,%v,%v)", b, offset, length), attempt) + if err != nil { + return nil, translateError(err) + } + + return v.([]byte), nil +} + +func exponentialBackoff(desc string, att retry.AttemptFunc) (interface{}, error) { + return retry.WithExponentialBackoff(desc, att, isRetriableError) +} + +func isRetriableError(err error) bool { + if me, ok := err.(minio.ErrorResponse); ok { + // retry on server errors, not on client errors + return me.StatusCode >= 500 + } + + switch err { + case nil: + return false + default: + return true + } +} + +func translateError(err error) error { + if me, ok := err.(minio.ErrorResponse); ok { + if me.StatusCode == 404 { + return storage.ErrBlockNotFound + } + } + + switch err { + case nil: + return nil + default: + return fmt.Errorf("unexpected S3 error: %v", err) + } +} + +func (s *s3Storage) PutBlock(b string, data []byte) error { + attempt := func() (interface{}, error) { + rc := ioutil.NopCloser(bytes.NewReader(data)) + throttled, err := s.uploadThrottler.AddReader(rc) + if err != nil { + return nil, err + } + + n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, int64(len(data)), minio.PutObjectOptions{}) + if err != nil { + return nil, err + } + + if n != int64(len(data)) { + return nil, fmt.Errorf("truncated write %v of %v bytes", n, len(data)) + } + + return nil, nil + } + + _, err := exponentialBackoff(fmt.Sprintf("PutBlock(%q)", b), attempt) + return translateError(err) +} + +func (s *s3Storage) DeleteBlock(b string) error { + attempt := func() (interface{}, error) { + return nil, s.cli.RemoveObject(s.BucketName, s.getObjectNameString(b)) + } + + _, err := exponentialBackoff(fmt.Sprintf("DeleteBlock(%q)", 
b), attempt) + return translateError(err) +} + +func (s *s3Storage) getObjectNameString(b string) string { + return s.Prefix + b +} + +func (s *s3Storage) ListBlocks(prefix string) (chan storage.BlockMetadata, storage.CancelFunc) { + ch := make(chan storage.BlockMetadata, 100) + cancelled := make(chan struct{}) + + go func() { + defer close(ch) + + oi := s.cli.ListObjects(s.BucketName, s.Prefix+prefix, false, cancelled) + for o := range oi { + if err := o.Err; err != nil { + select { + case ch <- storage.BlockMetadata{Error: translateError(err)}: + return + case <-cancelled: + return + } + } + + bm := storage.BlockMetadata{ + BlockID: o.Key[len(s.Prefix):], + Length: o.Size, + TimeStamp: o.LastModified, + } + + select { + case ch <- bm: + case <-cancelled: + return + } + } + }() + + return ch, func() { + close(cancelled) + } +} + +func (s *s3Storage) ConnectionInfo() storage.ConnectionInfo { + return storage.ConnectionInfo{ + Type: s3storageType, + Config: &s.Options, + } +} + +func (s *s3Storage) Close() error { + return nil +} + +func (s *s3Storage) String() string { + return fmt.Sprintf("s3://%v/%v", s.BucketName, s.Prefix) +} + +func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth { + if bytesPerSecond <= 0 { + return iothrottler.Unlimited + } + + return iothrottler.Bandwidth(bytesPerSecond) * iothrottler.BytesPerSecond +} + +// New creates new S3-backed storage with specified options: +// +// - the 'BucketName' field is required and all other parameters are optional. 
+func New(ctx context.Context, opt *Options) (storage.Storage, error) { + if opt.BucketName == "" { + return nil, errors.New("bucket name must be specified") + } + + cli, err := minio.New(opt.Endpoint, opt.AccessKeyID, opt.SecretAccessKey, !opt.DoNotUseTLS) + if err != nil { + return nil, fmt.Errorf("unable to create client: %v", err) + } + + downloadThrottler := iothrottler.NewIOThrottlerPool(toBandwidth(opt.MaxDownloadSpeedBytesPerSecond)) + uploadThrottler := iothrottler.NewIOThrottlerPool(toBandwidth(opt.MaxUploadSpeedBytesPerSecond)) + + return &s3Storage{ + Options: *opt, + ctx: ctx, + cli: cli, + downloadThrottler: downloadThrottler, + uploadThrottler: uploadThrottler, + }, nil +} + +func init() { + storage.AddSupportedStorage( + s3storageType, + func() interface{} { + return &Options{} + }, + func(ctx context.Context, o interface{}) (storage.Storage, error) { + return New(ctx, o.(*Options)) + }) +} + +var _ storage.ConnectionInfoProvider = &s3Storage{} diff --git a/storage/s3/s3_storage_test.go b/storage/s3/s3_storage_test.go new file mode 100644 index 000000000..c1e6348dd --- /dev/null +++ b/storage/s3/s3_storage_test.go @@ -0,0 +1,80 @@ +package s3 + +import ( + "context" + "crypto/rand" + "fmt" + "log" + "testing" + "time" + + "github.com/kopia/kopia/internal/storagetesting" +) + +// https://github.com/minio/minio-go +const ( + endpoint = "play.minio.io:9000" + accessKeyID = "Q3AM3UQ867SPQQA43P2F" + secretAccessKey = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG" + useSSL = true + + // the test takes a few seconds, delete stuff older than 1h to avoid accumulating cruft + cleanupAge = 1 * time.Hour + + bucketName = "kopia-test-1" +) + +func TestS3Storage(t *testing.T) { + if testing.Short() { + return + } + + cleanupOldData(t) + + data := make([]byte, 8) + rand.Read(data) + + st, err := New(context.Background(), &Options{ + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccessKey, + Endpoint: endpoint, + BucketName: bucketName, + Prefix: 
fmt.Sprintf("test-%v-%x-", time.Now().Unix(), data), + }) + if err != nil { + t.Fatalf("err: %v", err) + } + + storagetesting.VerifyStorage(t, st) +} + +func cleanupOldData(t *testing.T) { + // cleanup old data from the bucket + st, err := New(context.Background(), &Options{ + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccessKey, + Endpoint: endpoint, + BucketName: bucketName, + }) + if err != nil { + t.Fatalf("err: %v", err) + } + + items, cancel := st.ListBlocks("") + defer cancel() + for it := range items { + if it.Error != nil { + t.Errorf("can't cleanup: %v", it.Error) + return + } + + age := time.Since(it.TimeStamp) + if age > cleanupAge { + if err := st.DeleteBlock(it.BlockID); err != nil { + t.Errorf("warning: unable to delete %q: %v", it.BlockID, err) + } + } else { + log.Printf("keeping %v", it.BlockID) + } + } +}