mirror of
https://github.com/kopia/kopia.git
synced 2026-05-07 22:32:45 -04:00
chore(snapshots): unify sparse and normal FS output paths (#1981)
* Unify sparse and normal IO output This commit refactors the code paths that excercise normal and sparse writing of restored content. The goal is to expose sparsefile.Copy() and iocopy.Copy() to be interchangeable, thereby allowing us to wrap or transform their behavior more easily in the future. * Introduce getStreamCopier() * Pull ioCopy() into getStreamCopier() * Fix small nit in E2E test We should be getting the block size of the destination file, not the source file. * Call stat.GetBlockSize() once per FilesystemOutput A tiny refactor to pull this call out of the generated stream copier, as the block size should not change from one file to the next within a restore entry. NOTE: as a side effect, if block size could not be found (an error is returned), we will return the default stream copier instead of letting the sparse copier fail. A warning will be logged, but this error will not cause the restore to fail; it will proceed silently.
This commit is contained in:
@@ -212,7 +212,7 @@ func (c *commandRestore) restoreOutput(ctx context.Context) (restore.Output, err
|
||||
m := c.detectRestoreMode(ctx, c.restoreMode, targetpath)
|
||||
switch m {
|
||||
case restoreModeLocal:
|
||||
return &restore.FilesystemOutput{
|
||||
o := &restore.FilesystemOutput{
|
||||
TargetPath: targetpath,
|
||||
OverwriteDirectories: c.restoreOverwriteDirectories,
|
||||
OverwriteFiles: c.restoreOverwriteFiles,
|
||||
@@ -223,7 +223,13 @@ func (c *commandRestore) restoreOutput(ctx context.Context) (restore.Output, err
|
||||
SkipPermissions: c.restoreSkipPermissions,
|
||||
SkipTimes: c.restoreSkipTimes,
|
||||
Sparse: c.restoreSparse,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := o.Init(); err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create output file")
|
||||
}
|
||||
|
||||
return o, nil
|
||||
|
||||
case restoreModeZip, restoreModeZipNoCompress:
|
||||
f, err := os.Create(targetpath) //nolint:gosec
|
||||
|
||||
@@ -51,7 +51,11 @@ func handleRestore(ctx context.Context, rc requestContext) (interface{}, *apiErr
|
||||
|
||||
switch {
|
||||
case req.Filesystem != nil:
|
||||
out = req.Filesystem
|
||||
out := req.Filesystem
|
||||
if err := out.Init(); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
|
||||
description = "Destination: " + req.Filesystem.TargetPath
|
||||
|
||||
case req.ZipFile != "":
|
||||
|
||||
@@ -8,49 +8,20 @@
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/iocopy"
|
||||
"github.com/kopia/kopia/internal/stat"
|
||||
)
|
||||
|
||||
// Write writes the contents of src to the given targetPath, omitting any holes.
|
||||
func Write(targetPath string, src io.Reader, size int64) error {
|
||||
dst, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) //nolint:gosec,gomnd
|
||||
if err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
// ensure we always close f. Note that this does not conflict with the
|
||||
// close below, as close is idempotent.
|
||||
defer dst.Close() //nolint:errcheck,gosec
|
||||
|
||||
if err = dst.Truncate(size); err != nil {
|
||||
return errors.Wrap(err, "error writing sparse file")
|
||||
}
|
||||
|
||||
s, err := stat.GetBlockSize(targetPath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing sparse file")
|
||||
}
|
||||
|
||||
// Copy copies a file sparsely (omitting holes) from src to dst, while recycling
|
||||
// shared buffers.
|
||||
func Copy(dst io.WriteSeeker, src io.Reader, bufSize uint64) (int64, error) {
|
||||
buf := iocopy.GetBuffer()
|
||||
defer iocopy.ReleaseBuffer(buf)
|
||||
|
||||
w, err := copySparse(dst, src, buf[0:s])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing sparse file")
|
||||
}
|
||||
|
||||
if w != size {
|
||||
return errors.Errorf("")
|
||||
}
|
||||
|
||||
if err := dst.Close(); err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
return nil
|
||||
return copyBuffer(dst, src, buf[0:bufSize])
|
||||
}
|
||||
|
||||
func copySparse(dst io.WriteSeeker, src io.Reader, buf []byte) (written int64, err error) {
|
||||
// Copy copies bits from src to dst, seeking past blocks of zero bits in src. These
|
||||
// blocks are omitted, creating a file with holes in dst.
|
||||
func copyBuffer(dst io.WriteSeeker, src io.Reader, buf []byte) (written int64, err error) {
|
||||
for {
|
||||
nr, er := src.Read(buf)
|
||||
if nr > 0 { // nolint:nestif
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"github.com/kopia/kopia/internal/stat"
|
||||
)
|
||||
|
||||
func TestSparseWrite(t *testing.T) {
|
||||
func TestSparseCopy(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if runtime.GOOS == "windows" {
|
||||
@@ -67,16 +67,30 @@ type chunk struct {
|
||||
src := filepath.Join(dir, "src"+c.name)
|
||||
dst := filepath.Join(dir, "dst"+c.name)
|
||||
|
||||
fd, err := os.Create(src)
|
||||
sf, err := os.Create(src)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, d := range c.data {
|
||||
fd.WriteAt(bytes.Repeat(d.slice, int(d.rep)), int64(d.off))
|
||||
sf.WriteAt(bytes.Repeat(d.slice, int(d.rep)), int64(d.off))
|
||||
}
|
||||
|
||||
err = Write(dst, fd, int64(c.size))
|
||||
df, err := os.Create(dst)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = df.Truncate(int64(c.size)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
blk, err := stat.GetBlockSize(dst)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = Copy(df, sf, blk)
|
||||
if err != nil {
|
||||
t.Fatalf("error writing %s: %v", dst, err)
|
||||
}
|
||||
|
||||
@@ -5,12 +5,18 @@
|
||||
// common stat commands.
|
||||
package stat
|
||||
|
||||
import "syscall"
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
diskBlockSize uint64 = 512
|
||||
)
|
||||
|
||||
var errInvalidBlockSize = errors.New("invalid disk block size")
|
||||
|
||||
// GetFileAllocSize gets the space allocated on disk for the file.
|
||||
// 'fname' in bytes.
|
||||
func GetFileAllocSize(fname string) (uint64, error) {
|
||||
@@ -33,5 +39,9 @@ func GetBlockSize(path string) (uint64, error) {
|
||||
return 0, err // nolint:wrapcheck
|
||||
}
|
||||
|
||||
if st.F_bsize <= 0 {
|
||||
return 0, errors.Wrapf(errInvalidBlockSize, "%d", st.F_bsize)
|
||||
}
|
||||
|
||||
return uint64(st.F_bsize), nil
|
||||
}
|
||||
|
||||
@@ -5,12 +5,18 @@
|
||||
// common stat commands.
|
||||
package stat
|
||||
|
||||
import "syscall"
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
diskBlockSize uint64 = 512
|
||||
)
|
||||
|
||||
var errInvalidBlockSize = errors.New("invalid disk block size")
|
||||
|
||||
// GetFileAllocSize gets the space allocated on disk for the file
|
||||
// 'fname' in bytes.
|
||||
func GetFileAllocSize(fname string) (uint64, error) {
|
||||
@@ -33,5 +39,9 @@ func GetBlockSize(path string) (uint64, error) {
|
||||
return 0, err // nolint:wrapcheck
|
||||
}
|
||||
|
||||
if st.Bsize <= 0 {
|
||||
return 0, errors.Wrapf(errInvalidBlockSize, "%d", st.Bsize)
|
||||
}
|
||||
|
||||
return uint64(st.Bsize), nil // nolint:unconvert,nolintlint
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
"github.com/kopia/kopia/internal/atomicfile"
|
||||
"github.com/kopia/kopia/internal/iocopy"
|
||||
"github.com/kopia/kopia/internal/sparsefile"
|
||||
"github.com/kopia/kopia/internal/stat"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
@@ -25,6 +26,35 @@
|
||||
maxTimeDeltaToConsiderFileTheSame = 2 * time.Second
|
||||
)
|
||||
|
||||
// streamCopier is a generic function type to perform the actual copying of data bits
|
||||
// from a source stream to a destination stream.
|
||||
type streamCopier func(io.WriteSeeker, io.Reader) (int64, error)
|
||||
|
||||
// getStreamCopier returns a function that can copy data from a source stream to a destination stream.
|
||||
func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (streamCopier, error) {
|
||||
if sparse {
|
||||
if !isWindows() {
|
||||
dirpath := filepath.Dir(targetpath)
|
||||
|
||||
s, err := stat.GetBlockSize(dirpath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error getting disk block size for target %v", dirpath)
|
||||
}
|
||||
|
||||
return func(w io.WriteSeeker, r io.Reader) (int64, error) {
|
||||
return sparsefile.Copy(w, r, s) //nolint:wrapcheck
|
||||
}, nil
|
||||
}
|
||||
|
||||
log(ctx).Debugf("sparse copying is not supported on Windows, falling back to regular copying")
|
||||
}
|
||||
|
||||
// Wrap iocopy.Copy to conform to StreamCopier type.
|
||||
return func(w io.WriteSeeker, r io.Reader) (int64, error) {
|
||||
return iocopy.Copy(w, r) //nolint:wrapcheck
|
||||
}, nil
|
||||
}
|
||||
|
||||
// FilesystemOutput contains the options for outputting a file system tree.
|
||||
type FilesystemOutput struct {
|
||||
// TargetPath for restore.
|
||||
@@ -58,8 +88,25 @@ type FilesystemOutput struct {
|
||||
// SkipTimes when set to true causes restore to skip restoring modification times.
|
||||
SkipTimes bool `json:"skipTimes"`
|
||||
|
||||
// Sparse when set to true causes restore files sparsely-not writing any holes (zero regions) to disk.
|
||||
// Sparse when set to true causes the restored files to be sparse.
|
||||
Sparse bool `json:"sparse"`
|
||||
|
||||
// copier is the StreamCopier to use for copying the actual bit stream to output.
|
||||
// It is assigned at runtime based on the target filesystem and restore options.
|
||||
copier streamCopier
|
||||
}
|
||||
|
||||
// Init initializes the internal members of the filesystem writer output.
|
||||
// This method must be called before FilesystemOutput can be used.
|
||||
func (o *FilesystemOutput) Init() error {
|
||||
c, err := getStreamCopier(context.TODO(), o.TargetPath, o.Sparse)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get stream copier")
|
||||
}
|
||||
|
||||
o.copier = c
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parallelizable implements restore.Output interface.
|
||||
@@ -307,20 +354,23 @@ func (o *FilesystemOutput) createDirectory(ctx context.Context, path string) err
|
||||
}
|
||||
}
|
||||
|
||||
func write(targetPath string, r fs.Reader) error {
|
||||
func write(targetPath string, r fs.Reader, size int64, c streamCopier) error {
|
||||
f, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) //nolint:gosec,gomnd
|
||||
if err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
if err := f.Truncate(size); err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
// ensure we always close f. Note that this does not conflict with the
|
||||
// close below, as close is idempotent.
|
||||
defer f.Close() //nolint:errcheck,gosec
|
||||
|
||||
name := f.Name()
|
||||
|
||||
err = iocopy.JustCopy(f, r)
|
||||
if err != nil {
|
||||
if _, err := c(f, r); err != nil {
|
||||
return errors.Wrap(err, "cannot write data to file %q "+name)
|
||||
}
|
||||
|
||||
@@ -358,16 +408,7 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin
|
||||
return atomicfile.Write(targetPath, r)
|
||||
}
|
||||
|
||||
if o.Sparse {
|
||||
if isWindows() {
|
||||
log(ctx).Infof("sparse files are not supported on Windows, restoring normally")
|
||||
} else {
|
||||
// nolint:wrapcheck
|
||||
return sparsefile.Write(targetPath, r, f.Size())
|
||||
}
|
||||
}
|
||||
|
||||
return write(targetPath, r)
|
||||
return write(targetPath, r, f.Size(), o.copier)
|
||||
}
|
||||
|
||||
func isEmptyDirectory(name string) (bool, error) {
|
||||
|
||||
@@ -508,7 +508,7 @@ func TestSnapshotSparseRestore(t *testing.T) {
|
||||
|
||||
bufSize := uint64(iocopy.BufSize)
|
||||
|
||||
blkSize, err := stat.GetBlockSize(sourceDir)
|
||||
blkSize, err := stat.GetBlockSize(restoreDir)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting disk block size: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user