From c242235a32cc88a88c31f006cea765bdfb93d2f7 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 31 Aug 2020 19:50:15 -0700 Subject: [PATCH] blob: added SetTime() method which may be optionally implemented by blob.Storage (#575) cli: added --times option to 'repository sync' --- cli/command_repository_sync.go | 15 ++++++++++++++ internal/blobtesting/eventually_consistent.go | 4 ++++ internal/blobtesting/faulty.go | 9 +++++++++ internal/blobtesting/map.go | 9 +++++++++ internal/blobtesting/verify.go | 20 +++++++++++++++++++ repo/blob/azure/azure_storage.go | 5 +++++ repo/blob/b2/b2_storage.go | 4 ++++ repo/blob/filesystem/filesystem_storage.go | 7 +++++++ repo/blob/gcs/gcs_storage.go | 5 +++++ repo/blob/logging/logging_storage.go | 10 ++++++++++ repo/blob/logging/logging_storage_test.go | 5 ++++- repo/blob/readonly/readonly_storage.go | 5 +++++ repo/blob/s3/s3_storage.go | 5 +++++ repo/blob/sftp/sftp_storage.go | 5 +++++ repo/blob/sharded/sharded.go | 9 +++++++++ repo/blob/storage.go | 6 ++++++ repo/blob/webdav/webdav_storage.go | 5 +++++ tests/end_to_end_test/repository_sync_test.go | 2 +- 18 files changed, 128 insertions(+), 2 deletions(-) diff --git a/cli/command_repository_sync.go b/cli/command_repository_sync.go index 94b5ca171..694c596a6 100644 --- a/cli/command_repository_sync.go +++ b/cli/command_repository_sync.go @@ -27,6 +27,7 @@ repositorySyncDryRun = repositorySyncCommand.Flag("dry-run", "Do not perform copying.").Short('n').Bool() repositorySyncParallelism = repositorySyncCommand.Flag("parallel", "Copy parallelism.").Default("1").Int() repositorySyncDestinationMustExist = repositorySyncCommand.Flag("must-exist", "Fail if destination does not have repository format blob.").Bool() + repositorySyncTimes = repositorySyncCommand.Flag("times", "Synchronize blob times if supported.").Bool() ) const syncProgressInterval = 300 * time.Millisecond @@ -255,6 +256,8 @@ func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata return ch } +var setTimeUnsupportedOnce sync.Once + func syncCopyBlob(ctx context.Context, m blob.Metadata, src, dst blob.Storage) error { data, err := src.GetBlob(ctx, m.BlobID, 0, -1) if err != nil { @@ -270,6 +273,18 @@ func syncCopyBlob(ctx context.Context, m blob.Metadata, src, dst blob.Storage) e return errors.Wrapf(err, "error writing blob '%v' to destination", m.BlobID) } + if *repositorySyncTimes { + if err := dst.SetTime(ctx, m.BlobID, m.Timestamp); err != nil { + if errors.Is(err, blob.ErrSetTimeUnsupported) { + setTimeUnsupportedOnce.Do(func() { + log(ctx).Warningf("destination repository does not support setting time") + }) + } + + return errors.Wrapf(err, "error setting time on destination '%v'", m.BlobID) + } + } + return nil } diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index 56800226c..87d799842 100644 --- a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -165,6 +165,10 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d return nil } +func (s *eventuallyConsistentStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error { + return s.realStorage.SetTime(ctx, id, t) +} + func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error { s.randomFrontendCache().put(id, nil) diff --git a/internal/blobtesting/faulty.go b/internal/blobtesting/faulty.go index 26b16e2b9..644cbb020 100644 --- a/internal/blobtesting/faulty.go +++ b/internal/blobtesting/faulty.go @@ -55,6 +55,15 @@ func (s *FaultyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes return s.Base.PutBlob(ctx, id, data) } +// SetTime implements blob.Storage. +func (s *FaultyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error { + if err := s.getNextFault(ctx, "SetTime", id); err != nil { + return err + } + + return s.Base.SetTime(ctx, id, t) +} + // DeleteBlob implements blob.Storage. func (s *FaultyStorage) DeleteBlob(ctx context.Context, id blob.ID) error { if err := s.getNextFault(ctx, "DeleteBlob", id); err != nil { diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 00f532190..ce49230d3 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -135,6 +135,15 @@ func (s *mapStorage) Close(ctx context.Context) error { return nil } +func (s *mapStorage) SetTime(ctx context.Context, blobID blob.ID, t time.Time) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.keyTime[blobID] = t + + return nil +} + func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/internal/blobtesting/verify.go b/internal/blobtesting/verify.go index 8464194dd..38af10faa 100644 --- a/internal/blobtesting/verify.go +++ b/internal/blobtesting/verify.go @@ -4,6 +4,9 @@ "bytes" "context" "reflect" + "time" + + "github.com/pkg/errors" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" @@ -50,6 +53,23 @@ func VerifyStorage(ctx context.Context, t testingT, r blob.Storage) { AssertGetBlob(ctx, t, r, b.blk, b.contents) } + ts := time.Date(2020, 1, 1, 15, 30, 45, 0, time.UTC) + + for _, b := range blocks { + if err := r.SetTime(ctx, b.blk, ts); errors.Is(err, blob.ErrSetTimeUnsupported) { + break + } + + md, err := r.GetMetadata(ctx, b.blk) + if err != nil { + t.Errorf("unable to get blob metadata") + } + + if got, want := md.Timestamp, ts; !got.Equal(want) { + t.Errorf("invalid time after SetTme(): %vm want %v", got, want) + } + } + if err := r.DeleteBlob(ctx, blocks[0].blk); err != nil { t.Errorf("unable to delete block: %v", err) } diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go index 080e88540..a486d1ad7 100644 --- a/repo/blob/azure/azure_storage.go +++ b/repo/blob/azure/azure_storage.go @@ -6,6 +6,7 @@ "fmt" "io" "io/ioutil" + "time" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/efarrer/iothrottler" @@ -157,6 +158,10 @@ func (az *azStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) er return translateError(writer.Close()) } +func (az *azStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error { + return blob.ErrSetTimeUnsupported +} + // DeleteBlob deletes azure blob from container with given ID. func (az *azStorage) DeleteBlob(ctx context.Context, b blob.ID) error { attempt := func() (interface{}, error) { diff --git a/repo/blob/b2/b2_storage.go b/repo/blob/b2/b2_storage.go index 8e5b2e80f..aa20ead4b 100644 --- a/repo/blob/b2/b2_storage.go +++ b/repo/blob/b2/b2_storage.go @@ -182,6 +182,10 @@ func (s *b2Storage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) er return translateError(err) } +func (s *b2Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error { + return blob.ErrSetTimeUnsupported +} + func (s *b2Storage) DeleteBlob(ctx context.Context, id blob.ID) error { fileName := s.getObjectNameString(id) _, err := s.bucket.HideFile(fileName) diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index c0c3309e3..cff8b8a7e 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -228,6 +228,13 @@ func (fs *fsImpl) ReadDir(ctx context.Context, dirname string) ([]os.FileInfo, e return v.([]os.FileInfo), nil } +// SetTime updates file modification time to the provided time. +func (fs *fsImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error { + log(ctx).Debugf("updating timestamp on %v to %v", filePath, n) + + return os.Chtimes(filePath, n, n) +} + // TouchBlob updates file modification time to current time if it's sufficiently old. func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { _, path := fs.Storage.GetShardedPathAndFilePath(blobID) diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index 32247de8c..68740c8dd 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -6,6 +6,7 @@ "encoding/json" "fmt" "io/ioutil" + "time" gcsclient "cloud.google.com/go/storage" "github.com/efarrer/iothrottler" @@ -159,6 +160,10 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) return translateError(writer.Close()) } +func (gcs *gcsStorage) SetTime(ctx context.Context, b blob.ID, t time.Time) error { + return blob.ErrSetTimeUnsupported +} + func (gcs *gcsStorage) DeleteBlob(ctx context.Context, b blob.ID) error { attempt := func() (interface{}, error) { return nil, gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx) diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index bb185c261..b31c7f420 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -3,6 +3,7 @@ import ( "context" + "time" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/repo/blob" @@ -49,6 +50,15 @@ func (s *loggingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Byte return err } +func (s *loggingStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error { + t0 := clock.Now() + err := s.base.SetTime(ctx, id, t) + dt := clock.Since(t0) + s.printf(s.prefix+"SetTime(%q,%v)=%#v took %v", id, t, err, dt) + + return err +} + func (s *loggingStorage) DeleteBlob(ctx context.Context, id blob.ID) error { t0 := clock.Now() err := s.base.DeleteBlob(ctx, id) diff --git a/repo/blob/logging/logging_storage_test.go b/repo/blob/logging/logging_storage_test.go index aba4940a4..728c06e13 100644 --- a/repo/blob/logging/logging_storage_test.go +++ b/repo/blob/logging/logging_storage_test.go @@ -3,9 +3,11 @@ import ( "strings" "testing" + "time" "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob" ) func TestLoggingStorage(t *testing.T) { @@ -20,7 +22,8 @@ func TestLoggingStorage(t *testing.T) { } data := blobtesting.DataMap{} - underlying := blobtesting.NewMapStorage(data, nil, nil) + kt := map[blob.ID]time.Time{} + underlying := blobtesting.NewMapStorage(data, kt, nil) st := NewWrapper(underlying, myOutput, myPrefix) if st == nil { diff --git a/repo/blob/readonly/readonly_storage.go b/repo/blob/readonly/readonly_storage.go index 2eda49613..f6ade4847 100644 --- a/repo/blob/readonly/readonly_storage.go +++ b/repo/blob/readonly/readonly_storage.go @@ -3,6 +3,7 @@ import ( "context" + "time" "github.com/pkg/errors" @@ -25,6 +26,10 @@ func (s readonlyStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta return s.base.GetMetadata(ctx, id) } +func (s readonlyStorage) SetTime(ctx context.Context, id blob.ID, t time.Time) error { + return ErrReadonly +} + func (s readonlyStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) error { return ErrReadonly } diff --git a/repo/blob/s3/s3_storage.go b/repo/blob/s3/s3_storage.go index c1f2abfac..b0cada573 100644 --- a/repo/blob/s3/s3_storage.go +++ b/repo/blob/s3/s3_storage.go @@ -10,6 +10,7 @@ "io/ioutil" "net/http" "strings" + "time" "github.com/efarrer/iothrottler" minio "github.com/minio/minio-go/v6" @@ -161,6 +162,10 @@ func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) err }, isRetriableError)) } +func (s *s3Storage) SetTime(ctx context.Context, b blob.ID, t time.Time) error { + return blob.ErrSetTimeUnsupported +} + func (s *s3Storage) DeleteBlob(ctx context.Context, b blob.ID) error { attempt := func() (interface{}, error) { return nil, s.cli.RemoveObject(s.BucketName, s.getObjectNameString(b)) diff --git a/repo/blob/sftp/sftp_storage.go b/repo/blob/sftp/sftp_storage.go index 4b8d3a422..06edfe64e 100644 --- a/repo/blob/sftp/sftp_storage.go +++ b/repo/blob/sftp/sftp_storage.go @@ -12,6 +12,7 @@ "os" "path" "strings" + "time" "github.com/pkg/errors" psftp "github.com/pkg/sftp" @@ -135,6 +136,10 @@ func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string, return nil } +func (s *sftpImpl) SetTimeInPath(ctx context.Context, dirPath, fullPath string, n time.Time) error { + return s.cli.Chtimes(fullPath, n, n) +} + func (s *sftpImpl) createTempFileAndDir(tempFile string) (*psftp.File, error) { flags := os.O_CREATE | os.O_WRONLY | os.O_EXCL diff --git a/repo/blob/sharded/sharded.go b/repo/blob/sharded/sharded.go index d0de9769b..3becb01db 100644 --- a/repo/blob/sharded/sharded.go +++ b/repo/blob/sharded/sharded.go @@ -6,6 +6,7 @@ "os" "path" "strings" + "time" "github.com/kopia/kopia/repo/blob" ) @@ -17,6 +18,7 @@ type Impl interface { GetBlobFromPath(ctx context.Context, dirPath, filePath string, offset, length int64) ([]byte, error) GetMetadataFromPath(ctx context.Context, dirPath, filePath string) (blob.Metadata, error) PutBlobInPath(ctx context.Context, dirPath, filePath string, dataSlices blob.Bytes) error + SetTimeInPath(ctx context.Context, dirPath, filePath string, t time.Time) error DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error ReadDir(ctx context.Context, path string) ([]os.FileInfo, error) } @@ -111,6 +113,13 @@ func (s Storage) PutBlob(ctx context.Context, blobID blob.ID, data blob.Bytes) e return s.Impl.PutBlobInPath(ctx, dirPath, filePath, data) } +// SetTime implements blob.Storage. +func (s Storage) SetTime(ctx context.Context, blobID blob.ID, n time.Time) error { + dirPath, filePath := s.GetShardedPathAndFilePath(blobID) + + return s.Impl.SetTimeInPath(ctx, dirPath, filePath, n) +} + // DeleteBlob implements blob.Storage. func (s Storage) DeleteBlob(ctx context.Context, blobID blob.ID) error { dirPath, filePath := s.GetShardedPathAndFilePath(blobID) diff --git a/repo/blob/storage.go b/repo/blob/storage.go index 9aad37915..d314a93b3 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -10,6 +10,9 @@ "github.com/pkg/errors" ) +// ErrSetTimeUnsupported is returned by implementations of Storage that don't support SetTime. +var ErrSetTimeUnsupported = errors.Errorf("SetTime is not supported") + // Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers, // which can be written sequentially or treated as a io.Reader. type Bytes interface { @@ -35,6 +38,9 @@ type Storage interface { // id with contents gathered from the specified list of slices. PutBlob(ctx context.Context, blobID ID, data Bytes) error + // SetTime changes last modification time of a given blob, if supported, returns ErrSetTimeUnsupported otherwise. + SetTime(ctx context.Context, blobID ID, t time.Time) error + // DeleteBlob removes the blob from storage. Future Get() operations will fail with ErrNotFound. DeleteBlob(ctx context.Context, blobID ID) error diff --git a/repo/blob/webdav/webdav_storage.go b/repo/blob/webdav/webdav_storage.go index 1015f9b5d..17c0f581b 100644 --- a/repo/blob/webdav/webdav_storage.go +++ b/repo/blob/webdav/webdav_storage.go @@ -10,6 +10,7 @@ "os" "strconv" "strings" + "time" "github.com/pkg/errors" "github.com/studio-b12/gowebdav" @@ -152,6 +153,10 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st return d.translateError(d.cli.Rename(tmpPath, filePath, true)) } +func (d *davStorageImpl) SetTimeInPath(ctx context.Context, dirPath, filePath string, n time.Time) error { + return blob.ErrSetTimeUnsupported +} + func (d *davStorageImpl) DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error { return d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error { return d.cli.Remove(filePath) diff --git a/tests/end_to_end_test/repository_sync_test.go b/tests/end_to_end_test/repository_sync_test.go index 3c12e7588..f5a62f218 100644 --- a/tests/end_to_end_test/repository_sync_test.go +++ b/tests/end_to_end_test/repository_sync_test.go @@ -22,7 +22,7 @@ func TestRepositorySync(t *testing.T) { // synchronize repository blobs to another directory dir2 := makeScratchDir(t) - e.RunAndExpectSuccess(t, "repo", "sync-to", "filesystem", "--path", dir2) + e.RunAndExpectSuccess(t, "repo", "sync-to", "filesystem", "--path", dir2, "--times") // synchronizing to empty directory fails with --must-exist dir3 := makeScratchDir(t)