mirror of
https://github.com/kopia/kopia.git
synced 2026-05-16 10:44:40 -04:00
blob: added SetTime() method which may be optionally implemented by blob.Storage (#575)
cli: added --times option to 'repository sync'
This commit is contained in:
@@ -27,6 +27,7 @@
|
||||
repositorySyncDryRun = repositorySyncCommand.Flag("dry-run", "Do not perform copying.").Short('n').Bool()
|
||||
repositorySyncParallelism = repositorySyncCommand.Flag("parallel", "Copy parallelism.").Default("1").Int()
|
||||
repositorySyncDestinationMustExist = repositorySyncCommand.Flag("must-exist", "Fail if destination does not have repository format blob.").Bool()
|
||||
repositorySyncTimes = repositorySyncCommand.Flag("times", "Synchronize blob times if supported.").Bool()
|
||||
)
|
||||
|
||||
const syncProgressInterval = 300 * time.Millisecond
|
||||
@@ -255,6 +256,8 @@ func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata
|
||||
return ch
|
||||
}
|
||||
|
||||
var setTimeUnsupportedOnce sync.Once
|
||||
|
||||
func syncCopyBlob(ctx context.Context, m blob.Metadata, src, dst blob.Storage) error {
|
||||
data, err := src.GetBlob(ctx, m.BlobID, 0, -1)
|
||||
if err != nil {
|
||||
@@ -270,6 +273,18 @@ func syncCopyBlob(ctx context.Context, m blob.Metadata, src, dst blob.Storage) e
|
||||
return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID)
|
||||
}
|
||||
|
||||
if *repositorySyncTimes {
|
||||
if err := dst.SetTime(ctx, m.BlobID, m.Timestamp); err != nil {
|
||||
if errors.Is(err, blob.ErrSetTimeUnsupported) {
|
||||
setTimeUnsupportedOnce.Do(func() {
|
||||
log(ctx).Warningf("destination repository does not support setting time")
|
||||
})
|
||||
}
|
||||
|
||||
return errors.Wrapf(err, "error setting time on destination '%v'", m.BlobID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -165,6 +165,10 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *eventuallyConsistentStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
return s.realStorage.SetTime(ctx, id, t)
|
||||
}
|
||||
|
||||
func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
s.randomFrontendCache().put(id, nil)
|
||||
|
||||
|
||||
@@ -55,6 +55,15 @@ func (s *FaultyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes
|
||||
return s.Base.PutBlob(ctx, id, data)
|
||||
}
|
||||
|
||||
// SetTime implements blob.Storage.
|
||||
func (s *FaultyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
if err := s.getNextFault(ctx, "SetTime", id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.Base.SetTime(ctx, id, t)
|
||||
}
|
||||
|
||||
// DeleteBlob implements blob.Storage.
|
||||
func (s *FaultyStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
if err := s.getNextFault(ctx, "DeleteBlob", id); err != nil {
|
||||
|
||||
@@ -135,6 +135,15 @@ func (s *mapStorage) Close(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *mapStorage) SetTime(ctx context.Context, blobID blob.ID, t time.Time) error {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
s.keyTime[blobID] = t
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
"bytes"
|
||||
"context"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -50,6 +53,23 @@ func VerifyStorage(ctx context.Context, t testingT, r blob.Storage) {
|
||||
AssertGetBlob(ctx, t, r, b.blk, b.contents)
|
||||
}
|
||||
|
||||
ts := time.Date(2020, 1, 1, 15, 30, 45, 0, time.UTC)
|
||||
|
||||
for _, b := range blocks {
|
||||
if err := r.SetTime(ctx, b.blk, ts); errors.Is(err, blob.ErrSetTimeUnsupported) {
|
||||
break
|
||||
}
|
||||
|
||||
md, err := r.GetMetadata(ctx, b.blk)
|
||||
if err != nil {
|
||||
t.Errorf("unable to get blob metadata")
|
||||
}
|
||||
|
||||
if got, want := md.Timestamp, ts; !got.Equal(want) {
|
||||
t.Errorf("invalid time after SetTme(): %vm want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.DeleteBlob(ctx, blocks[0].blk); err != nil {
|
||||
t.Errorf("unable to delete block: %v", err)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/efarrer/iothrottler"
|
||||
@@ -157,6 +158,10 @@ func (az *azStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) er
|
||||
return translateError(writer.Close())
|
||||
}
|
||||
|
||||
func (az *azStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
// DeleteBlob deletes azure blob from container with given ID.
|
||||
func (az *azStorage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
attempt := func() (interface{}, error) {
|
||||
|
||||
@@ -182,6 +182,10 @@ func (s *b2Storage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) er
|
||||
return translateError(err)
|
||||
}
|
||||
|
||||
func (s *b2Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
func (s *b2Storage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
fileName := s.getObjectNameString(id)
|
||||
_, err := s.bucket.HideFile(fileName)
|
||||
|
||||
@@ -228,6 +228,13 @@ func (fs *fsImpl) ReadDir(ctx context.Context, dirname string) ([]os.FileInfo, e
|
||||
return v.([]os.FileInfo), nil
|
||||
}
|
||||
|
||||
// SetTime updates file modification time to the provided time.
|
||||
func (fs *fsImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error {
|
||||
log(ctx).Debugf("updating timestamp on %v to %v", filePath, n)
|
||||
|
||||
return os.Chtimes(filePath, n, n)
|
||||
}
|
||||
|
||||
// TouchBlob updates file modification time to current time if it's sufficiently old.
|
||||
func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
|
||||
_, path := fs.Storage.GetShardedPathAndFilePath(blobID)
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
gcsclient "cloud.google.com/go/storage"
|
||||
"github.com/efarrer/iothrottler"
|
||||
@@ -159,6 +160,10 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes)
|
||||
return translateError(writer.Close())
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
attempt := func() (interface{}, error) {
|
||||
return nil, gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@@ -49,6 +50,15 @@ func (s *loggingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Byte
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *loggingStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
t0 := clock.Now()
|
||||
err := s.base.SetTime(ctx, id, t)
|
||||
dt := clock.Since(t0)
|
||||
s.printf(s.prefix+"SetTime(%q,%v)=%#v took %v", id, t, err, dt)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *loggingStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
||||
t0 := clock.Now()
|
||||
err := s.base.DeleteBlob(ctx, id)
|
||||
|
||||
@@ -3,9 +3,11 @@
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
|
||||
func TestLoggingStorage(t *testing.T) {
|
||||
@@ -20,7 +22,8 @@ func TestLoggingStorage(t *testing.T) {
|
||||
}
|
||||
|
||||
data := blobtesting.DataMap{}
|
||||
underlying := blobtesting.NewMapStorage(data, nil, nil)
|
||||
kt := map[blob.ID]time.Time{}
|
||||
underlying := blobtesting.NewMapStorage(data, kt, nil)
|
||||
|
||||
st := NewWrapper(underlying, myOutput, myPrefix)
|
||||
if st == nil {
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -25,6 +26,10 @@ func (s readonlyStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta
|
||||
return s.base.GetMetadata(ctx, id)
|
||||
}
|
||||
|
||||
func (s readonlyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error {
|
||||
return ErrReadonly
|
||||
}
|
||||
|
||||
func (s readonlyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) error {
|
||||
return ErrReadonly
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/efarrer/iothrottler"
|
||||
minio "github.com/minio/minio-go/v6"
|
||||
@@ -161,6 +162,10 @@ func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) err
|
||||
}, isRetriableError))
|
||||
}
|
||||
|
||||
func (s *s3Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
func (s *s3Storage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
attempt := func() (interface{}, error) {
|
||||
return nil, s.cli.RemoveObject(s.BucketName, s.getObjectNameString(b))
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
psftp "github.com/pkg/sftp"
|
||||
@@ -135,6 +136,10 @@ func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sftpImpl) SetTimeInPath(ctx context.Context, dirPath, fullPath string, n time.Time) error {
|
||||
return s.cli.Chtimes(fullPath, n, n)
|
||||
}
|
||||
|
||||
func (s *sftpImpl) createTempFileAndDir(tempFile string) (*psftp.File, error) {
|
||||
flags := os.O_CREATE | os.O_WRONLY | os.O_EXCL
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
@@ -17,6 +18,7 @@ type Impl interface {
|
||||
GetBlobFromPath(ctx context.Context, dirPath, filePath string, offset, length int64) ([]byte, error)
|
||||
GetMetadataFromPath(ctx context.Context, dirPath, filePath string) (blob.Metadata, error)
|
||||
PutBlobInPath(ctx context.Context, dirPath, filePath string, dataSlices blob.Bytes) error
|
||||
SetTimeInPath(ctx context.Context, dirPath, filePath string, t time.Time) error
|
||||
DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error
|
||||
ReadDir(ctx context.Context, path string) ([]os.FileInfo, error)
|
||||
}
|
||||
@@ -111,6 +113,13 @@ func (s Storage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes) e
|
||||
return s.Impl.PutBlobInPath(ctx, dirPath, filePath, data)
|
||||
}
|
||||
|
||||
// SetTime implements blob.Storage.
|
||||
func (s Storage) SetTime(ctx context.Context, blobID blob.ID, n time.Time) error {
|
||||
dirPath, filePath := s.GetShardedPathAndFilePath(blobID)
|
||||
|
||||
return s.Impl.SetTimeInPath(ctx, dirPath, filePath, n)
|
||||
}
|
||||
|
||||
// DeleteBlob implements blob.Storage.
|
||||
func (s Storage) DeleteBlob(ctx context.Context, blobID blob.ID) error {
|
||||
dirPath, filePath := s.GetShardedPathAndFilePath(blobID)
|
||||
|
||||
@@ -10,6 +10,9 @@
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrSetTimeUnsupported is returned by implementations of Storage that don't support SetTime.
|
||||
var ErrSetTimeUnsupported = errors.Errorf("SetTime is not supported")
|
||||
|
||||
// Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers,
|
||||
// which can be written sequentially or treated as a io.Reader.
|
||||
type Bytes interface {
|
||||
@@ -35,6 +38,9 @@ type Storage interface {
|
||||
// id with contents gathered from the specified list of slices.
|
||||
PutBlob(ctx context.Context, blobID ID, data Bytes) error
|
||||
|
||||
// SetTime changes last modification time of a given blob, if supported, returns ErrSetTimeUnsupported otherwise.
|
||||
SetTime(ctx context.Context, blobID ID, t time.Time) error
|
||||
|
||||
// DeleteBlob removes the blob from storage. Future Get() operations will fail with ErrNotFound.
|
||||
DeleteBlob(ctx context.Context, blobID ID) error
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/studio-b12/gowebdav"
|
||||
@@ -152,6 +153,10 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st
|
||||
return d.translateError(d.cli.Rename(tmpPath, filePath, true))
|
||||
}
|
||||
|
||||
func (d *davStorageImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error {
|
||||
return blob.ErrSetTimeUnsupported
|
||||
}
|
||||
|
||||
func (d *davStorageImpl) DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error {
|
||||
return d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error {
|
||||
return d.cli.Remove(filePath)
|
||||
|
||||
@@ -22,7 +22,7 @@ func TestRepositorySync(t *testing.T) {
|
||||
|
||||
// synchronize repository blobs to another directory
|
||||
dir2 := makeScratchDir(t)
|
||||
e.RunAndExpectSuccess(t, "repo", "sync-to", "filesystem", "--path", dir2)
|
||||
e.RunAndExpectSuccess(t, "repo", "sync-to", "filesystem", "--path", dir2, "--times")
|
||||
|
||||
// synchronizing to empty directory fails with --must-exist
|
||||
dir3 := makeScratchDir(t)
|
||||
|
||||
Reference in New Issue
Block a user