restore: support for parallelization (#668)

This commit is contained in:
Jarek Kowalski
2020-10-07 21:41:32 -07:00
committed by GitHub
parent 4fd0bcf7dc
commit ec9c4d6095
6 changed files with 105 additions and 28 deletions

View File

@@ -53,6 +53,7 @@
restoreOverwriteFiles = true
restoreConsistentAttributes = false
restoreMode = restoreModeAuto
restoreParallel = 8
)
const (
@@ -71,6 +72,7 @@ func addRestoreFlags(cmd *kingpin.CmdClause) {
cmd.Flag("overwrite-files", "Specifies whether or not to overwrite already existing files").BoolVar(&restoreOverwriteFiles)
cmd.Flag("consistent-attributes", "When multiple snapshots match, fail if they have inconsistent attributes").Envar("KOPIA_RESTORE_CONSISTENT_ATTRIBUTES").BoolVar(&restoreConsistentAttributes)
cmd.Flag("mode", "Override restore mode").EnumVar(&restoreMode, restoreModeAuto, restoreModeLocal, restoreModeZip, restoreModeZipNoCompress, restoreModeTar, restoreModeTgz)
cmd.Flag("parallel", "Restore parallelism (1=disable)").IntVar(&restoreParallel)
}
func restoreOutput() (restore.Output, error) {
@@ -141,7 +143,7 @@ func detectRestoreMode(m string) string {
return restoreModeTgz
default:
printStderr("Restoring to local filesystem (%v)...\n", restoreTargetPath)
printStderr("Restoring to local filesystem (%v) with parallelism=%v...\n", restoreTargetPath, restoreParallel)
return restoreModeLocal
}
}
@@ -161,7 +163,12 @@ func runRestoreCommand(ctx context.Context, rep repo.Repository) error {
return errors.Wrap(err, "unable to get filesystem entry")
}
st, err := restore.Entry(ctx, rep, output, rootEntry)
st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
Parallel: restoreParallel,
ProgressCallback: func(enqueued, processing, completed int64) {
log(ctx).Infof("Restored %v/%v. Processing %v...", completed, enqueued, processing)
},
})
if err != nil {
return err
}

View File

@@ -138,6 +138,24 @@ func (v *Queue) maybeReportProgress() {
cb(v.enqueuedWork, v.activeWorkerCount, v.completedWork)
}
// OnNthCompletion returns a CallbackFunc that counts its own invocations.
// The n-th invocation runs callback and returns its result; every other
// invocation (earlier or later) returns nil without running callback.
// The counter is guarded by a mutex, and callback itself is executed after
// the mutex has been released.
func OnNthCompletion(n int, callback CallbackFunc) CallbackFunc {
	var (
		mu        sync.Mutex
		remaining = n
	)

	// countDown records one invocation and reports whether it was the n-th one.
	countDown := func() bool {
		mu.Lock()
		defer mu.Unlock()

		remaining--

		return remaining == 0
	}

	return func() error {
		if !countDown() {
			return nil
		}

		return callback()
	}
}
// NewQueue returns new parallel work queue.
func NewQueue() *Queue {
return &Queue{

View File

@@ -29,6 +29,11 @@ type FilesystemOutput struct {
OverwriteFiles bool
}
// Parallelizable implements restore.Output interface.
// It always reports true; restore.Entry only forces a single worker for
// outputs that report false.
func (o *FilesystemOutput) Parallelizable() bool {
return true
}
// BeginDirectory implements restore.Output interface.
func (o *FilesystemOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath))
@@ -57,7 +62,7 @@ func (o *FilesystemOutput) Close(ctx context.Context) error {
// WriteFile implements restore.Output interface.
func (o *FilesystemOutput) WriteFile(ctx context.Context, relativePath string, f fs.File) error {
log(ctx).Infof("WriteFile %v (%v bytes) %v", filepath.Join(o.TargetPath, relativePath), f.Size(), f.Mode())
log(ctx).Debugf("WriteFile %v (%v bytes) %v", filepath.Join(o.TargetPath, relativePath), f.Size(), f.Mode())
path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath))
if err := o.copyFileContent(ctx, path, f); err != nil {
@@ -78,7 +83,7 @@ func (o *FilesystemOutput) CreateSymlink(ctx context.Context, relativePath strin
return errors.Wrap(err, "error reading link target")
}
log(ctx).Infof("CreateSymlink %v => %v", filepath.Join(o.TargetPath, relativePath), targetPath)
log(ctx).Debugf("CreateSymlink %v => %v", filepath.Join(o.TargetPath, relativePath), targetPath)
path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath))

