From eddde91f2d2178f18e4ad95525fba56dba719ff8 Mon Sep 17 00:00:00 2001 From: Ali Dowair Date: Tue, 14 Jun 2022 21:09:45 +0300 Subject: [PATCH] chore(snapshots): unify sparse and normal FS output paths (#1981) * Unify sparse and normal IO output This commit refactors the code paths that excercise normal and sparse writing of restored content. The goal is to expose sparsefile.Copy() and iocopy.Copy() to be interchangeable, thereby allowing us to wrap or transform their behavior more easily in the future. * Introduce getStreamCopier() * Pull ioCopy() into getStreamCopier() * Fix small nit in E2E test We should be getting the block size of the destination file, not the source file. * Call stat.GetBlockSize() once per FilesystemOutput A tiny refactor to pull this call out of the generated stream copier, as the block size should not change from one file to the next within a restore entry. NOTE: as a side effect, if block size could not be found (an error is returned), we will return the default stream copier instead of letting the sparse copier fail. A warning will be logged, but this error will not cause the restore to fail; it will proceed silently. --- cli/command_restore.go | 10 +++- internal/server/api_restore.go | 6 ++- internal/sparsefile/sparsefile.go | 43 +++------------- internal/sparsefile/sparsefile_test.go | 22 ++++++-- internal/stat/stat_bsd.go | 12 ++++- internal/stat/stat_unix.go | 12 ++++- snapshot/restore/local_fs_output.go | 69 ++++++++++++++++++++------ tests/end_to_end_test/restore_test.go | 2 +- 8 files changed, 116 insertions(+), 60 deletions(-) diff --git a/cli/command_restore.go b/cli/command_restore.go index f1760dcb5..1e1eca124 100644 --- a/cli/command_restore.go +++ b/cli/command_restore.go @@ -212,7 +212,7 @@ func (c *commandRestore) restoreOutput(ctx context.Context) (restore.Output, err m := c.detectRestoreMode(ctx, c.restoreMode, targetpath) switch m { case restoreModeLocal: - return &restore.FilesystemOutput{ + o := &restore.FilesystemOutput{ TargetPath: targetpath, OverwriteDirectories: c.restoreOverwriteDirectories, OverwriteFiles: c.restoreOverwriteFiles, @@ -223,7 +223,13 @@ func (c *commandRestore) restoreOutput(ctx context.Context) (restore.Output, err SkipPermissions: c.restoreSkipPermissions, SkipTimes: c.restoreSkipTimes, Sparse: c.restoreSparse, - }, nil + } + + if err := o.Init(); err != nil { + return nil, errors.Wrap(err, "unable to create output file") + } + + return o, nil case restoreModeZip, restoreModeZipNoCompress: f, err := os.Create(targetpath) //nolint:gosec diff --git a/internal/server/api_restore.go b/internal/server/api_restore.go index d672d0d42..719aef756 100644 --- a/internal/server/api_restore.go +++ b/internal/server/api_restore.go @@ -51,7 +51,11 @@ func handleRestore(ctx context.Context, rc requestContext) (interface{}, *apiErr switch { case req.Filesystem != nil: - out = req.Filesystem + out := req.Filesystem + if err := out.Init(); err != nil { + return nil, internalServerError(err) + } + description = "Destination: " + req.Filesystem.TargetPath case req.ZipFile != "": diff --git a/internal/sparsefile/sparsefile.go b/internal/sparsefile/sparsefile.go index 25ec58529..24839af66 100644 --- a/internal/sparsefile/sparsefile.go +++ b/internal/sparsefile/sparsefile.go @@ -8,49 +8,20 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/iocopy" - "github.com/kopia/kopia/internal/stat" ) -// Write writes the contents of src to the given targetPath, omitting any holes. -func Write(targetPath string, src io.Reader, size int64) error { - dst, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) //nolint:gosec,gomnd - if err != nil { - return err //nolint:wrapcheck - } - - // ensure we always close f. Note that this does not conflict with the - // close below, as close is idempotent. - defer dst.Close() //nolint:errcheck,gosec - - if err = dst.Truncate(size); err != nil { - return errors.Wrap(err, "error writing sparse file") - } - - s, err := stat.GetBlockSize(targetPath) - if err != nil { - return errors.Wrap(err, "error writing sparse file") - } - +// Copy copies a file sparsely (omitting holes) from src to dst, while recycling +// shared buffers. +func Copy(dst io.WriteSeeker, src io.Reader, bufSize uint64) (int64, error) { buf := iocopy.GetBuffer() defer iocopy.ReleaseBuffer(buf) - w, err := copySparse(dst, src, buf[0:s]) - if err != nil { - return errors.Wrap(err, "error writing sparse file") - } - - if w != size { - return errors.Errorf("") - } - - if err := dst.Close(); err != nil { - return err //nolint:wrapcheck - } - - return nil + return copyBuffer(dst, src, buf[0:bufSize]) } -func copySparse(dst io.WriteSeeker, src io.Reader, buf []byte) (written int64, err error) { +// Copy copies bits from src to dst, seeking past blocks of zero bits in src. These +// blocks are omitted, creating a file with holes in dst. +func copyBuffer(dst io.WriteSeeker, src io.Reader, buf []byte) (written int64, err error) { for { nr, er := src.Read(buf) if nr > 0 { // nolint:nestif diff --git a/internal/sparsefile/sparsefile_test.go b/internal/sparsefile/sparsefile_test.go index 56305fd13..2096b8d73 100644 --- a/internal/sparsefile/sparsefile_test.go +++ b/internal/sparsefile/sparsefile_test.go @@ -10,7 +10,7 @@ "github.com/kopia/kopia/internal/stat" ) -func TestSparseWrite(t *testing.T) { +func TestSparseCopy(t *testing.T) { t.Parallel() if runtime.GOOS == "windows" { @@ -67,16 +67,30 @@ type chunk struct { src := filepath.Join(dir, "src"+c.name) dst := filepath.Join(dir, "dst"+c.name) - fd, err := os.Create(src) + sf, err := os.Create(src) if err != nil { t.Fatal(err) } for _, d := range c.data { - fd.WriteAt(bytes.Repeat(d.slice, int(d.rep)), int64(d.off)) + sf.WriteAt(bytes.Repeat(d.slice, int(d.rep)), int64(d.off)) } - err = Write(dst, fd, int64(c.size)) + df, err := os.Create(dst) + if err != nil { + t.Fatal(err) + } + + if err = df.Truncate(int64(c.size)); err != nil { + t.Fatal(err) + } + + blk, err := stat.GetBlockSize(dst) + if err != nil { + t.Fatal(err) + } + + _, err = Copy(df, sf, blk) if err != nil { t.Fatalf("error writing %s: %v", dst, err) } diff --git a/internal/stat/stat_bsd.go b/internal/stat/stat_bsd.go index 52e27e407..57fdf7202 100644 --- a/internal/stat/stat_bsd.go +++ b/internal/stat/stat_bsd.go @@ -5,12 +5,18 @@ // common stat commands. package stat -import "syscall" +import ( + "syscall" + + "github.com/pkg/errors" +) const ( diskBlockSize uint64 = 512 ) +var errInvalidBlockSize = errors.New("invalid disk block size") + // GetFileAllocSize gets the space allocated on disk for the file. // 'fname' in bytes. func GetFileAllocSize(fname string) (uint64, error) { @@ -33,5 +39,9 @@ func GetBlockSize(path string) (uint64, error) { return 0, err // nolint:wrapcheck } + if st.F_bsize <= 0 { + return 0, errors.Wrapf(errInvalidBlockSize, "%d", st.F_bsize) + } + return uint64(st.F_bsize), nil } diff --git a/internal/stat/stat_unix.go b/internal/stat/stat_unix.go index b7543e71e..a5f5386ea 100644 --- a/internal/stat/stat_unix.go +++ b/internal/stat/stat_unix.go @@ -5,12 +5,18 @@ // common stat commands. package stat -import "syscall" +import ( + "syscall" + + "github.com/pkg/errors" +) const ( diskBlockSize uint64 = 512 ) +var errInvalidBlockSize = errors.New("invalid disk block size") + // GetFileAllocSize gets the space allocated on disk for the file // 'fname' in bytes. func GetFileAllocSize(fname string) (uint64, error) { @@ -33,5 +39,9 @@ func GetBlockSize(path string) (uint64, error) { return 0, err // nolint:wrapcheck } + if st.Bsize <= 0 { + return 0, errors.Wrapf(errInvalidBlockSize, "%d", st.Bsize) + } + return uint64(st.Bsize), nil // nolint:unconvert,nolintlint } diff --git a/snapshot/restore/local_fs_output.go b/snapshot/restore/local_fs_output.go index 89d829227..fc8f64e83 100644 --- a/snapshot/restore/local_fs_output.go +++ b/snapshot/restore/local_fs_output.go @@ -16,6 +16,7 @@ "github.com/kopia/kopia/internal/atomicfile" "github.com/kopia/kopia/internal/iocopy" "github.com/kopia/kopia/internal/sparsefile" + "github.com/kopia/kopia/internal/stat" "github.com/kopia/kopia/snapshot" ) @@ -25,6 +26,35 @@ maxTimeDeltaToConsiderFileTheSame = 2 * time.Second ) +// streamCopier is a generic function type to perform the actual copying of data bits +// from a source stream to a destination stream. +type streamCopier func(io.WriteSeeker, io.Reader) (int64, error) + +// getStreamCopier returns a function that can copy data from a source stream to a destination stream. +func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (streamCopier, error) { + if sparse { + if !isWindows() { + dirpath := filepath.Dir(targetpath) + + s, err := stat.GetBlockSize(dirpath) + if err != nil { + return nil, errors.Wrapf(err, "error getting disk block size for target %v", dirpath) + } + + return func(w io.WriteSeeker, r io.Reader) (int64, error) { + return sparsefile.Copy(w, r, s) //nolint:wrapcheck + }, nil + } + + log(ctx).Debugf("sparse copying is not supported on Windows, falling back to regular copying") + } + + // Wrap iocopy.Copy to conform to StreamCopier type. + return func(w io.WriteSeeker, r io.Reader) (int64, error) { + return iocopy.Copy(w, r) //nolint:wrapcheck + }, nil +} + // FilesystemOutput contains the options for outputting a file system tree. type FilesystemOutput struct { // TargetPath for restore. @@ -58,8 +88,25 @@ type FilesystemOutput struct { // SkipTimes when set to true causes restore to skip restoring modification times. SkipTimes bool `json:"skipTimes"` - // Sparse when set to true causes restore files sparsely-not writing any holes (zero regions) to disk. + // Sparse when set to true causes the restored files to be sparse. Sparse bool `json:"sparse"` + + // copier is the StreamCopier to use for copying the actual bit stream to output. + // It is assigned at runtime based on the target filesystem and restore options. + copier streamCopier +} + +// Init initializes the internal members of the filesystem writer output. +// This method must be called before FilesystemOutput can be used. +func (o *FilesystemOutput) Init() error { + c, err := getStreamCopier(context.TODO(), o.TargetPath, o.Sparse) + if err != nil { + return errors.Wrap(err, "unable to get stream copier") + } + + o.copier = c + + return nil } // Parallelizable implements restore.Output interface. @@ -307,20 +354,23 @@ func (o *FilesystemOutput) createDirectory(ctx context.Context, path string) err } } -func write(targetPath string, r fs.Reader) error { +func write(targetPath string, r fs.Reader, size int64, c streamCopier) error { f, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) //nolint:gosec,gomnd if err != nil { return err //nolint:wrapcheck } + if err := f.Truncate(size); err != nil { + return err //nolint:wrapcheck + } + // ensure we always close f. Note that this does not conflict with the // close below, as close is idempotent. defer f.Close() //nolint:errcheck,gosec name := f.Name() - err = iocopy.JustCopy(f, r) - if err != nil { + if _, err := c(f, r); err != nil { return errors.Wrap(err, "cannot write data to file %q "+name) } @@ -358,16 +408,7 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin return atomicfile.Write(targetPath, r) } - if o.Sparse { - if isWindows() { - log(ctx).Infof("sparse files are not supported on Windows, restoring normally") - } else { - // nolint:wrapcheck - return sparsefile.Write(targetPath, r, f.Size()) - } - } - - return write(targetPath, r) + return write(targetPath, r, f.Size(), o.copier) } func isEmptyDirectory(name string) (bool, error) { diff --git a/tests/end_to_end_test/restore_test.go b/tests/end_to_end_test/restore_test.go index b0fb6224d..aea51f9f4 100644 --- a/tests/end_to_end_test/restore_test.go +++ b/tests/end_to_end_test/restore_test.go @@ -508,7 +508,7 @@ func TestSnapshotSparseRestore(t *testing.T) { bufSize := uint64(iocopy.BufSize) - blkSize, err := stat.GetBlockSize(sourceDir) + blkSize, err := stat.GetBlockSize(restoreDir) if err != nil { t.Fatalf("error getting disk block size: %v", err) }