mirror of
https://github.com/kopia/kopia.git
synced 2026-04-26 09:00:15 -04:00
Support setting AWS S3 storage class for all types of blobs (#1335)
* Support setting AWS S3 storage class for all types of blobs * Read .storageconfig file * Improve loading logic * Hide .storageconfig from ListBlobs()
This commit is contained in:
@@ -496,7 +496,7 @@ func (c *App) advancedCommand(ctx context.Context) {
|
||||
This command could be dangerous or lead to repository corruption when used improperly.
|
||||
|
||||
Running this command is not needed for using Kopia. Instead, most users should rely on periodic repository maintenance. See https://kopia.io/docs/advanced/maintenance/ for more information.
|
||||
To run this command despite the warning, set KOPIA_ADVANCED_COMMANDS=enabled
|
||||
To run this command despite the warning, set --advanced-commands=enabled
|
||||
|
||||
`)
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ type s3Storage struct {
|
||||
|
||||
downloadThrottler *iothrottler.IOThrottlerPool
|
||||
uploadThrottler *iothrottler.IOThrottlerPool
|
||||
storageConfig *StorageConfig
|
||||
}
|
||||
|
||||
func (s *s3Storage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output *gather.WriteBuffer) error {
|
||||
@@ -142,9 +143,12 @@ func (s *s3Storage) putBlob(ctx context.Context, b blob.ID, data blob.Bytes) (ve
|
||||
return versionMetadata{}, errors.Wrap(err, "AddReader")
|
||||
}
|
||||
|
||||
storageClass := s.storageConfig.getStorageClassForBlobID(b)
|
||||
|
||||
uploadInfo, err := s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), throttled, int64(data.Length()), minio.PutObjectOptions{
|
||||
ContentType: "application/x-kopia",
|
||||
SendContentMd5: atomic.LoadInt32(&s.sendMD5) > 0,
|
||||
StorageClass: storageClass,
|
||||
})
|
||||
|
||||
var er minio.ErrorResponse
|
||||
@@ -158,7 +162,8 @@ func (s *s3Storage) putBlob(ctx context.Context, b blob.ID, data blob.Bytes) (ve
|
||||
if errors.Is(err, io.EOF) && uploadInfo.Size == 0 {
|
||||
// special case empty stream
|
||||
_, err = s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), bytes.NewBuffer(nil), 0, minio.PutObjectOptions{
|
||||
ContentType: "application/x-kopia",
|
||||
ContentType: "application/x-kopia",
|
||||
StorageClass: storageClass,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -212,6 +217,10 @@ func (s *s3Storage) ListBlobs(ctx context.Context, prefix blob.ID, callback func
|
||||
Timestamp: o.LastModified,
|
||||
}
|
||||
|
||||
if bm.BlobID == ConfigName {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := callback(bm); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -306,13 +315,26 @@ func newStorage(ctx context.Context, opt *Options) (*s3Storage, error) {
|
||||
return nil, errors.Errorf("bucket %q does not exist", opt.BucketName)
|
||||
}
|
||||
|
||||
return &s3Storage{
|
||||
s := s3Storage{
|
||||
Options: *opt,
|
||||
cli: cli,
|
||||
sendMD5: 0,
|
||||
downloadThrottler: downloadThrottler,
|
||||
uploadThrottler: uploadThrottler,
|
||||
}, nil
|
||||
storageConfig: &StorageConfig{},
|
||||
}
|
||||
|
||||
var scOutput gather.WriteBuffer
|
||||
|
||||
if getBlobErr := s.GetBlob(ctx, ConfigName, 0, -1, &scOutput); getBlobErr == nil {
|
||||
if scErr := s.storageConfig.Load(scOutput.Bytes().Reader()); scErr != nil {
|
||||
return nil, errors.Wrapf(scErr, "error parsing storage config for bucket %q", opt.BucketName)
|
||||
}
|
||||
} else if !errors.Is(getBlobErr, blob.ErrBlobNotFound) {
|
||||
return nil, errors.Wrapf(getBlobErr, "error retrieving storage config from bucket %q", opt.BucketName)
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
45
repo/blob/s3/s3_storage_config.go
Normal file
45
repo/blob/s3/s3_storage_config.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
|
||||
// ConfigName is the name of the hidden storage config file in a S3 bucket.
|
||||
const ConfigName = ".storageconfig"
|
||||
|
||||
// PrefixAndStorageClass defines the storage class to use for a particular blob ID prefix.
|
||||
type PrefixAndStorageClass struct {
|
||||
Prefix blob.ID `json:"prefix"`
|
||||
StorageClass string `json:"storageClass"`
|
||||
}
|
||||
|
||||
// StorageConfig contains storage configuration optionally persisted in the storage itself.
|
||||
type StorageConfig struct {
|
||||
BlobOptions []PrefixAndStorageClass `json:"blobOptions,omitempty"`
|
||||
}
|
||||
|
||||
// Load loads the StorageConfig from the provided reader.
|
||||
func (p *StorageConfig) Load(r io.Reader) error {
|
||||
return errors.Wrap(json.NewDecoder(r).Decode(p), "error parsing JSON")
|
||||
}
|
||||
|
||||
// Save saves the parameters to the provided writer.
|
||||
func (p *StorageConfig) Save(w io.Writer) error {
|
||||
return errors.Wrap(json.NewEncoder(w).Encode(p), "error writing JSON")
|
||||
}
|
||||
|
||||
func (p *StorageConfig) getStorageClassForBlobID(id blob.ID) string {
|
||||
for _, o := range p.BlobOptions {
|
||||
if strings.HasPrefix(string(id), string(o.Prefix)) {
|
||||
return o.StorageClass
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
Reference in New Issue
Block a user