View File

@@ -3,11 +3,13 @@
import (
"context"
"path"
"runtime"
"sync/atomic"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/parallelwork"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/logging"
)
@@ -16,6 +18,7 @@
// Output encapsulates output for restore operation.
type Output interface {
Parallelizable() bool
BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error
FinishDirectory(ctx context.Context, relativePath string, e fs.Directory) error
WriteFile(ctx context.Context, relativePath string, e fs.File) error
@@ -31,16 +34,33 @@ type Stats struct {
SymlinkCount int32
}
// Entry walks a snapshot root with given root entry and restores it to the provided output.
func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs.Entry) (Stats, error) {
return copyToOutput(ctx, output, rootEntry)
// Options provides optional restore parameters.
type Options struct {
// Parallel is the requested number of workers. Entry substitutes
// runtime.NumCPU() when it is 0, and uses exactly one worker whenever the
// output reports that it is not parallelizable.
Parallel int
// ProgressCallback is assigned by Entry to the work queue's
// ProgressCallback field.
// NOTE(review): whether a nil callback is tolerated depends on
// parallelwork.Queue — confirm.
ProgressCallback func(enqueued, active, completed int64)
}
func copyToOutput(ctx context.Context, output Output, rootEntry fs.Entry) (Stats, error) {
c := copier{output: output}
// Entry walks a snapshot root with given root entry and restores it to the provided output.
func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs.Entry, options Options) (Stats, error) {
c := copier{output: output, q: parallelwork.NewQueue()}
if err := c.copyEntry(ctx, rootEntry, ""); err != nil {
return Stats{}, errors.Wrap(err, "error copying")
c.q.ProgressCallback = options.ProgressCallback
c.q.EnqueueBack(func() error {
return errors.Wrap(c.copyEntry(ctx, rootEntry, "", func() error { return nil }), "error copying")
})
numWorkers := options.Parallel
if numWorkers == 0 {
numWorkers = runtime.NumCPU()
}
if !output.Parallelizable() {
numWorkers = 1
}
if err := c.q.Process(numWorkers); err != nil {
return Stats{}, errors.Wrap(err, "restore error")
}
if err := c.output.Close(ctx); err != nil {
@@ -53,58 +73,75 @@ func copyToOutput(ctx context.Context, output Output, rootEntry fs.Entry) (Stats
// copier holds the state shared by the copy* methods during a restore.
type copier struct {
// stats counts restored directories, files, symlinks and file bytes;
// the copy* methods update it through sync/atomic.
stats Stats
// output receives every restored directory, file and symlink.
output Output
// q is the work queue onto which directory entries are enqueued.
q *parallelwork.Queue
}
// copyEntry restores a single entry e at targetPath, dispatching on its
// concrete type:
//   - fs.Directory is delegated to copyDirectory (onCompletion is passed on);
//   - fs.File is counted in stats and written with output.WriteFile;
//   - fs.Symlink is counted in stats and created with output.CreateSymlink;
//   - any other type yields an "invalid FS entry type" error.
//
// In the form that takes onCompletion, the callback is invoked only after the
// file or symlink was written successfully and its result is returned; a
// write failure is wrapped and returned without invoking the callback.
//
// NOTE(review): this span shows two signatures and duplicated return
// statements (with and without onCompletion) side by side.
func (c *copier) copyEntry(ctx context.Context, e fs.Entry, targetPath string) error {
func (c *copier) copyEntry(ctx context.Context, e fs.Entry, targetPath string, onCompletion func() error) error {
switch e := e.(type) {
case fs.Directory:
log(ctx).Debugf("dir: '%v'", targetPath)
return c.copyDirectory(ctx, e, targetPath)
return c.copyDirectory(ctx, e, targetPath, onCompletion)
case fs.File:
log(ctx).Debugf("file: '%v'", targetPath)
// Counters are updated atomically, before the write is attempted.
atomic.AddInt32(&c.stats.FileCount, 1)
atomic.AddInt64(&c.stats.TotalFileSize, e.Size())
return c.output.WriteFile(ctx, targetPath, e)
if err := c.output.WriteFile(ctx, targetPath, e); err != nil {
return errors.Wrap(err, "copy file")
}
return onCompletion()
case fs.Symlink:
atomic.AddInt32(&c.stats.SymlinkCount, 1)
log(ctx).Debugf("symlink: '%v'", targetPath)
return c.output.CreateSymlink(ctx, targetPath, e)
if err := c.output.CreateSymlink(ctx, targetPath, e); err != nil {
return errors.Wrap(err, "create symlink")
}
return onCompletion()
default:
return errors.Errorf("invalid FS entry type for %q: %#v", targetPath, e)
}
}
// copyDirectory restores directory d at targetPath. It increments the
// directory counter, calls output.BeginDirectory, and then hands the
// directory's contents to copyDirectoryContent.
//
// In the form that takes onCompletion, output.FinishDirectory and
// onCompletion are not called inline: they are wrapped in the completion
// callback given to copyDirectoryContent. Errors from each step are wrapped
// with a short description of the step.
//
// NOTE(review): this span shows two signatures and both the inline and the
// callback-based FinishDirectory handling side by side.
func (c *copier) copyDirectory(ctx context.Context, d fs.Directory, targetPath string) error {
func (c *copier) copyDirectory(ctx context.Context, d fs.Directory, targetPath string, onCompletion parallelwork.CallbackFunc) error {
atomic.AddInt32(&c.stats.DirCount, 1)
if err := c.output.BeginDirectory(ctx, targetPath, d); err != nil {
return errors.Wrap(err, "create directory")
}
if err := c.copyDirectoryContent(ctx, d, targetPath); err != nil {
return errors.Wrap(err, "copy directory contents")
}
// The closure below is the directory's completion callback: it finishes the
// directory and then reports completion to the caller's callback.
return errors.Wrap(c.copyDirectoryContent(ctx, d, targetPath, func() error {
if err := c.output.FinishDirectory(ctx, targetPath, d); err != nil {
return errors.Wrap(err, "finish directory")
}
if err := c.output.FinishDirectory(ctx, targetPath, d); err != nil {
return errors.Wrap(err, "finish directory")
}
return nil
return onCompletion()
}), "copy directory contents")
}
func (c *copier) copyDirectoryContent(ctx context.Context, d fs.Directory, targetPath string) error {
func (c *copier) copyDirectoryContent(ctx context.Context, d fs.Directory, targetPath string, onCompletion parallelwork.CallbackFunc) error {
entries, err := d.Readdir(ctx)
if err != nil {
return err
}
if len(entries) == 0 {
return onCompletion()
}
onItemCompletion := parallelwork.OnNthCompletion(len(entries), onCompletion)
for _, e := range entries {
if err := c.copyEntry(ctx, e, path.Join(targetPath, e.Name())); err != nil {
return err
}
e := e
c.q.EnqueueBack(func() error {
return c.copyEntry(ctx, e, path.Join(targetPath, e.Name()), onItemCompletion)
})
}
return nil

View File

@@ -16,6 +16,11 @@ type TarOutput struct {
tf *tar.Writer
}
// Parallelizable implements restore.Output interface.
// It always reports false, which makes restore.Entry run the restore with a
// single worker.
// NOTE(review): presumably because every entry goes through the one
// tar.Writer held in tf — confirm.
func (o *TarOutput) Parallelizable() bool {
return false
}
// BeginDirectory implements restore.Output interface.
func (o *TarOutput) BeginDirectory(ctx context.Context, relativePath string, d fs.Directory) error {
if relativePath == "" {

View File

@@ -17,6 +17,11 @@ type ZipOutput struct {
method uint16
}
// Parallelizable implements restore.Output interface.
// It always reports false, which makes restore.Entry run the restore with a
// single worker.
// NOTE(review): presumably because entries are appended to a single zip
// stream — confirm.
func (o *ZipOutput) Parallelizable() bool {
return false
}
// BeginDirectory implements restore.Output interface.
func (o *ZipOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
return nil