diff --git a/cli/command_restore.go b/cli/command_restore.go index 5801e2859..3b45cd151 100644 --- a/cli/command_restore.go +++ b/cli/command_restore.go @@ -53,6 +53,7 @@ restoreOverwriteFiles = true restoreConsistentAttributes = false restoreMode = restoreModeAuto + restoreParallel = 8 ) const ( @@ -71,6 +72,7 @@ func addRestoreFlags(cmd *kingpin.CmdClause) { cmd.Flag("overwrite-files", "Specifies whether or not to overwrite already existing files").BoolVar(&restoreOverwriteFiles) cmd.Flag("consistent-attributes", "When multiple snapshots match, fail if they have inconsistent attributes").Envar("KOPIA_RESTORE_CONSISTENT_ATTRIBUTES").BoolVar(&restoreConsistentAttributes) cmd.Flag("mode", "Override restore mode").EnumVar(&restoreMode, restoreModeAuto, restoreModeLocal, restoreModeZip, restoreModeZipNoCompress, restoreModeTar, restoreModeTgz) + cmd.Flag("parallel", "Restore parallelism (1=disable)").IntVar(&restoreParallel) } func restoreOutput() (restore.Output, error) { @@ -141,7 +143,7 @@ func detectRestoreMode(m string) string { return restoreModeTgz default: - printStderr("Restoring to local filesystem (%v)...\n", restoreTargetPath) + printStderr("Restoring to local filesystem (%v) with parallelism=%v...\n", restoreTargetPath, restoreParallel) return restoreModeLocal } } @@ -161,7 +163,12 @@ func runRestoreCommand(ctx context.Context, rep repo.Repository) error { return errors.Wrap(err, "unable to get filesystem entry") } - st, err := restore.Entry(ctx, rep, output, rootEntry) + st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{ + Parallel: restoreParallel, + ProgressCallback: func(enqueued, processing, completed int64) { + log(ctx).Infof("Restored %v/%v. Processing %v...", completed, enqueued, processing) + }, + }) if err != nil { return err } diff --git a/internal/parallelwork/parallel_work_queue.go b/internal/parallelwork/parallel_work_queue.go index f52ad68a7..440698b10 100644 --- a/internal/parallelwork/parallel_work_queue.go +++ b/internal/parallelwork/parallel_work_queue.go @@ -138,6 +138,24 @@ func (v *Queue) maybeReportProgress() { cb(v.enqueuedWork, v.activeWorkerCount, v.completedWork) } +// OnNthCompletion invokes the provided callback once the returned callback function has been invoked exactly n times. +func OnNthCompletion(n int, callback CallbackFunc) CallbackFunc { + var mu sync.Mutex + + return func() error { + mu.Lock() + n-- + call := n == 0 + mu.Unlock() + + if call { + return callback() + } + + return nil + } +} + // NewQueue returns new parallel work queue. func NewQueue() *Queue { return &Queue{ diff --git a/snapshot/restore/local_fs_output.go b/snapshot/restore/local_fs_output.go index a1599b096..136162822 100644 --- a/snapshot/restore/local_fs_output.go +++ b/snapshot/restore/local_fs_output.go @@ -29,6 +29,11 @@ type FilesystemOutput struct { OverwriteFiles bool } +// Parallelizable implements restore.Output interface. +func (o *FilesystemOutput) Parallelizable() bool { + return true +} + // BeginDirectory implements restore.Output interface. func (o *FilesystemOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath)) @@ -57,7 +62,7 @@ func (o *FilesystemOutput) Close(ctx context.Context) error { // WriteFile implements restore.Output interface. func (o *FilesystemOutput) WriteFile(ctx context.Context, relativePath string, f fs.File) error { - log(ctx).Infof("WriteFile %v (%v bytes) %v", filepath.Join(o.TargetPath, relativePath), f.Size(), f.Mode()) + log(ctx).Debugf("WriteFile %v (%v bytes) %v", filepath.Join(o.TargetPath, relativePath), f.Size(), f.Mode()) path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath)) if err := o.copyFileContent(ctx, path, f); err != nil { @@ -78,7 +83,7 @@ func (o *FilesystemOutput) CreateSymlink(ctx context.Context, relativePath strin return errors.Wrap(err, "error reading link target") } - log(ctx).Infof("CreateSymlink %v => %v", filepath.Join(o.TargetPath, relativePath), targetPath) + log(ctx).Debugf("CreateSymlink %v => %v", filepath.Join(o.TargetPath, relativePath), targetPath) path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath)) diff --git a/snapshot/restore/restore.go b/snapshot/restore/restore.go index c259fe604..a1a8acac1 100644 --- a/snapshot/restore/restore.go +++ b/snapshot/restore/restore.go @@ -3,11 +3,13 @@ import ( "context" "path" + "runtime" "sync/atomic" "github.com/pkg/errors" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/parallelwork" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/logging" ) @@ -16,6 +18,7 @@ // Output encapsulates output for restore operation. type Output interface { + Parallelizable() bool BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error FinishDirectory(ctx context.Context, relativePath string, e fs.Directory) error WriteFile(ctx context.Context, relativePath string, e fs.File) error @@ -31,16 +34,33 @@ type Stats struct { SymlinkCount int32 } -// Entry walks a snapshot root with given root entry and restores it to the provided output. -func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs.Entry) (Stats, error) { - return copyToOutput(ctx, output, rootEntry) +// Options provides optional restore parameters. +type Options struct { + Parallel int + ProgressCallback func(enqueued, active, completed int64) } -func copyToOutput(ctx context.Context, output Output, rootEntry fs.Entry) (Stats, error) { - c := copier{output: output} +// Entry walks a snapshot root with given root entry and restores it to the provided output. +func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs.Entry, options Options) (Stats, error) { + c := copier{output: output, q: parallelwork.NewQueue()} - if err := c.copyEntry(ctx, rootEntry, ""); err != nil { - return Stats{}, errors.Wrap(err, "error copying") + c.q.ProgressCallback = options.ProgressCallback + + c.q.EnqueueBack(func() error { + return errors.Wrap(c.copyEntry(ctx, rootEntry, "", func() error { return nil }), "error copying") + }) + + numWorkers := options.Parallel + if numWorkers == 0 { + numWorkers = runtime.NumCPU() + } + + if !output.Parallelizable() { + numWorkers = 1 + } + + if err := c.q.Process(numWorkers); err != nil { + return Stats{}, errors.Wrap(err, "restore error") } if err := c.output.Close(ctx); err != nil { @@ -53,58 +73,75 @@ func copyToOutput(ctx context.Context, output Output, rootEntry fs.Entry) (Stats type copier struct { stats Stats output Output + q *parallelwork.Queue } -func (c *copier) copyEntry(ctx context.Context, e fs.Entry, targetPath string) error { +func (c *copier) copyEntry(ctx context.Context, e fs.Entry, targetPath string, onCompletion func() error) error { switch e := e.(type) { case fs.Directory: log(ctx).Debugf("dir: '%v'", targetPath) - return c.copyDirectory(ctx, e, targetPath) + return c.copyDirectory(ctx, e, targetPath, onCompletion) case fs.File: log(ctx).Debugf("file: '%v'", targetPath) atomic.AddInt32(&c.stats.FileCount, 1) atomic.AddInt64(&c.stats.TotalFileSize, e.Size()) - return c.output.WriteFile(ctx, targetPath, e) + if err := c.output.WriteFile(ctx, targetPath, e); err != nil { + return errors.Wrap(err, "copy file") + } + + return onCompletion() + case fs.Symlink: atomic.AddInt32(&c.stats.SymlinkCount, 1) log(ctx).Debugf("symlink: '%v'", targetPath) - return c.output.CreateSymlink(ctx, targetPath, e) + if err := c.output.CreateSymlink(ctx, targetPath, e); err != nil { + return errors.Wrap(err, "create symlink") + } + + return onCompletion() + default: return errors.Errorf("invalid FS entry type for %q: %#v", targetPath, e) } } -func (c *copier) copyDirectory(ctx context.Context, d fs.Directory, targetPath string) error { +func (c *copier) copyDirectory(ctx context.Context, d fs.Directory, targetPath string, onCompletion parallelwork.CallbackFunc) error { atomic.AddInt32(&c.stats.DirCount, 1) if err := c.output.BeginDirectory(ctx, targetPath, d); err != nil { return errors.Wrap(err, "create directory") } - if err := c.copyDirectoryContent(ctx, d, targetPath); err != nil { - return errors.Wrap(err, "copy directory contents") - } + return errors.Wrap(c.copyDirectoryContent(ctx, d, targetPath, func() error { + if err := c.output.FinishDirectory(ctx, targetPath, d); err != nil { + return errors.Wrap(err, "finish directory") + } - if err := c.output.FinishDirectory(ctx, targetPath, d); err != nil { - return errors.Wrap(err, "finish directory") - } - - return nil + return onCompletion() + }), "copy directory contents") } -func (c *copier) copyDirectoryContent(ctx context.Context, d fs.Directory, targetPath string) error { +func (c *copier) copyDirectoryContent(ctx context.Context, d fs.Directory, targetPath string, onCompletion parallelwork.CallbackFunc) error { entries, err := d.Readdir(ctx) if err != nil { return err } + if len(entries) == 0 { + return onCompletion() + } + + onItemCompletion := parallelwork.OnNthCompletion(len(entries), onCompletion) + for _, e := range entries { - if err := c.copyEntry(ctx, e, path.Join(targetPath, e.Name())); err != nil { - return err - } + e := e + + c.q.EnqueueBack(func() error { + return c.copyEntry(ctx, e, path.Join(targetPath, e.Name()), onItemCompletion) + }) } return nil diff --git a/snapshot/restore/tar_output.go b/snapshot/restore/tar_output.go index a62ece882..db6fb68a9 100644 --- a/snapshot/restore/tar_output.go +++ b/snapshot/restore/tar_output.go @@ -16,6 +16,11 @@ type TarOutput struct { tf *tar.Writer } +// Parallelizable implements restore.Output interface. +func (o *TarOutput) Parallelizable() bool { + return false +} + // BeginDirectory implements restore.Output interface. func (o *TarOutput) BeginDirectory(ctx context.Context, relativePath string, d fs.Directory) error { if relativePath == "" { diff --git a/snapshot/restore/zip_output.go b/snapshot/restore/zip_output.go index 65e109c8e..8f2daf0fa 100644 --- a/snapshot/restore/zip_output.go +++ b/snapshot/restore/zip_output.go @@ -17,6 +17,11 @@ type ZipOutput struct { method uint16 } +// Parallelizable implements restore.Output interface. +func (o *ZipOutput) Parallelizable() bool { + return false +} + // BeginDirectory implements restore.Output interface. func (o *ZipOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { return nil