added support for S3 storage provider

This commit is contained in:
Jarek Kowalski
2018-01-10 18:10:15 -08:00
parent b6611d2131
commit 61abc8eee2
5 changed files with 379 additions and 0 deletions

cli/s3cli/s3cli.go Normal file

@@ -0,0 +1,34 @@
package s3cli
import (
"context"
"github.com/kopia/kopia/cli"
"github.com/kopia/kopia/storage"
"github.com/kopia/kopia/storage/s3"
"gopkg.in/alecthomas/kingpin.v2"
)
var options s3.Options
func connect(ctx context.Context) (storage.Storage, error) {
return s3.New(ctx, &options)
}
func init() {
cli.RegisterStorageConnectFlags(
"s3",
"an S3 bucket",
func(cmd *kingpin.CmdClause) {
cmd.Flag("bucket", "Name of the S3 bucket").Required().StringVar(&options.BucketName)
cmd.Flag("endpoint", "Endpoint to use").Default("s3.amazonaws.com").StringVar(&options.Endpoint)
cmd.Flag("access-key", "Access key ID (overrides AWS_ACCESS_KEY_ID environment variable)").Required().Envar("AWS_ACCESS_KEY_ID").StringVar(&options.AccessKeyID)
cmd.Flag("secret-access-key", "Secret access key (overrides AWS_SECRET_ACCESS_KEY environment variable)").Required().Envar("AWS_SECRET_ACCESS_KEY").StringVar(&options.SecretAccessKey)
cmd.Flag("prefix", "Prefix to use for objects in the bucket").StringVar(&options.Prefix)
cmd.Flag("disable-tls", "Disable TLS security (HTTPS)").BoolVar(&options.DoNotUseTLS)
cmd.Flag("max-download-speed", "Limit the download speed.").PlaceHolder("BYTES_PER_SEC").IntVar(&options.MaxDownloadSpeedBytesPerSecond)
cmd.Flag("max-upload-speed", "Limit the upload speed.").PlaceHolder("BYTES_PER_SEC").IntVar(&options.MaxUploadSpeedBytesPerSecond)
},
connect)
}
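
For context, the flags above just fill in the package-level s3.Options before the registered connect callback hands it to s3.New. Below is a minimal sketch of the equivalent programmatic connection; it assumes storage.Storage exposes the methods implemented in s3_storage.go further down, and the bucket, credentials and prefix are placeholders, not values from this commit.

package main

import (
	"context"
	"log"

	"github.com/kopia/kopia/storage/s3"
)

func main() {
	// Placeholders standing in for --bucket, --access-key, --secret-access-key and --prefix.
	st, err := s3.New(context.Background(), &s3.Options{
		BucketName:      "my-bucket",
		Endpoint:        "s3.amazonaws.com", // the CLI defaults --endpoint to this value
		AccessKeyID:     "MY-ACCESS-KEY",
		SecretAccessKey: "MY-SECRET-KEY",
		Prefix:          "kopia/",
	})
	if err != nil {
		log.Fatalf("unable to connect to S3 storage: %v", err)
	}
	defer st.Close() // s3Storage.Close is currently a no-op, but callers should not rely on that

	log.Println(st) // prints: s3://my-bucket/kopia/
}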


@@ -24,6 +24,7 @@
_ "github.com/kopia/kopia/cli/filesystemcli"
_ "github.com/kopia/kopia/cli/gcscli"
_ "github.com/kopia/kopia/cli/s3cli"
_ "github.com/kopia/kopia/cli/webdavcli"
)

storage/s3/s3_options.go Normal file

@@ -0,0 +1,20 @@
package s3
// Options defines options for S3-based storage.
type Options struct {
// BucketName is the name of the bucket where data is stored.
BucketName string `json:"bucket"`
// Prefix specifies additional string to prepend to all objects.
Prefix string `json:"prefix,omitempty"`
// Endpoint is the S3-compatible server to connect to (the CLI defaults it to s3.amazonaws.com).
Endpoint string `json:"endpoint"`
// DoNotUseTLS disables TLS (HTTPS) when connecting to the endpoint.
DoNotUseTLS bool `json:"doNotUseTLS,omitempty"`
// AccessKeyID and SecretAccessKey are the credentials used to authenticate to S3.
AccessKeyID string `json:"accessKeyID"`
SecretAccessKey string `json:"secretAccessKey" kopia:"sensitive"`
// MaxUploadSpeedBytesPerSecond and MaxDownloadSpeedBytesPerSecond limit bandwidth
// used by this storage; zero or negative means unlimited.
MaxUploadSpeedBytesPerSecond int `json:"maxUploadSpeedBytesPerSecond,omitempty"`
MaxDownloadSpeedBytesPerSecond int `json:"maxDownloadSpeedBytesPerSecond,omitempty"`
}
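
Since ConnectionInfo in s3_storage.go below returns &s.Options as its Config, these JSON tags define how a connection is persisted; the kopia:"sensitive" tag presumably marks the secret for special treatment elsewhere in kopia (that handling is not part of this commit). A small sketch of the serialized form, assuming standard encoding/json behavior, with placeholder values:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/kopia/kopia/storage/s3"
)

func main() {
	b, err := json.MarshalIndent(&s3.Options{
		BucketName:  "my-bucket",        // placeholder
		Endpoint:    "s3.amazonaws.com", // placeholder
		AccessKeyID: "MY-ACCESS-KEY",    // placeholder
	}, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
	// Fields tagged ,omitempty (prefix, doNotUseTLS and the two speed limits)
	// are dropped when zero, so the output is:
	// {
	//   "bucket": "my-bucket",
	//   "endpoint": "s3.amazonaws.com",
	//   "accessKeyID": "MY-ACCESS-KEY",
	//   "secretAccessKey": ""
	// }
}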

storage/s3/s3_storage.go Normal file

@@ -0,0 +1,244 @@
package s3
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"github.com/efarrer/iothrottler"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/storage"
"github.com/minio/minio-go"
)
const (
s3storageType = "s3"
)
type s3Storage struct {
Options
ctx context.Context
cli *minio.Client
downloadThrottler *iothrottler.IOThrottlerPool
uploadThrottler *iothrottler.IOThrottlerPool
}
func (s *s3Storage) BlockSize(b string) (int64, error) {
attempt := func() (interface{}, error) {
oi, err := s.cli.StatObject(s.BucketName, s.getObjectNameString(b), minio.StatObjectOptions{})
if err != nil {
return 0, err
}
return oi.Size, nil
}
v, err := exponentialBackoff(fmt.Sprintf("BlockSize(%q)", b), attempt)
if err != nil {
return 0, translateError(err)
}
return v.(int64), nil
}
func (s *s3Storage) GetBlock(b string, offset, length int64) ([]byte, error) {
attempt := func() (interface{}, error) {
var opt minio.GetObjectOptions
if length > 0 {
// SetRange takes an inclusive byte range, so request offset..offset+length-1.
opt.SetRange(offset, offset+length-1)
}
o, err := s.cli.GetObject(s.BucketName, s.getObjectNameString(b), opt)
if err != nil {
return nil, err
}
defer o.Close()
throttled, err := s.downloadThrottler.AddReader(o)
if err != nil {
return nil, err
}
return ioutil.ReadAll(throttled)
}
v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q,%v,%v)", b, offset, length), attempt)
if err != nil {
return nil, translateError(err)
}
return v.([]byte), nil
}
func exponentialBackoff(desc string, att retry.AttemptFunc) (interface{}, error) {
return retry.WithExponentialBackoff(desc, att, isRetriableError)
}
func isRetriableError(err error) bool {
if me, ok := err.(minio.ErrorResponse); ok {
// retry on server errors, not on client errors
return me.StatusCode >= 500
}
switch err {
case nil:
return false
default:
return true
}
}
func translateError(err error) error {
if me, ok := err.(minio.ErrorResponse); ok {
if me.StatusCode == 404 {
return storage.ErrBlockNotFound
}
}
switch err {
case nil:
return nil
default:
return fmt.Errorf("unexpected S3 error: %v", err)
}
}
func (s *s3Storage) PutBlock(b string, data []byte) error {
attempt := func() (interface{}, error) {
rc := ioutil.NopCloser(bytes.NewReader(data))
throttled, err := s.uploadThrottler.AddReader(rc)
if err != nil {
return nil, err
}
n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, int64(len(data)), minio.PutObjectOptions{})
if err != nil {
return nil, err
}
if n != int64(len(data)) {
return nil, fmt.Errorf("truncated write %v of %v bytes", n, len(data))
}
return nil, nil
}
_, err := exponentialBackoff(fmt.Sprintf("PutBlock(%q)", b), attempt)
return translateError(err)
}
func (s *s3Storage) DeleteBlock(b string) error {
attempt := func() (interface{}, error) {
return nil, s.cli.RemoveObject(s.BucketName, s.getObjectNameString(b))
}
_, err := exponentialBackoff(fmt.Sprintf("DeleteBlock(%q)", b), attempt)
return translateError(err)
}
func (s *s3Storage) getObjectNameString(b string) string {
return s.Prefix + b
}
func (s *s3Storage) ListBlocks(prefix string) (chan storage.BlockMetadata, storage.CancelFunc) {
ch := make(chan storage.BlockMetadata, 100)
cancelled := make(chan struct{})
go func() {
defer close(ch)
oi := s.cli.ListObjects(s.BucketName, s.Prefix+prefix, false, cancelled)
for o := range oi {
if err := o.Err; err != nil {
select {
case ch <- storage.BlockMetadata{Error: translateError(err)}:
return
case <-cancelled:
return
}
}
bm := storage.BlockMetadata{
BlockID: o.Key[len(s.Prefix):],
Length: o.Size,
TimeStamp: o.LastModified,
}
select {
case ch <- bm:
case <-cancelled:
return
}
}
}()
return ch, func() {
close(cancelled)
}
}
func (s *s3Storage) ConnectionInfo() storage.ConnectionInfo {
return storage.ConnectionInfo{
Type: s3storageType,
Config: &s.Options,
}
}
func (s *s3Storage) Close() error {
return nil
}
func (s *s3Storage) String() string {
return fmt.Sprintf("s3://%v/%v", s.BucketName, s.Prefix)
}
func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {
if bytesPerSecond <= 0 {
return iothrottler.Unlimited
}
return iothrottler.Bandwidth(bytesPerSecond) * iothrottler.BytesPerSecond
}
// New creates a new S3-backed storage with the specified options:
//
// - the 'BucketName' field is required and all other parameters are optional.
func New(ctx context.Context, opt *Options) (storage.Storage, error) {
if opt.BucketName == "" {
return nil, errors.New("bucket name must be specified")
}
cli, err := minio.New(opt.Endpoint, opt.AccessKeyID, opt.SecretAccessKey, !opt.DoNotUseTLS)
if err != nil {
return nil, fmt.Errorf("unable to create client: %v", err)
}
downloadThrottler := iothrottler.NewIOThrottlerPool(toBandwidth(opt.MaxDownloadSpeedBytesPerSecond))
uploadThrottler := iothrottler.NewIOThrottlerPool(toBandwidth(opt.MaxUploadSpeedBytesPerSecond))
return &s3Storage{
Options: *opt,
ctx: ctx,
cli: cli,
downloadThrottler: downloadThrottler,
uploadThrottler: uploadThrottler,
}, nil
}
func init() {
storage.AddSupportedStorage(
s3storageType,
func() interface{} {
return &Options{}
},
func(ctx context.Context, o interface{}) (storage.Storage, error) {
return New(ctx, o.(*Options))
})
}
var _ storage.ConnectionInfoProvider = &s3Storage{}
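
For reference, a hedged sketch of exercising the block-level API implemented above, including the optional bandwidth throttles. It assumes the storage.Storage interface exposes PutBlock, GetBlock, BlockSize and DeleteBlock as implemented here; all values are placeholders.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/kopia/kopia/storage/s3"
)

func main() {
	st, err := s3.New(context.Background(), &s3.Options{
		BucketName:                   "my-bucket",        // placeholder
		Endpoint:                     "s3.amazonaws.com", // placeholder
		AccessKeyID:                  "MY-ACCESS-KEY",    // placeholder
		SecretAccessKey:              "MY-SECRET-KEY",    // placeholder
		MaxUploadSpeedBytesPerSecond: 1 << 20,            // throttle uploads to ~1 MiB/s
		// MaxDownloadSpeedBytesPerSecond left at 0: toBandwidth maps it to iothrottler.Unlimited.
	})
	if err != nil {
		log.Fatal(err)
	}
	defer st.Close()

	// Write a block, stat it, read it back and delete it.
	if err := st.PutBlock("example-block", []byte("hello")); err != nil {
		log.Fatal(err)
	}
	if size, err := st.BlockSize("example-block"); err == nil {
		fmt.Println("size:", size) // 5
	}
	// A non-positive length makes GetBlock fetch the whole object rather than a byte range.
	data, err := st.GetBlock("example-block", 0, -1)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data)) // hello
	if err := st.DeleteBlock("example-block"); err != nil {
		log.Fatal(err)
	}
}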


@@ -0,0 +1,80 @@
package s3
import (
"context"
"crypto/rand"
"fmt"
"log"
"testing"
"time"
"github.com/kopia/kopia/internal/storagetesting"
)
// Public play.minio.io test server credentials, from https://github.com/minio/minio-go
const (
endpoint = "play.minio.io:9000"
accessKeyID = "Q3AM3UQ867SPQQA43P2F"
secretAccessKey = "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
useSSL = true
// the test takes a few seconds, delete stuff older than 1h to avoid accumulating cruft
cleanupAge = 1 * time.Hour
bucketName = "kopia-test-1"
)
func TestS3Storage(t *testing.T) {
if testing.Short() {
t.Skip("skipping S3 storage test in short mode")
}
cleanupOldData(t)
data := make([]byte, 8)
if _, err := rand.Read(data); err != nil {
t.Fatalf("unable to generate random prefix: %v", err)
}
st, err := New(context.Background(), &Options{
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
Endpoint: endpoint,
BucketName: bucketName,
Prefix: fmt.Sprintf("test-%v-%x-", time.Now().Unix(), data),
})
if err != nil {
t.Fatalf("err: %v", err)
}
storagetesting.VerifyStorage(t, st)
}
func cleanupOldData(t *testing.T) {
// cleanup old data from the bucket
st, err := New(context.Background(), &Options{
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
Endpoint: endpoint,
BucketName: bucketName,
})
if err != nil {
t.Fatalf("err: %v", err)
}
items, cancel := st.ListBlocks("")
defer cancel()
for it := range items {
if it.Error != nil {
t.Errorf("can't cleanup: %v", it.Error)
return
}
age := time.Since(it.TimeStamp)
if age > cleanupAge {
if err := st.DeleteBlock(it.BlockID); err != nil {
t.Errorf("warning: unable to delete %q: %v", it.BlockID, err)
}
} else {
log.Printf("keeping %v", it.BlockID)
}
}
}
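
As a closing note on the retry helpers in s3_storage.go: exponentialBackoff retries only errors that isRetriableError accepts, and translateError maps a 404 response to storage.ErrBlockNotFound. A hypothetical in-package test (not part of this commit) sketching that classification:

package s3

import (
	"testing"

	"github.com/kopia/kopia/storage"
	"github.com/minio/minio-go"
)

func TestErrorClassificationSketch(t *testing.T) {
	// Server-side (5xx) errors are retried by exponentialBackoff.
	if !isRetriableError(minio.ErrorResponse{StatusCode: 503}) {
		t.Error("expected 503 to be retriable")
	}
	// Client-side (4xx) errors fail immediately...
	if isRetriableError(minio.ErrorResponse{StatusCode: 404}) {
		t.Error("expected 404 not to be retriable")
	}
	// ...and a 404 surfaces as the storage-level "block not found" sentinel.
	if translateError(minio.ErrorResponse{StatusCode: 404}) != storage.ErrBlockNotFound {
		t.Error("expected 404 to map to storage.ErrBlockNotFound")
	}
}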