diff --git a/go.mod b/go.mod index e6d172403..a03fcbf24 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( google.golang.org/api v0.70.0 google.golang.org/grpc v1.44.0 google.golang.org/protobuf v1.27.1 - gopkg.in/ini.v1 v1.63.2 // indirect + gopkg.in/ini.v1 v1.66.2 // indirect gopkg.in/kothar/go-backblaze.v0 v0.0.0-20210124194846-35409b867216 gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index 42ada0b70..74c6f40db 100644 --- a/go.sum +++ b/go.sum @@ -1128,8 +1128,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c= -gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.2 h1:XfR1dOYubytKy4Shzc2LHrrGhU0lDCfDGG1yLPmpgsI= +gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/kothar/go-backblaze.v0 v0.0.0-20210124194846-35409b867216 h1:2TSTkQ8PMvGOD5eeqqRVv6Z9+BYI+bowK97RCr3W+9M= gopkg.in/kothar/go-backblaze.v0 v0.0.0-20210124194846-35409b867216/go.mod h1:zJ2QpyDCYo1KvLXlmdnFlQAyF/Qfth0fB8239Qg7BIE= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index 6fe842cba..3ad16cf46 100644 --- a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -106,6 +106,10 @@ func (s *eventuallyConsistentStorage) randomFrontendCache() *ecFrontendCache { return s.caches[n] } +func (s *eventuallyConsistentStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + return 
s.realStorage.GetCapacity(ctx) +} + func (s *eventuallyConsistentStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { // don't bother caching partial reads if length >= 0 { diff --git a/internal/blobtesting/faulty.go b/internal/blobtesting/faulty.go index adb79520f..369e7060a 100644 --- a/internal/blobtesting/faulty.go +++ b/internal/blobtesting/faulty.go @@ -18,6 +18,7 @@ MethodListBlobsItem MethodClose MethodFlushCaches + MethodGetCapacity ) // FaultyStorage implements fault injection for FaultyStorage. @@ -34,6 +35,15 @@ func NewFaultyStorage(base blob.Storage) *FaultyStorage { } } +// GetCapacity implements blob.Volume. +func (s *FaultyStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + if ok, err := s.GetNextFault(ctx, MethodGetCapacity); ok { + return blob.Capacity{}, err + } + + return s.base.GetCapacity(ctx) +} + // GetBlob implements blob.Storage. func (s *FaultyStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { if ok, err := s.GetNextFault(ctx, MethodGetBlob, id, offset, length); ok { diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index bb2e8b411..476cb8026 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -24,6 +24,10 @@ type mapStorage struct { mutex sync.RWMutex } +func (s *mapStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + return blob.Capacity{}, blob.ErrNotAVolume +} + func (s *mapStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { s.mutex.RLock() defer s.mutex.RUnlock() diff --git a/internal/blobtesting/object_locking_map.go b/internal/blobtesting/object_locking_map.go index c32466c7f..8a844ca6a 100644 --- a/internal/blobtesting/object_locking_map.go +++ b/internal/blobtesting/object_locking_map.go @@ -63,6 +63,10 @@ func (s *objectLockingMap) getLatestForMutationLocked(id blob.ID) (*entry, error 
return e, nil } +func (s *objectLockingMap) GetCapacity(ctx context.Context) (blob.Capacity, error) { + return blob.Capacity{}, blob.ErrNotAVolume +} + // GetBlob works the same as map-storage GetBlob except that if the latest // version is a delete-marker then it will return ErrBlobNotFound. func (s *objectLockingMap) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { diff --git a/internal/providervalidation/providervalidation.go b/internal/providervalidation/providervalidation.go index 48f53c60b..9b4700455 100644 --- a/internal/providervalidation/providervalidation.go +++ b/internal/providervalidation/providervalidation.go @@ -63,6 +63,19 @@ func ValidateProvider(ctx context.Context, st blob.Storage, opt Options) error { prefix1 := uberPrefix + "a" prefix2 := uberPrefix + "b" + log(ctx).Infof("Validating storage capacity and usage") + + c, err := st.GetCapacity(ctx) + + switch { + case errors.Is(err, blob.ErrNotAVolume): + // This is okay. We expect some implementations to not support this method. 
+	case err != nil:
+		return errors.Wrap(err, "unexpected error")
+	case c.FreeB > c.SizeB:
+		return errors.Errorf("expected volume's free space (%dB) to be at most volume size (%dB)", c.FreeB, c.SizeB)
+	}
+
 	log(ctx).Infof("Validating blob list responses")
 
 	if err := verifyBlobCount(ctx, st, uberPrefix, 0); err != nil {
diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go
index d656c152a..da0bd42d7 100644
--- a/repo/blob/azure/azure_storage.go
+++ b/repo/blob/azure/azure_storage.go
@@ -28,6 +28,10 @@ type azStorage struct {
 	bucket azblob.ContainerClient
 }
 
+func (az *azStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) {
+	return blob.Capacity{}, blob.ErrNotAVolume
+}
+
 func (az *azStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
 	if offset < 0 {
 		return errors.Wrap(blob.ErrInvalidRange, "invalid offset")
diff --git a/repo/blob/b2/b2_storage.go b/repo/blob/b2/b2_storage.go
index ef5c7f16f..2fca7c977 100644
--- a/repo/blob/b2/b2_storage.go
+++ b/repo/blob/b2/b2_storage.go
@@ -31,6 +31,10 @@ type b2Storage struct {
 	bucket *backblaze.Bucket
 }
 
+func (s *b2Storage) GetCapacity(ctx context.Context) (blob.Capacity, error) {
+	return blob.Capacity{}, blob.ErrNotAVolume
+}
+
 func (s *b2Storage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error {
 	fileName := s.getObjectNameString(id)
diff --git a/repo/blob/filesystem/filesystem_storage_capacity_openbsd.go b/repo/blob/filesystem/filesystem_storage_capacity_openbsd.go
new file mode 100644
index 000000000..3bc892ba7
--- /dev/null
+++ b/repo/blob/filesystem/filesystem_storage_capacity_openbsd.go
@@ -0,0 +1,30 @@
+//go:build openbsd
+// +build openbsd
+
+package filesystem
+
+import (
+	"context"
+	"syscall"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/internal/retry"
+	"github.com/kopia/kopia/repo/blob"
+)
+
+func (fs *fsStorage) GetCapacity(ctx context.Context) (blob.Capacity,
error) { + c, err := retry.WithExponentialBackoff(ctx, "GetCapacity", func() (interface{}, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(fs.RootPath, &stat); err != nil { + return blob.Capacity{}, errors.Wrap(err, "GetCapacity") + } + + return blob.Capacity{ + SizeB: uint64(stat.F_blocks) * uint64(stat.F_bsize), // nolint:unconvert,nolintlint + FreeB: uint64(stat.F_bavail) * uint64(stat.F_bsize), // nolint:unconvert,nolintlint + }, nil + }, fs.Impl.(*fsImpl).isRetriable) + + return c.(blob.Capacity), err // nolint:forcetypeassert,wrapcheck +} diff --git a/repo/blob/filesystem/filesystem_storage_capacity_unix.go b/repo/blob/filesystem/filesystem_storage_capacity_unix.go new file mode 100644 index 000000000..aa7cec459 --- /dev/null +++ b/repo/blob/filesystem/filesystem_storage_capacity_unix.go @@ -0,0 +1,30 @@ +//go:build linux || freebsd || darwin +// +build linux freebsd darwin + +package filesystem + +import ( + "context" + "syscall" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/retry" + "github.com/kopia/kopia/repo/blob" +) + +func (fs *fsStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + c, err := retry.WithExponentialBackoff(ctx, "GetCapacity", func() (interface{}, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(fs.RootPath, &stat); err != nil { + return blob.Capacity{}, errors.Wrap(err, "GetCapacity") + } + + return blob.Capacity{ + SizeB: uint64(stat.Blocks) * uint64(stat.Bsize), // nolint:unconvert + FreeB: uint64(stat.Bavail) * uint64(stat.Bsize), // nolint:unconvert + }, nil + }, fs.Impl.(*fsImpl).isRetriable) + + return c.(blob.Capacity), err // nolint:forcetypeassert,wrapcheck +} diff --git a/repo/blob/filesystem/filesystem_storage_capacity_windows.go b/repo/blob/filesystem/filesystem_storage_capacity_windows.go new file mode 100644 index 000000000..4a2cf4439 --- /dev/null +++ b/repo/blob/filesystem/filesystem_storage_capacity_windows.go @@ -0,0 +1,29 @@ +//go:build windows +// +build 
windows + +package filesystem + +import ( + "context" + + "github.com/pkg/errors" + "golang.org/x/sys/windows" + + "github.com/kopia/kopia/repo/blob" +) + +func (fs *fsStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + var c blob.Capacity + + pathPtr, err := windows.UTF16PtrFromString(fs.RootPath) + if err != nil { + return blob.Capacity{}, errors.Wrap(err, "windows GetCapacity") + } + + err = windows.GetDiskFreeSpaceEx(pathPtr, nil, &c.SizeB, &c.FreeB) + if err != nil { + return blob.Capacity{}, errors.Wrap(err, "windows GetCapacity") + } + + return c, nil +} diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index 9ae195ee8..951e7b04a 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -37,6 +37,10 @@ type gcsStorage struct { bucket *gcsclient.BucketHandle } +func (gcs *gcsStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + return blob.Capacity{}, blob.ErrNotAVolume +} + func (gcs *gcsStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error { if offset < 0 { return blob.ErrInvalidRange diff --git a/repo/blob/gdrive/gdrive_storage.go b/repo/blob/gdrive/gdrive_storage.go index cc3d6e2cb..27df9a781 100644 --- a/repo/blob/gdrive/gdrive_storage.go +++ b/repo/blob/gdrive/gdrive_storage.go @@ -47,10 +47,31 @@ type gdriveStorage struct { Options client *drive.FilesService + about *drive.AboutService folderID string fileIDCache *fileIDCache } +func (gdrive *gdriveStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + req := gdrive.about.Get().Fields("storageQuota") + + res, err := req.Context(ctx).Do() + if err != nil { + return blob.Capacity{}, errors.Wrap(err, "get about in GetCapacity()") + } + + q := res.StorageQuota + if q.Limit == 0 { + // If Limit is unset then the drive has no size limit. 
+ return blob.Capacity{}, blob.ErrNotAVolume + } + + return blob.Capacity{ + SizeB: uint64(q.Limit), + FreeB: uint64(q.Limit) - uint64(q.Usage), + }, nil +} + func (gdrive *gdriveStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error { if offset < 0 { return blob.ErrInvalidRange @@ -479,9 +500,10 @@ func tokenSourceFromCredentialsJSON(ctx context.Context, data json.RawMessage, s return cfg.TokenSource(ctx), nil } -// CreateDriveClient creates a new Google Drive client. +// CreateDriveService creates a new Google Drive service, which encapsulates multiple clients +// used to access different Google Drive functionality. // Exported for tests only. -func CreateDriveClient(ctx context.Context, opt *Options) (*drive.FilesService, error) { +func CreateDriveService(ctx context.Context, opt *Options) (*drive.Service, error) { var err error var ts oauth2.TokenSource @@ -510,7 +532,7 @@ func CreateDriveClient(ctx context.Context, opt *Options) (*drive.FilesService, return nil, errors.Wrap(err, "unable to create Drive client") } - return service.Files, nil + return service, nil } // New creates new Google Drive-backed storage with specified options: @@ -524,14 +546,15 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { return nil, errors.New("folder-id must be specified") } - client, err := CreateDriveClient(ctx, opt) + service, err := CreateDriveService(ctx, opt) if err != nil { return nil, err } gdrive := &gdriveStorage{ Options: *opt, - client: client, + client: service.Files, + about: service.About, folderID: opt.FolderID, fileIDCache: newFileIDCache(), } diff --git a/repo/blob/gdrive/gdrive_storage_test.go b/repo/blob/gdrive/gdrive_storage_test.go index 9e9032dfa..6403219fa 100644 --- a/repo/blob/gdrive/gdrive_storage_test.go +++ b/repo/blob/gdrive/gdrive_storage_test.go @@ -115,10 +115,12 @@ func mustGetOptionsOrSkip(t *testing.T) *gdrive.Options { func createTestFolderOrSkip(ctx context.Context, t 
*testing.T, opt *gdrive.Options, folderName string) *gdrive.Options { t.Helper() - client, err := gdrive.CreateDriveClient(ctx, opt) + service, err := gdrive.CreateDriveService(ctx, opt) require.NoError(t, err) + client := service.Files + folder, err := client.Create(&drive.File{ Name: folderName, Parents: []string{opt.FolderID}, @@ -139,10 +141,11 @@ func createTestFolderOrSkip(ctx context.Context, t *testing.T, opt *gdrive.Optio func deleteTestFolder(ctx context.Context, t *testing.T, opt *gdrive.Options) { t.Helper() - client, err := gdrive.CreateDriveClient(ctx, opt) + service, err := gdrive.CreateDriveService(ctx, opt) require.NoError(t, err) + client := service.Files err = client.Delete(opt.FolderID).Context(ctx).Do() require.NoError(t, err) diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index 5be18723d..8dc9a5764 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -55,6 +55,22 @@ func (s *loggingStorage) GetBlob(ctx context.Context, id blob.ID, offset, length return err } +func (s *loggingStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + timer := timetrack.StartTimer() + c, err := s.base.GetCapacity(ctx) + dt := timer.Elapsed() + + s.logger.Debugw(s.prefix+"GetCapacity", + "sizeBytes", c.SizeB, + "freeBytes", c.FreeB, + "error", err, + "duration", dt, + ) + + // nolint:wrapcheck + return c, err +} + func (s *loggingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata, error) { s.beginConcurrency() defer s.endConcurrency() diff --git a/repo/blob/readonly/readonly_storage.go b/repo/blob/readonly/readonly_storage.go index d9fb43e07..2fa066add 100644 --- a/repo/blob/readonly/readonly_storage.go +++ b/repo/blob/readonly/readonly_storage.go @@ -17,6 +17,11 @@ type readonlyStorage struct { base blob.Storage } +func (s readonlyStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + // nolint:wrapcheck + return s.base.GetCapacity(ctx) 
+}
+
 func (s readonlyStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error {
 	// nolint:wrapcheck
 	return s.base.GetBlob(ctx, id, offset, length, output)
diff --git a/repo/blob/s3/s3_storage.go b/repo/blob/s3/s3_storage.go
index f4c9e1607..f7b848889 100644
--- a/repo/blob/s3/s3_storage.go
+++ b/repo/blob/s3/s3_storage.go
@@ -35,6 +35,10 @@ type s3Storage struct {
 	storageConfig *StorageConfig
 }
 
+func (s *s3Storage) GetCapacity(ctx context.Context) (blob.Capacity, error) {
+	return blob.Capacity{}, blob.ErrNotAVolume
+}
+
 func (s *s3Storage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
 	return s.getBlobWithVersion(ctx, b, latestVersionID, offset, length, output)
 }
diff --git a/repo/blob/sftp/sftp_storage.go b/repo/blob/sftp/sftp_storage.go
index d92764927..4633147c6 100644
--- a/repo/blob/sftp/sftp_storage.go
+++ b/repo/blob/sftp/sftp_storage.go
@@ -101,6 +101,22 @@ func (s *sftpImpl) IsConnectionClosedError(err error) bool {
 	return false
 }
 
+func (s *sftpStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) {
+	i, err := s.Impl.(*sftpImpl).rec.UsingConnection(ctx, "GetCapacity", func(conn connection.Connection) (interface{}, error) {
+		stat, err := sftpClientFromConnection(conn).StatVFS(s.RootPath)
+		if err != nil {
+			return blob.Capacity{}, errors.Wrap(err, "GetCapacity")
+		}
+
+		return blob.Capacity{
+			SizeB: stat.Blocks * stat.Bsize,
+			FreeB: stat.Bavail * stat.Bsize,
+		}, err // nolint:wrapcheck
+	})
+
+	return i.(blob.Capacity), err // nolint:forcetypeassert,wrapcheck
+}
+
 func (s *sftpImpl) GetBlobFromPath(ctx context.Context, dirPath, fullPath string, offset, length int64, output blob.OutputBuffer) error {
 	// nolint:wrapcheck
 	return s.rec.UsingConnectionNoResult(ctx, "GetBlobFromPath", func(conn connection.Connection) error {
diff --git a/repo/blob/storage.go b/repo/blob/storage.go
index 38330e6b7..20e6af00c 100644
--- a/repo/blob/storage.go
+++
b/repo/blob/storage.go
@@ -32,6 +32,10 @@
 // by an implementation of Storage is specified in a PutBlob call.
 var ErrUnsupportedPutBlobOption = errors.New("unsupported put-blob option")
 
+// ErrNotAVolume is returned when attempting to use a Volume method against a storage
+// implementation that does not support the intended functionality.
+var ErrNotAVolume = errors.New("unsupported method, storage is not a volume")
+
 // Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers,
 // which can be written sequentially or treated as a io.Reader.
 type Bytes interface {
@@ -49,6 +53,20 @@ type OutputBuffer interface {
 	Length() int
 }
 
+// Capacity describes the storage capacity and usage of a Volume.
+type Capacity struct {
+	// Size of volume in bytes.
+	SizeB uint64
+	// Available (writeable) space in bytes.
+	FreeB uint64
+}
+
+// Volume defines disk/volume access API to blob storage.
+type Volume interface {
+	// GetCapacity returns the capacity of a given volume.
+	GetCapacity(ctx context.Context) (Capacity, error)
+}
+
 // Reader defines read access API to blob storage.
 type Reader interface {
 	// GetBlob returns full or partial contents of a blob with given ID.
@@ -124,6 +142,7 @@ func (o PutOptions) HasRetentionOptions() bool {
 //
 // The required semantics are provided by existing commercial cloud storage products (Google Cloud, AWS, Azure).
type Storage interface { + Volume Reader // PutBlob uploads the blob with given data to the repository or replaces existing blob with the provided diff --git a/repo/blob/webdav/webdav_storage.go b/repo/blob/webdav/webdav_storage.go index e98dea998..4ad31b48f 100644 --- a/repo/blob/webdav/webdav_storage.go +++ b/repo/blob/webdav/webdav_storage.go @@ -44,6 +44,10 @@ type davStorageImpl struct { cli *gowebdav.Client } +func (d *davStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + return blob.Capacity{}, blob.ErrNotAVolume +} + func (d *davStorageImpl) GetBlobFromPath(ctx context.Context, dirPath, path string, offset, length int64, output blob.OutputBuffer) error { output.Reset()