Mirror of https://github.com/kopia/kopia.git (synced 2025-12-23 22:57:50 -05:00)
blob: replaced blob.Storage.SetTime() method with blob.PutOptions.SetTime (#1595)
* sharded: plumbed through blob.PutOptions

* blob: removed blob.Storage.SetTime() method

  This was only used for `kopia repo sync-to` and got replaced with an equivalent blob.PutOptions.SetTime, which when set to a non-zero time will attempt to set the modification time on a file. Since some providers don't support changing modification time, we are able to emulate it using per-blob metadata (on B2, Azure and GCS); sadly S3 is still unsupported, because it does not support returning metadata in list results.

  Also added PutOptions.GetTime, which when set to a non-nil pointer will populate the provided variable with the actual time that got assigned to the blob.

  Added tests that verify that each provider supports GetTime and SetTime according to this spec.

* blob: additional test coverage for filesystem storage

* blob: added PutBlobAndGetMetadata() helper and used it where appropriate

* fixed test failures

* pr feedback

* Update repo/blob/azure/azure_storage.go (co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>)

* Update repo/blob/filesystem/filesystem_storage.go (co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>)

* Update repo/blob/filesystem/filesystem_storage.go (co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>)

* blobtesting: fixed object_locking_map.go

* blobtesting: removed SetTime from ObjectLockingMap

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
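For readers skimming the diff, here is a minimal caller-side sketch of how the new options replace the removed SetTime() call. The field and helper names (PutOptions.SetModTime, PutOptions.GetModTime, PutBlobAndGetMetadata, blobtesting.NewMapStorage, gather.FromSlice) are taken from the diff below; the blob IDs and byte contents are arbitrary placeholders, and the snippet only builds inside the kopia source tree because it uses internal packages.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/kopia/kopia/internal/blobtesting"
	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
)

func main() {
	ctx := context.Background()

	// in-memory test storage, as used by the new unit tests in this PR
	st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, time.Now)

	// preserve a specific modification time and read back the time the
	// provider actually assigned, all within a single PutBlob call.
	var assigned time.Time

	err := st.PutBlob(ctx, "someblob", gather.FromSlice([]byte{1, 2, 3}), blob.PutOptions{
		SetModTime: time.Date(2020, 1, 1, 15, 30, 45, 0, time.UTC), // replaces the removed SetTime() call
		GetModTime: &assigned,                                      // populated with the effective timestamp
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("blob stored with modification time", assigned)

	// PutBlobAndGetMetadata wraps the same mechanism and returns blob.Metadata directly.
	bm, err := blob.PutBlobAndGetMetadata(ctx, st, "otherblob", gather.FromSlice([]byte{4, 5, 6}), blob.PutOptions{})
	if err != nil {
		panic(err)
	}

	fmt.Println(bm.BlobID, bm.Length, bm.Timestamp)
}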
@@ -30,9 +30,8 @@ type commandRepositorySyncTo struct {
 	repositorySyncDestinationMustExist bool
 	repositorySyncTimes                bool
 
-	lastSyncProgress       string
-	syncProgressMutex      sync.Mutex
-	setTimeUnsupportedOnce sync.Once
+	lastSyncProgress  string
+	syncProgressMutex sync.Mutex
 
 	out textOutput
 }
@@ -303,19 +302,25 @@ func (c *commandRepositorySyncTo) syncCopyBlob(ctx context.Context, m blob.Metad
 		return errors.Wrapf(err, "error reading blob '%v' from source", m.BlobID)
 	}
 
-	if err := dst.PutBlob(ctx, m.BlobID, data.Bytes(), blob.PutOptions{}); err != nil {
-		return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID)
-	}
+	opt := blob.PutOptions{}
 
 	if c.repositorySyncTimes {
-		if err := dst.SetTime(ctx, m.BlobID, m.Timestamp); err != nil {
-			if errors.Is(err, blob.ErrSetTimeUnsupported) {
-				c.setTimeUnsupportedOnce.Do(func() {
-					log(ctx).Infof("destination repository does not support setting time")
-				})
-			}
-
-			return errors.Wrapf(err, "error setting time on destination '%v'", m.BlobID)
-		}
+		opt.SetModTime = m.Timestamp
+	}
+
+	if err := dst.PutBlob(ctx, m.BlobID, data.Bytes(), opt); err != nil {
+		if errors.Is(err, blob.ErrSetTimeUnsupported) {
+			// run again without SetModTime, emit a warning
+			opt.SetModTime = time.Time{}
+
+			log(ctx).Warnf("destination repository does not support preserving modification times")
+
+			c.repositorySyncTimes = false
+
+			err = dst.PutBlob(ctx, m.BlobID, data.Bytes(), opt)
+		}
+
+		if err != nil {
+			return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID)
+		}
 	}
@@ -17,6 +17,21 @@
 
 const maxTimeDiffBetweenGetAndList = time.Minute
 
+// AssertTimestampsCloseEnough asserts that two provided times are close enough - some providers
+// don't store timestamps exactly but round them up/down by several seconds.
+func AssertTimestampsCloseEnough(t *testing.T, blobID blob.ID, got, want time.Time) {
+	t.Helper()
+
+	timeDiff := got.Sub(want)
+	if timeDiff < 0 {
+		timeDiff = -timeDiff
+	}
+
+	if timeDiff > maxTimeDiffBetweenGetAndList {
+		t.Fatalf("invalid timestamp on %v: got %v, want %v", blobID, got, want)
+	}
+}
+
 // AssertGetBlob asserts that the specified BLOB has correct content.
 func AssertGetBlob(ctx context.Context, t *testing.T, s blob.Storage, blobID blob.ID, expected []byte) {
 	t.Helper()
@@ -176,10 +176,6 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d
 	return nil
 }
 
-func (s *eventuallyConsistentStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
-	return s.realStorage.SetTime(ctx, id, t)
-}
-
 func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
 	s.randomFrontendCache().put(id, nil)
@@ -3,7 +3,6 @@
 
 import (
 	"context"
-	"time"
 
 	"github.com/kopia/kopia/internal/fault"
 	"github.com/kopia/kopia/repo/blob"
@@ -14,7 +13,6 @@
 	MethodGetBlob fault.Method = iota
 	MethodGetMetadata
 	MethodPutBlob
-	MethodSetTime
 	MethodDeleteBlob
 	MethodListBlobs
 	MethodListBlobsItem
@@ -63,15 +61,6 @@ func (s *FaultyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes
 	return s.base.PutBlob(ctx, id, data, opts)
 }
 
-// SetTime implements blob.Storage.
-func (s *FaultyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
-	if ok, err := s.GetNextFault(ctx, MethodSetTime, id); ok {
-		return err
-	}
-
-	return s.base.SetTime(ctx, id, t)
-}
-
 // DeleteBlob implements blob.Storage.
 func (s *FaultyStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
 	if ok, err := s.GetNextFault(ctx, MethodDeleteBlob, id); ok {
@@ -83,7 +83,11 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, o
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
-	s.keyTime[id] = s.timeNow()
+	if !opts.SetModTime.IsZero() {
+		s.keyTime[id] = opts.SetModTime
+	} else {
+		s.keyTime[id] = s.timeNow()
+	}
 
 	var b bytes.Buffer
 
@@ -91,6 +95,10 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, o
 
 	s.data[id] = b.Bytes()
 
+	if opts.GetModTime != nil {
+		*opts.GetModTime = s.keyTime[id]
+	}
+
 	return nil
 }
 
@@ -147,15 +155,6 @@ func (s *mapStorage) Close(ctx context.Context) error {
 	return nil
 }
 
-func (s *mapStorage) SetTime(ctx context.Context, blobID blob.ID, t time.Time) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	s.keyTime[blobID] = t
-
-	return nil
-}
-
 func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
@@ -122,7 +122,12 @@ func (s *objectLockingMap) PutBlob(ctx context.Context, id blob.ID, data blob.By
 
 	e := &entry{
 		value: b.Bytes(),
-		mtime: s.timeNow(),
+	}
+
+	if opts.SetModTime.IsZero() {
+		e.mtime = s.timeNow()
+	} else {
+		e.mtime = opts.SetModTime
 	}
 
 	if opts.HasRetentionOptions() {
@@ -131,6 +136,10 @@ func (s *objectLockingMap) PutBlob(ctx context.Context, id blob.ID, data blob.By
 
 	s.data[id] = append(s.data[id], e)
 
+	if opts.GetModTime != nil {
+		*opts.GetModTime = e.mtime
+	}
+
 	return nil
 }
 
@@ -196,20 +205,6 @@ func (s *objectLockingMap) Close(ctx context.Context) error {
 	return nil
 }
 
-func (s *objectLockingMap) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	e, err := s.getLatestForMutationLocked(id)
-	if err != nil {
-		return err
-	}
-
-	e.mtime = t
-
-	return nil
-}
-
 func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold time.Duration) error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
@@ -120,37 +120,73 @@ func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob.
|
||||
}
|
||||
})
|
||||
|
||||
ts := time.Date(2020, 1, 1, 15, 30, 45, 0, time.UTC)
|
||||
t.Run("DeleteBlobsAndList", func(t *testing.T) {
|
||||
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
|
||||
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
|
||||
|
||||
t.Run("SetTime", func(t *testing.T) {
|
||||
AssertListResults(ctx, t, r, "ab", blocks[2].blk, blocks[3].blk)
|
||||
AssertListResults(ctx, t, r, "", blocks[1].blk, blocks[2].blk, blocks[3].blk, blocks[4].blk)
|
||||
})
|
||||
|
||||
t.Run("PutBlobsWithSetTime", func(t *testing.T) {
|
||||
for _, b := range blocks {
|
||||
b := b
|
||||
|
||||
t.Run(string(b.blk), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := r.SetTime(ctx, b.blk, ts)
|
||||
inTime := time.Date(2020, 1, 2, 12, 30, 40, 0, time.UTC)
|
||||
|
||||
err := r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{
|
||||
SetModTime: inTime,
|
||||
})
|
||||
|
||||
if errors.Is(err, blob.ErrSetTimeUnsupported) {
|
||||
return
|
||||
t.Skip("setting time unsupported")
|
||||
}
|
||||
|
||||
bm, err := r.GetMetadata(ctx, b.blk)
|
||||
require.NoError(t, err)
|
||||
|
||||
md, err := r.GetMetadata(ctx, b.blk)
|
||||
if err != nil {
|
||||
t.Errorf("unable to get blob metadata")
|
||||
}
|
||||
AssertTimestampsCloseEnough(t, bm.BlobID, bm.Timestamp, inTime)
|
||||
|
||||
require.True(t, md.Timestamp.Equal(ts), "invalid time after SetTme(): %vm want %v", md.Timestamp, ts)
|
||||
all, err := blob.ListAllBlobs(ctx, r, b.blk)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 1)
|
||||
|
||||
AssertTimestampsCloseEnough(t, all[0].BlobID, all[0].Timestamp, inTime)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
|
||||
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
|
||||
t.Run("PutBlobsWithGetTime", func(t *testing.T) {
|
||||
for _, b := range blocks {
|
||||
b := b
|
||||
|
||||
AssertListResults(ctx, t, r, "ab", blocks[2].blk, blocks[3].blk)
|
||||
AssertListResults(ctx, t, r, "", blocks[1].blk, blocks[2].blk, blocks[3].blk, blocks[4].blk)
|
||||
t.Run(string(b.blk), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var outTime time.Time
|
||||
|
||||
require.NoError(t, r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{
|
||||
GetModTime: &outTime,
|
||||
}))
|
||||
|
||||
require.False(t, outTime.IsZero(), "modification time was not returned")
|
||||
|
||||
bm, err := r.GetMetadata(ctx, b.blk)
|
||||
require.NoError(t, err)
|
||||
|
||||
AssertTimestampsCloseEnough(t, bm.BlobID, bm.Timestamp, outTime)
|
||||
|
||||
all, err := blob.ListAllBlobs(ctx, r, b.blk)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, all, 1)
|
||||
|
||||
AssertTimestampsCloseEnough(t, all[0].BlobID, all[0].Timestamp, outTime)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// AssertConnectionInfoRoundTrips verifies that the ConnectionInfo returned by a given storage can be used to create
|
||||
|
||||
@@ -770,17 +770,12 @@ func (e *Manager) writeIndexShards(ctx context.Context, dataShards map[blob.ID]b
 		return errWriteIndexTryAgain
 	}
 
-	if err := e.st.PutBlob(ctx, blobID, data, blob.PutOptions{}); err != nil {
-		return errors.Wrap(err, "error writing index blob")
-	}
-
-	bm, err := e.st.GetMetadata(ctx, blobID)
-	if err != nil {
-		return errors.Wrap(err, "error getting index metadata")
-	}
-
-	e.log.Debugw("wrote-index-shard",
-		"metadata", bm)
+	bm, err := blob.PutBlobAndGetMetadata(ctx, e.st, blobID, data, blob.PutOptions{})
+	if err != nil {
+		return errors.Wrap(err, "error writing index blob")
+	}
+
+	e.log.Debugw("wrote-index-shard", "metadata", bm)
 
 	written[bm.BlobID] = bm
 }
@@ -125,6 +125,8 @@ func (s *CacheStorage) ListBlobs(ctx context.Context, prefix blob.ID, cb func(bl
 func (s *CacheStorage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes, opts blob.PutOptions) error {
 	err := s.Storage.PutBlob(ctx, blobID, data, opts)
 	if err == nil && s.isCachedPrefix(blobID) {
+		opts.GetModTime = nil
+
 		// nolint:errcheck
 		s.cacheStorage.PutBlob(ctx, prefixAdd+blobID, markerData, opts)
 	}
@@ -7,6 +7,7 @@
 	cryptorand "crypto/rand"
 	"fmt"
 	"math/rand"
+	"os"
 	"sync"
 	"time"
 
@@ -52,6 +53,10 @@ type Options struct {
 // it can be used with Kopia.
 // nolint:gomnd,funlen,gocyclo
 func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error {
+	if os.Getenv("KOPIA_SKIP_PROVIDER_VALIDATION") != "" {
+		return nil
+	}
+
 	uberPrefix := blob.ID("z" + uuid.NewString())
 	defer cleanupAllBlobs(ctx, st, uberPrefix)
internal/timestampmeta/timestampmeta.go (new file, 30 lines)
@@ -0,0 +1,30 @@
+// Package timestampmeta provides utilities for preserving timestamps
+// using per-blob key-value-pairs (metadata, tags, etc.)
+package timestampmeta
+
+import (
+	"strconv"
+	"time"
+)
+
+// ToMap returns a map containing single entry representing the provided time or nil map
+// if the time is zero. The key-value pair map should be stored alongside the blob.
+func ToMap(t time.Time, mapKey string) map[string]string {
+	if t.IsZero() {
+		return nil
+	}
+
+	return map[string]string{
+		mapKey: strconv.FormatInt(t.UnixNano(), 10), // nolint:gomnd
+	}
+}
+
+// FromValue attempts to convert the provided value stored in metadata into time.Time.
+func FromValue(v string) (t time.Time, ok bool) {
+	nanos, err := strconv.ParseInt(v, 10, 64) // nolint:gomnd
+	if err != nil {
+		return time.Time{}, false
+	}
+
+	return time.Unix(0, nanos), true
+}
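The ToMap/FromValue pair is the round-trip the Azure, B2 and GCS backends in this PR use to emulate SetModTime via per-blob metadata. A minimal sketch of that round-trip follows; the "kopia-mtime" key matches the one this PR picks for B2 (Azure and GCS use their own key casing), and the snippet assumes it is built inside the kopia source tree since timestampmeta is an internal package.

package main

import (
	"fmt"
	"time"

	"github.com/kopia/kopia/internal/timestampmeta"
)

func main() {
	want := time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC)

	// store: attach the timestamp as provider metadata next to the blob
	md := timestampmeta.ToMap(want, "kopia-mtime") // {"kopia-mtime": "1577934245000000000"}

	// load: convert the stored string back into time.Time when getting metadata or listing
	if got, ok := timestampmeta.FromValue(md["kopia-mtime"]); ok {
		fmt.Println(got.Equal(want)) // true
	}
}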
internal/timestampmeta/timestampmeta_test.go (new file, 23 lines)
@@ -0,0 +1,23 @@
+package timestampmeta_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/timestampmeta"
+)
+
+var (
+	timeValue   = time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC)
+	storedValue = "1577934245000000000"
+)
+
+func TestToMap(t *testing.T) {
+	require.Equal(t, map[string]string{
+		"aaa": storedValue,
+	}, timestampmeta.ToMap(timeValue, "aaa"))
+
+	require.Nil(t, timestampmeta.ToMap(time.Time{}, "aaa"))
+}
@@ -3,6 +3,7 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -11,12 +12,15 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/iocopy"
|
||||
"github.com/kopia/kopia/internal/timestampmeta"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/retrying"
|
||||
)
|
||||
|
||||
const (
|
||||
azStorageType = "azureBlob"
|
||||
|
||||
timeMapKey = "Kopiamtime" // this must be capital letter followed by lowercase, to comply with AZ tags naming convention.
|
||||
)
|
||||
|
||||
type azStorage struct {
|
||||
@@ -70,16 +74,22 @@ func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int6
|
||||
func (az *azStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
|
||||
bc := az.bucket.NewBlockBlobClient(az.getObjectNameString(b))
|
||||
|
||||
fi, err := bc.GetProperties(ctx, nil)
|
||||
fi, err := bc.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{})
|
||||
if err != nil {
|
||||
return blob.Metadata{}, errors.Wrap(translateError(err), "Attributes")
|
||||
}
|
||||
|
||||
return blob.Metadata{
|
||||
bm := blob.Metadata{
|
||||
BlobID: b,
|
||||
Length: *fi.ContentLength,
|
||||
Timestamp: *fi.LastModified,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(fi.Metadata[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func translateError(err error) error {
|
||||
@@ -112,13 +122,20 @@ func (az *azStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, op
|
||||
|
||||
bc := az.bucket.NewBlockBlobClient(az.getObjectNameString(b))
|
||||
|
||||
_, err := bc.Upload(ctx, data.Reader(), &azblob.UploadBlockBlobOptions{})
|
||||
ubo := &azblob.UploadBlockBlobOptions{
|
||||
Metadata: timestampmeta.ToMap(opts.SetModTime, timeMapKey),
|
||||
}
|
||||
|
||||
return translateError(err)
|
||||
}
|
||||
resp, err := bc.Upload(ctx, data.Reader(), ubo)
|
||||
if err != nil {
|
||||
return translateError(err)
|
||||
}
|
||||
|
||||
func (az *azStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
if opts.GetModTime != nil {
|
||||
*opts.GetModTime = *resp.LastModified
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBlob deletes azure blob from container with given ID.
|
||||
@@ -144,16 +161,53 @@ func (az *azStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback fun
|
||||
|
||||
pager := az.bucket.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
|
||||
Prefix: &prefixStr,
|
||||
Include: []azblob.ListBlobsIncludeItem{
|
||||
"[" + azblob.ListBlobsIncludeItemMetadata + "]",
|
||||
},
|
||||
})
|
||||
|
||||
for pager.NextPage(ctx) {
|
||||
resp := pager.PageResponse()
|
||||
|
||||
for _, it := range resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems {
|
||||
// workaround for the XML parsing bug reported upstream
|
||||
// https://github.com/Azure/azure-sdk-for-go/issues/16679
|
||||
var enumerationResults struct {
|
||||
Blobs struct {
|
||||
Blob []struct {
|
||||
Name string
|
||||
Properties struct {
|
||||
ContentLength int64 `xml:"Content-Length"`
|
||||
LastModified string `xml:"Last-Modified"`
|
||||
}
|
||||
Metadata struct {
|
||||
Kopiamtime string
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dec := xml.NewDecoder(resp.RawResponse.Body)
|
||||
if err := dec.Decode(&enumerationResults); err != nil {
|
||||
return errors.Wrap(err, "unable to decode response")
|
||||
}
|
||||
|
||||
for _, it := range enumerationResults.Blobs.Blob {
|
||||
bm := blob.Metadata{
|
||||
BlobID: blob.ID((*it.Name)[len(az.Prefix):]),
|
||||
Length: *it.Properties.ContentLength,
|
||||
Timestamp: *it.Properties.LastModified,
|
||||
BlobID: blob.ID(it.Name[len(az.Prefix):]),
|
||||
Length: it.Properties.ContentLength,
|
||||
}
|
||||
|
||||
// see if we have 'Kopiamtime' metadata, if so - trust it.
|
||||
if t, ok := timestampmeta.FromValue(it.Metadata.Kopiamtime); ok {
|
||||
bm.Timestamp = t
|
||||
} else {
|
||||
// fall back to using last modified time.
|
||||
t, err := time.Parse(time.RFC1123, it.Properties.LastModified)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "invalid timestamp for BLOB '%v': %q", bm.BlobID, it.Properties.LastModified)
|
||||
}
|
||||
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
if err := callback(bm); err != nil {
|
||||
|
||||
@@ -13,12 +13,15 @@
|
||||
"gopkg.in/kothar/go-backblaze.v0"
|
||||
|
||||
"github.com/kopia/kopia/internal/iocopy"
|
||||
"github.com/kopia/kopia/internal/timestampmeta"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/retrying"
|
||||
)
|
||||
|
||||
const (
|
||||
b2storageType = "b2"
|
||||
|
||||
timeMapKey = "kopia-mtime" // case is important, must be all-lowercase
|
||||
)
|
||||
|
||||
type b2Storage struct {
|
||||
@@ -99,11 +102,17 @@ func (s *b2Storage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata,
|
||||
return blob.Metadata{}, errors.Wrap(translateError(err), "GetFileInfo")
|
||||
}
|
||||
|
||||
return blob.Metadata{
|
||||
bm := blob.Metadata{
|
||||
BlobID: id,
|
||||
Length: fi.ContentLength,
|
||||
Timestamp: time.Unix(0, fi.UploadTimestamp*1e6),
|
||||
}, nil
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(fi.FileInfo[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func translateError(err error) error {
|
||||
@@ -149,13 +158,16 @@ func (s *b2Storage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, op
|
||||
r = http.NoBody
|
||||
}
|
||||
|
||||
_, err := s.bucket.UploadFile(fileName, nil, r)
|
||||
fi, err := s.bucket.UploadFile(fileName, timestampmeta.ToMap(opts.SetModTime, timeMapKey), r)
|
||||
if err != nil {
|
||||
return translateError(err)
|
||||
}
|
||||
|
||||
return translateError(err)
|
||||
}
|
||||
if opts.GetModTime != nil {
|
||||
*opts.GetModTime = time.Unix(0, fi.UploadTimestamp*1e6)
|
||||
}
|
||||
|
||||
func (s *b2Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *b2Storage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
@@ -195,6 +207,10 @@ func (s *b2Storage) ListBlobs(ctx context.Context, prefix blob.ID, callback func
|
||||
Timestamp: time.Unix(0, f.UploadTimestamp*int64(time.Millisecond)),
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(f.FileInfo[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
if err := callback(bm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -149,7 +149,7 @@ func (fs *fsImpl) GetMetadataFromPath(ctx context.Context, dirPath, path string)
|
||||
return v.(blob.Metadata), nil
|
||||
}
|
||||
|
||||
func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data blob.Bytes) error {
|
||||
func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data blob.Bytes, opts blob.PutOptions) error {
|
||||
// nolint:wrapcheck
|
||||
return retry.WithExponentialBackoffNoValue(ctx, "PutBlobInPath:"+path, func() error {
|
||||
randSuffix := make([]byte, tempFileRandomSuffixLen)
|
||||
@@ -188,6 +188,21 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data
|
||||
}
|
||||
}
|
||||
|
||||
if t := opts.SetModTime; !t.IsZero() {
|
||||
if chtimesErr := fs.osi.Chtimes(path, t, t); err != nil {
|
||||
return errors.Wrapf(chtimesErr, "can't change file %q times", path)
|
||||
}
|
||||
}
|
||||
|
||||
if t := opts.GetModTime; t != nil {
|
||||
fi, err := fs.osi.Stat(path)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "can't get mod time for file %q", path)
|
||||
}
|
||||
|
||||
*t = fi.ModTime()
|
||||
}
|
||||
|
||||
return nil
|
||||
}, fs.isRetriable)
|
||||
}
|
||||
@@ -256,14 +271,6 @@ func (fs *fsImpl) ReadDir(ctx context.Context, dirname string) ([]os.FileInfo, e
|
||||
return fileInfos, nil
|
||||
}
|
||||
|
||||
// SetTime updates file modification time to the provided time.
|
||||
func (fs *fsImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error {
|
||||
log(ctx).Debugf("updating timestamp on %v to %v", filePath, n)
|
||||
|
||||
// nolint:wrapcheck
|
||||
return fs.osi.Chtimes(filePath, n, n)
|
||||
}
|
||||
|
||||
// TouchBlob updates file modification time to current time if it's sufficiently old.
|
||||
func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
|
||||
// nolint:wrapcheck
|
||||
|
||||
@@ -251,6 +251,7 @@ func TestFileStorage_PutBlob_RetriesOnErrors(t *testing.T) {
|
||||
renameRemainingErrors: 1,
|
||||
removeRemainingRetriableErrors: 3,
|
||||
chownRemainingErrors: 3,
|
||||
chtimesRemainingErrors: 3,
|
||||
|
||||
effectiveUID: 0, // running as root
|
||||
}
|
||||
@@ -279,6 +280,16 @@ func TestFileStorage_PutBlob_RetriesOnErrors(t *testing.T) {
|
||||
|
||||
require.NoError(t, st.GetBlob(ctx, "someblob1234567812345678", 1, 2, &buf))
|
||||
require.Equal(t, []byte{2, 3}, buf.ToByteSlice())
|
||||
|
||||
var mt time.Time
|
||||
|
||||
require.NoError(t, st.PutBlob(ctx, "someblob1234567812345678", gather.FromSlice([]byte{1, 2, 3}), blob.PutOptions{
|
||||
GetModTime: &mt,
|
||||
}))
|
||||
|
||||
require.NoError(t, st.PutBlob(ctx, "someblob1234567812345678", gather.FromSlice([]byte{1, 2, 3}), blob.PutOptions{
|
||||
SetModTime: time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
}))
|
||||
}
|
||||
|
||||
func TestFileStorage_DeleteBlob_ErrorHandling(t *testing.T) {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
"io/fs"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
@@ -26,6 +27,7 @@ type mockOS struct {
|
||||
readDirRemainingFileDeletedDirEntry int32
|
||||
readDirRemainingFatalDirEntry int32
|
||||
statRemainingErrors int32
|
||||
chtimesRemainingErrors int32
|
||||
|
||||
effectiveUID int
|
||||
|
||||
@@ -98,6 +100,14 @@ func (osi *mockOS) Stat(fname string) (fs.FileInfo, error) {
|
||||
return osi.osInterface.Stat(fname)
|
||||
}
|
||||
|
||||
func (osi *mockOS) Chtimes(fname string, atime, mtime time.Time) error {
|
||||
if atomic.AddInt32(&osi.chtimesRemainingErrors, -1) >= 0 {
|
||||
return &os.PathError{Op: "chtimes", Err: errors.Errorf("underlying problem")}
|
||||
}
|
||||
|
||||
return osi.osInterface.Chtimes(fname, atime, mtime)
|
||||
}
|
||||
|
||||
func (osi *mockOS) Chown(fname string, uid, gid int) error {
|
||||
if atomic.AddInt32(&osi.chownRemainingErrors, -1) >= 0 {
|
||||
return &os.PathError{Op: "chown", Err: errors.Errorf("underlying problem")}
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
gcsclient "cloud.google.com/go/storage"
|
||||
"github.com/pkg/errors"
|
||||
@@ -19,6 +18,7 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/iocopy"
|
||||
"github.com/kopia/kopia/internal/timestampmeta"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/retrying"
|
||||
)
|
||||
@@ -26,6 +26,8 @@
|
||||
const (
|
||||
gcsStorageType = "gcs"
|
||||
writerChunkSize = 1 << 20
|
||||
|
||||
timeMapKey = "Kopia-Mtime" // case is important, first letter must be capitalized.
|
||||
)
|
||||
|
||||
type gcsStorage struct {
|
||||
@@ -65,11 +67,17 @@ func (gcs *gcsStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadat
|
||||
return blob.Metadata{}, errors.Wrap(translateError(err), "Attrs")
|
||||
}
|
||||
|
||||
return blob.Metadata{
|
||||
bm := blob.Metadata{
|
||||
BlobID: b,
|
||||
Length: attrs.Size,
|
||||
Timestamp: attrs.Created,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(attrs.Metadata[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func translateError(err error) error {
|
||||
@@ -102,6 +110,7 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes,
|
||||
writer := obj.NewWriter(ctx)
|
||||
writer.ChunkSize = writerChunkSize
|
||||
writer.ContentType = "application/x-kopia"
|
||||
writer.ObjectAttrs.Metadata = timestampmeta.ToMap(opts.SetModTime, timeMapKey)
|
||||
|
||||
err := iocopy.JustCopy(writer, data.Reader())
|
||||
if err != nil {
|
||||
@@ -116,11 +125,15 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes,
|
||||
defer cancel()
|
||||
|
||||
// calling close before cancel() causes it to commit the upload.
|
||||
return translateError(writer.Close())
|
||||
}
|
||||
if err := writer.Close(); err != nil {
|
||||
return translateError(err)
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
if opts.GetModTime != nil {
|
||||
*opts.GetModTime = writer.Attrs().Updated
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
@@ -143,11 +156,17 @@ func (gcs *gcsStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback f
|
||||
|
||||
oa, err := lst.Next()
|
||||
for err == nil {
|
||||
if cberr := callback(blob.Metadata{
|
||||
bm := blob.Metadata{
|
||||
BlobID: blob.ID(oa.Name[len(gcs.Prefix):]),
|
||||
Length: oa.Size,
|
||||
Timestamp: oa.Created,
|
||||
}); cberr != nil {
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(oa.Metadata[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
if cberr := callback(bm); cberr != nil {
|
||||
return cberr
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/timetrack"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -66,22 +65,6 @@ func (s *loggingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Byte
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *loggingStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
timer := timetrack.StartTimer()
|
||||
err := s.base.SetTime(ctx, id, t)
|
||||
dt := timer.Elapsed()
|
||||
|
||||
s.logger.Debugw(s.prefix+"SetTime",
|
||||
"blobID", id,
|
||||
"time", t,
|
||||
"error", err,
|
||||
"duration", dt,
|
||||
)
|
||||
|
||||
// nolint:wrapcheck
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *loggingStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
timer := timetrack.StartTimer()
|
||||
err := s.base.DeleteBlob(ctx, id)
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -28,10 +27,6 @@ func (s readonlyStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta
|
||||
return s.base.GetMetadata(ctx, id)
|
||||
}
|
||||
|
||||
func (s readonlyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
return ErrReadonly
|
||||
}
|
||||
|
||||
func (s readonlyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
return ErrReadonly
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/retry"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -38,15 +37,6 @@ func (s retryingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta
|
||||
return v.(blob.Metadata), nil
|
||||
}
|
||||
|
||||
func (s retryingStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
_, err := retry.WithExponentialBackoff(ctx, "GetMetadata("+string(id)+")", func() (interface{}, error) {
|
||||
// nolint:wrapcheck
|
||||
return true, s.Storage.SetTime(ctx, id, t)
|
||||
}, isRetriable)
|
||||
|
||||
return err // nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (s retryingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
_, err := retry.WithExponentialBackoff(ctx, "PutBlob("+string(id)+")", func() (interface{}, error) {
|
||||
// nolint:wrapcheck
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -26,7 +25,6 @@ func TestRetrying(t *testing.T) {
|
||||
fs.AddFault(blobtesting.MethodGetBlob).ErrorInstead(someError)
|
||||
fs.AddFault(blobtesting.MethodGetMetadata).ErrorInstead(someError)
|
||||
fs.AddFault(blobtesting.MethodDeleteBlob).ErrorInstead(someError)
|
||||
fs.AddFault(blobtesting.MethodSetTime).ErrorInstead(someError)
|
||||
|
||||
rs := retrying.NewWrapper(fs)
|
||||
blobID := blob.ID("deadcafe")
|
||||
@@ -36,8 +34,6 @@ func TestRetrying(t *testing.T) {
|
||||
|
||||
require.NoError(t, rs.PutBlob(ctx, blobID2, gather.FromSlice([]byte{1, 2, 3, 4}), blob.PutOptions{}))
|
||||
|
||||
require.NoError(t, rs.SetTime(ctx, blobID, clock.Now()))
|
||||
|
||||
var tmp gather.WriteBuffer
|
||||
defer tmp.Close()
|
||||
|
||||
@@ -62,12 +58,4 @@ func TestRetrying(t *testing.T) {
|
||||
}
|
||||
|
||||
fs.VerifyAllFaultsExercised(t)
|
||||
|
||||
fs.AddFault(blobtesting.MethodSetTime).ErrorInstead(blob.ErrSetTimeUnsupported)
|
||||
|
||||
if err := rs.SetTime(ctx, blobID, clock.Now()); !errors.Is(err, blob.ErrSetTimeUnsupported) {
|
||||
t.Fatalf("unexpected error from SetTime: %v", err)
|
||||
}
|
||||
|
||||
fs.VerifyAllFaultsExercised(t)
|
||||
}
|
||||
|
||||
@@ -122,8 +122,21 @@ func (s *s3Storage) getVersionMetadata(ctx context.Context, b blob.ID, version s
|
||||
}
|
||||
|
||||
func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
if !opts.SetModTime.IsZero() {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
_, err := s.putBlob(ctx, b, data, opts)
|
||||
|
||||
if opts.GetModTime != nil {
|
||||
bm, err2 := s.GetMetadata(ctx, b)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
*opts.GetModTime = bm.Timestamp
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -185,10 +198,6 @@ func (s *s3Storage) putBlob(ctx context.Context, b blob.ID, data blob.Bytes, opt
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
func (s *s3Storage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
err := translateError(s.cli.RemoveObject(ctx, s.BucketName, s.getObjectNameString(b), minio.RemoveObjectOptions{}))
|
||||
if errors.Is(err, blob.ErrBlobNotFound) {
|
||||
|
||||
@@ -96,12 +96,14 @@ func toBlobID(blobName, prefix string) blob.ID {
|
||||
}
|
||||
|
||||
func infoToVersionMetadata(prefix string, oi *minio.ObjectInfo) versionMetadata {
|
||||
bm := blob.Metadata{
|
||||
BlobID: toBlobID(oi.Key, prefix),
|
||||
Length: oi.Size,
|
||||
Timestamp: oi.LastModified,
|
||||
}
|
||||
|
||||
return versionMetadata{
|
||||
Metadata: blob.Metadata{
|
||||
BlobID: toBlobID(oi.Key, prefix),
|
||||
Length: oi.Size,
|
||||
Timestamp: oi.LastModified,
|
||||
},
|
||||
Metadata: bm,
|
||||
IsLatest: oi.IsLatest,
|
||||
IsDeleteMarker: oi.IsDeleteMarker,
|
||||
Version: oi.VersionID,
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pkg/sftp"
|
||||
@@ -170,7 +169,7 @@ func (s *sftpImpl) GetMetadataFromPath(ctx context.Context, dirPath, fullPath st
|
||||
return v.(blob.Metadata), nil
|
||||
}
|
||||
|
||||
func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string, data blob.Bytes) error {
|
||||
func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string, data blob.Bytes, opts blob.PutOptions) error {
|
||||
// nolint:wrapcheck
|
||||
return s.rec.UsingConnectionNoResult(ctx, "PutBlobInPath", func(conn connection.Connection) error {
|
||||
randSuffix := make([]byte, tempFileRandomSuffixLen)
|
||||
@@ -202,15 +201,22 @@ func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string,
|
||||
return errors.Wrap(err, "unexpected error renaming file on SFTP")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if t := opts.SetModTime; !t.IsZero() {
|
||||
if chtimesErr := sftpClientFromConnection(conn).Chtimes(fullPath, t, t); err != nil {
|
||||
return errors.Wrap(chtimesErr, "can't change file times")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sftpImpl) SetTimeInPath(ctx context.Context, dirPath, fullPath string, n time.Time) error {
|
||||
// nolint:wrapcheck
|
||||
return s.rec.UsingConnectionNoResult(ctx, "SetTimeInPath", func(conn connection.Connection) error {
|
||||
// nolint:wrapcheck
|
||||
return sftpClientFromConnection(conn).Chtimes(fullPath, n, n)
|
||||
if t := opts.GetModTime; t != nil {
|
||||
fi, err := sftpClientFromConnection(conn).Stat(fullPath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't get mod time")
|
||||
}
|
||||
|
||||
*t = fi.ModTime()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -27,8 +26,7 @@
|
||||
type Impl interface {
|
||||
GetBlobFromPath(ctx context.Context, dirPath, filePath string, offset, length int64, output blob.OutputBuffer) error
|
||||
GetMetadataFromPath(ctx context.Context, dirPath, filePath string) (blob.Metadata, error)
|
||||
PutBlobInPath(ctx context.Context, dirPath, filePath string, dataSlices blob.Bytes) error
|
||||
SetTimeInPath(ctx context.Context, dirPath, filePath string, t time.Time) error
|
||||
PutBlobInPath(ctx context.Context, dirPath, filePath string, dataSlices blob.Bytes, opts blob.PutOptions) error
|
||||
DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error
|
||||
ReadDir(ctx context.Context, path string) ([]os.FileInfo, error)
|
||||
}
|
||||
@@ -190,18 +188,7 @@ func (s *Storage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes,
|
||||
}
|
||||
|
||||
// nolint:wrapcheck
|
||||
return s.Impl.PutBlobInPath(ctx, dirPath, filePath, data)
|
||||
}
|
||||
|
||||
// SetTime implements blob.Storage.
|
||||
func (s *Storage) SetTime(ctx context.Context, blobID blob.ID, n time.Time) error {
|
||||
dirPath, filePath, err := s.GetShardedPathAndFilePath(ctx, blobID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error determining sharded path")
|
||||
}
|
||||
|
||||
// nolint:wrapcheck
|
||||
return s.Impl.SetTimeInPath(ctx, dirPath, filePath, n)
|
||||
return s.Impl.PutBlobInPath(ctx, dirPath, filePath, data, opts)
|
||||
}
|
||||
|
||||
// DeleteBlob implements blob.Storage.
|
||||
@@ -243,7 +230,7 @@ func (s *Storage) getParameters(ctx context.Context) (*Parameters, error) {
|
||||
return nil, errors.Wrap(err, "error serializing sharding parameters")
|
||||
}
|
||||
|
||||
if err := s.Impl.PutBlobInPath(ctx, s.RootPath, dotShardsFile, tmp.Bytes()); err != nil {
|
||||
if err := s.Impl.PutBlobInPath(ctx, s.RootPath, dotShardsFile, tmp.Bytes(), blob.PutOptions{}); err != nil {
|
||||
log(ctx).Warnf("unable to persist sharding parameters: %v", err)
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -61,6 +61,11 @@ type Reader interface {
 type PutOptions struct {
 	RetentionMode   string
 	RetentionPeriod time.Duration
+
+	// if not empty, set the provided timestamp on the blob instead of server-assigned,
+	// if unsupported by the server return ErrSetTimeUnsupported
+	SetModTime time.Time
+	GetModTime *time.Time // if != nil, populate the value pointed at with the actual modification time
 }
 
 // HasRetentionOptions returns true when blob-retention settings have been
@@ -87,9 +92,6 @@ type Storage interface {
 	// id with contents gathered from the specified list of slices.
 	PutBlob(ctx context.Context, blobID ID, data Bytes, opts PutOptions) error
 
-	// SetTime changes last modification time of a given blob, if supported, returns ErrSetTimeUnsupported otherwise.
-	SetTime(ctx context.Context, blobID ID, t time.Time) error
-
 	// DeleteBlob removes the blob from storage. Future Get() operations will fail with ErrNotFound.
 	DeleteBlob(ctx context.Context, blobID ID) error
 
@@ -258,3 +260,19 @@ func DeleteMultiple(ctx context.Context, st Storage, ids []ID, parallelism int)
 
 	return errors.Wrap(eg.Wait(), "error deleting blobs")
 }
+
+// PutBlobAndGetMetadata invokes PutBlob and returns the resulting Metadata.
+func PutBlobAndGetMetadata(ctx context.Context, st Storage, blobID ID, data Bytes, opts PutOptions) (Metadata, error) {
+	// ensure GetModTime is set, or reuse existing one.
+	if opts.GetModTime == nil {
+		opts.GetModTime = new(time.Time)
+	}
+
+	err := st.PutBlob(ctx, blobID, data, opts)
+
+	return Metadata{
+		BlobID:    blobID,
+		Length:    int64(data.Length()),
+		Timestamp: *opts.GetModTime,
+	}, err // nolint:wrapcheck
+}
@@ -185,3 +185,17 @@ func TestMetataJSONString(t *testing.T) {
 
 	require.Equal(t, `{"id":"foo","length":12345,"timestamp":"2000-01-02T03:04:05.000000006Z"}`, bm.String())
 }
+
+func TestPutBlobAndGetMetadata(t *testing.T) {
+	data := blobtesting.DataMap{}
+
+	fixedTime := time.Date(2000, 1, 2, 3, 4, 5, 6, time.UTC)
+
+	st := blobtesting.NewMapStorage(data, nil, func() time.Time {
+		return fixedTime
+	})
+
+	bm, err := blob.PutBlobAndGetMetadata(context.Background(), st, "foo", gather.FromSlice([]byte{1, 2, 3}), blob.PutOptions{})
+	require.NoError(t, err)
+	require.Equal(t, fixedTime, bm.Timestamp)
+}
@@ -4,7 +4,6 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
@@ -18,7 +17,6 @@
|
||||
operationGetBlob = "GetBlob"
|
||||
operationGetMetadata = "GetMetadata"
|
||||
operationListBlobs = "ListBlobs"
|
||||
operationSetTime = "SetTime"
|
||||
operationPutBlob = "PutBlob"
|
||||
operationDeleteBlob = "DeleteBlob"
|
||||
)
|
||||
@@ -84,11 +82,6 @@ func (s *throttlingStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID,
|
||||
return s.Storage.ListBlobs(ctx, blobIDPrefix, cb) // nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (s *throttlingStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
s.throttler.BeforeOperation(ctx, operationSetTime)
|
||||
return s.Storage.SetTime(ctx, id, t) // nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (s *throttlingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
s.throttler.BeforeOperation(ctx, operationPutBlob)
|
||||
s.throttler.BeforeUpload(ctx, int64(data.Length()))
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -121,13 +120,6 @@ func TestThrottling(t *testing.T) {
|
||||
"inner.GetMetadata",
|
||||
}, m.activity)
|
||||
|
||||
m.Reset()
|
||||
require.NoError(t, wrapped.SetTime(ctx, "blob1", clock.Now()))
|
||||
require.Equal(t, []string{
|
||||
"BeforeOperation(SetTime)",
|
||||
"inner.SetTime",
|
||||
}, m.activity)
|
||||
|
||||
m.Reset()
|
||||
require.NoError(t, wrapped.DeleteBlob(ctx, "blob1"))
|
||||
require.Equal(t, []string{
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/studio-b12/gowebdav"
|
||||
@@ -134,7 +133,11 @@ func (d *davStorageImpl) ReadDir(ctx context.Context, dir string) ([]os.FileInfo
|
||||
return nil, errors.Wrap(err, "error reading WebDAV dir")
|
||||
}
|
||||
|
||||
func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath string, data blob.Bytes) error {
|
||||
func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath string, data blob.Bytes, opts blob.PutOptions) error {
|
||||
if !opts.SetModTime.IsZero() {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
var writePath string
|
||||
|
||||
if d.Options.AtomicWrites {
|
||||
@@ -150,7 +153,7 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st
|
||||
b := buf.Bytes()
|
||||
|
||||
// nolint:wrapcheck
|
||||
return retry.WithExponentialBackoffNoValue(ctx, "WriteTemporaryFileAndCreateParentDirs", func() error {
|
||||
if err := retry.WithExponentialBackoffNoValue(ctx, "WriteTemporaryFileAndCreateParentDirs", func() error {
|
||||
mkdirAttempted := false
|
||||
|
||||
for {
|
||||
@@ -181,11 +184,20 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st
|
||||
|
||||
return err
|
||||
}
|
||||
}, isRetriable)
|
||||
}
|
||||
}, isRetriable); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *davStorageImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
if opts.GetModTime != nil {
|
||||
bm, err := d.GetMetadataFromPath(ctx, dirPath, filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*opts.GetModTime = bm.Timestamp
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *davStorageImpl) DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error {
|
||||
|
||||
@@ -37,18 +37,12 @@ func (m *encryptedBlobMgr) encryptAndWriteBlob(ctx context.Context, data gather.
|
||||
return blob.Metadata{}, errors.Wrap(err, "error encrypting")
|
||||
}
|
||||
|
||||
err = m.st.PutBlob(ctx, blobID, data2.Bytes(), blob.PutOptions{})
|
||||
bm, err := blob.PutBlobAndGetMetadata(ctx, m.st, blobID, data2.Bytes(), blob.PutOptions{})
|
||||
if err != nil {
|
||||
m.log.Debugf("write-index-blob %v failed %v", blobID, err)
|
||||
return blob.Metadata{}, errors.Wrapf(err, "error writing blob %v", blobID)
|
||||
}
|
||||
|
||||
bm, err := m.st.GetMetadata(ctx, blobID)
|
||||
if err != nil {
|
||||
m.log.Debugf("write-index-blob-get-metadata %v failed %v", blobID, err)
|
||||
return blob.Metadata{}, errors.Wrap(err, "unable to get blob metadata")
|
||||
}
|
||||
|
||||
m.log.Debugf("write-index-blob %v %v %v", blobID, bm.Length, bm.Timestamp)
|
||||
|
||||
return bm, nil
|
||||
|
||||
@@ -78,11 +78,6 @@ func TestEncryptedBlobManager(t *testing.T) {
|
||||
_, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
|
||||
require.ErrorIs(t, err, someError)
|
||||
|
||||
fs.AddFault(blobtesting.MethodGetMetadata).ErrorInstead(someError)
|
||||
|
||||
_, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
|
||||
require.ErrorIs(t, err, someError)
|
||||
|
||||
someError2 := errors.Errorf("some error 2")
|
||||
|
||||
cr.Encryptor = failingEncryptor{nil, someError2}
|
||||
|
||||