refactor(general): minor cleanups (#4003)

Followups to #3655

* wrap fs.Reader
* nit: remove unnecessary intermediate variable
* nit: rename local variable
* cleanup: move restore.Progress interface to cli pkg
* move cliRestoreProgress to a separate file
* refactor(general): replace switch with if/else for clarity
  Removes a tautology for `err == nil`, which was guaranteed
  to be true in the second case statement for the switch.
  Replacing the switch statement with and if/else block is clearer.
* initialize restoreProgress in restore command
* fix: use error.Wrapf with format string and args


Simplify SetCounters signature:

Pass arguments in a `restore.Stats` struct.
  `SetCounters(s restore.Stats)`
Simplifies call sites and implementation.
In this case it makes sense to pass all the values
using the restore.Stats struct as it simplifies
the calls.
However, this pattern should be avoided in general
as it essentially makes all the arguments "optional".
This makes it easy to miss setting a value and simply
passing 0 (the default value), thus it becomes error
prone.
In this particular case, the struct is being passed
through verbatim, thus eliminating the risk of
missing a value, at least in the current state of
the code.
This commit is contained in:
Julio López
2024-08-27 09:42:58 -07:00
committed by GitHub
parent d37de8316e
commit 5dbc8a478a
8 changed files with 165 additions and 199 deletions

View File

@@ -23,7 +23,6 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
)
@@ -87,7 +86,7 @@ type appServices interface {
advancedCommand(ctx context.Context)
repositoryConfigFileName() string
getProgress() *cliProgress
getRestoreProgress() restore.Progress
getRestoreProgress() RestoreProgress
stdout() io.Writer
Stderr() io.Writer
@@ -120,7 +119,7 @@ type App struct {
enableAutomaticMaintenance bool
pf profileFlags
progress *cliProgress
restoreProgress restore.Progress
restoreProgress RestoreProgress
initialUpdateCheckDelay time.Duration
updateCheckInterval time.Duration
updateAvailableNotifyInterval time.Duration
@@ -186,11 +185,11 @@ func (c *App) getProgress() *cliProgress {
}
// SetRestoreProgress is used to set custom restore progress, purposed to be used in tests.
func (c *App) SetRestoreProgress(p restore.Progress) {
func (c *App) SetRestoreProgress(p RestoreProgress) {
c.restoreProgress = p
}
func (c *App) getRestoreProgress() restore.Progress {
func (c *App) getRestoreProgress() RestoreProgress {
return c.restoreProgress
}
@@ -293,10 +292,6 @@ func (c *App) setup(app *kingpin.Application) {
c.pf.setup(app)
c.progress.setup(c, app)
if rp, ok := c.restoreProgress.(*cliRestoreProgress); ok {
rp.setup(c, app)
}
c.blob.setup(c, app)
c.benchmark.setup(c, app)
c.cache.setup(c, app)
@@ -325,8 +320,7 @@ type commandParent interface {
// NewApp creates a new instance of App.
func NewApp() *App {
return &App{
progress: &cliProgress{},
restoreProgress: &cliRestoreProgress{},
progress: &cliProgress{},
cliStorageProviders: []StorageProvider{
{"from-config", "the provided configuration file", func() StorageFlags { return &storageFromConfigFlags{} }},

View File

@@ -259,124 +259,4 @@ func (p *cliProgress) Finish() {
}
}
type cliRestoreProgress struct {
restoredCount atomic.Int32
enqueuedCount atomic.Int32
skippedCount atomic.Int32
ignoredErrorsCount atomic.Int32
restoredTotalFileSize atomic.Int64
enqueuedTotalFileSize atomic.Int64
skippedTotalFileSize atomic.Int64
progressUpdateInterval time.Duration
enableProgress bool
svc appServices
outputThrottle timetrack.Throttle
outputMutex sync.Mutex
out textOutput
eta timetrack.Estimator
// +checklocks:outputMutex
lastLineLength int
}
func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) {
cp := svc.getProgress()
if cp == nil {
return
}
p.progressUpdateInterval = cp.progressUpdateInterval
p.enableProgress = cp.enableProgress
p.out = cp.out
p.svc = svc
p.eta = timetrack.Start()
}
func (p *cliRestoreProgress) SetCounters(
enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
enqueuedBytes, restoredBytes, skippedBytes int64,
) {
p.enqueuedCount.Store(enqueuedCount)
p.enqueuedTotalFileSize.Store(enqueuedBytes)
p.restoredCount.Store(restoredCount)
p.restoredTotalFileSize.Store(restoredBytes)
p.skippedCount.Store(skippedCount)
p.skippedTotalFileSize.Store(skippedBytes)
p.ignoredErrorsCount.Store(ignoredErrors)
p.maybeOutput()
}
func (p *cliRestoreProgress) Flush() {
p.outputThrottle.Reset()
p.output("\n")
}
func (p *cliRestoreProgress) maybeOutput() {
if p.outputThrottle.ShouldOutput(p.svc.getProgress().progressUpdateInterval) {
p.output("")
}
}
func (p *cliRestoreProgress) output(suffix string) {
if !p.svc.getProgress().enableProgress {
return
}
p.outputMutex.Lock()
defer p.outputMutex.Unlock()
restoredCount := p.restoredCount.Load()
enqueuedCount := p.enqueuedCount.Load()
skippedCount := p.skippedCount.Load()
ignoredCount := p.ignoredErrorsCount.Load()
restoredSize := p.restoredTotalFileSize.Load()
enqueuedSize := p.enqueuedTotalFileSize.Load()
skippedSize := p.skippedTotalFileSize.Load()
if restoredSize == 0 {
return
}
var maybeRemaining, maybeSkipped, maybeErrors string
if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok {
maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
units.BytesPerSecondsString(est.SpeedPerSecond),
est.PercentComplete,
est.Remaining)
}
if skippedCount > 0 {
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize))
}
if ignoredCount > 0 {
maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount)
}
line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
restoredCount+skippedCount, units.BytesString(restoredSize),
enqueuedCount, units.BytesString(enqueuedSize),
maybeSkipped, maybeErrors, maybeRemaining,
)
var extraSpaces string
if len(line) < p.lastLineLength {
// add extra spaces to wipe over previous line if it was longer than current
extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
}
p.lastLineLength = len(line)
p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix)
}
var _ snapshotfs.UploadProgress = (*cliProgress)(nil)

View File

@@ -18,6 +18,7 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
@@ -95,6 +96,12 @@
unlimitedDepth = math.MaxInt32
)
// RestoreProgress is invoked to report progress during a restore.
type RestoreProgress interface {
SetCounters(s restore.Stats)
Flush()
}
type restoreSourceTarget struct {
source string
target string
@@ -366,6 +373,21 @@ func (c *commandRestore) setupPlaceholderExpansion(ctx context.Context, rep repo
return rootEntry, nil
}
func (c *commandRestore) getRestoreProgress() RestoreProgress {
if rp := c.svc.getRestoreProgress(); rp != nil {
return rp
}
pf := c.svc.getProgress().progressFlags
return &cliRestoreProgress{
enableProgress: pf.enableProgress,
out: pf.out,
progressUpdateInterval: pf.progressUpdateInterval,
eta: timetrack.Start(),
}
}
func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error {
output, oerr := c.restoreOutput(ctx, rep)
if oerr != nil {
@@ -396,17 +418,9 @@ func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error {
rootEntry = re
}
restoreProgress := c.svc.getRestoreProgress()
restoreProgress := c.getRestoreProgress()
progressCallback := func(ctx context.Context, stats restore.Stats) {
restoreProgress.SetCounters(
stats.EnqueuedFileCount+stats.EnqueuedDirCount+stats.EnqueuedSymlinkCount,
stats.RestoredFileCount+stats.RestoredDirCount+stats.RestoredSymlinkCount,
stats.SkippedCount,
stats.IgnoredErrorCount,
stats.EnqueuedTotalFileSize,
stats.RestoredTotalFileSize,
stats.SkippedTotalFileSize,
)
restoreProgress.SetCounters(stats)
}
st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{

116
cli/restore_progress.go Normal file
View File

@@ -0,0 +1,116 @@
package cli
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot/restore"
)
type cliRestoreProgress struct {
restoredCount atomic.Int32
enqueuedCount atomic.Int32
skippedCount atomic.Int32
ignoredErrorsCount atomic.Int32
restoredTotalFileSize atomic.Int64
enqueuedTotalFileSize atomic.Int64
skippedTotalFileSize atomic.Int64
progressUpdateInterval time.Duration
enableProgress bool
outputThrottle timetrack.Throttle
outputMutex sync.Mutex
out textOutput // +checklocksignore: outputMutex just happens to be held always.
eta timetrack.Estimator // +checklocksignore: outputMutex just happens to be held always.
// +checklocks:outputMutex
lastLineLength int
}
func (p *cliRestoreProgress) SetCounters(s restore.Stats) {
p.enqueuedCount.Store(s.EnqueuedFileCount + s.EnqueuedDirCount + s.EnqueuedSymlinkCount)
p.enqueuedTotalFileSize.Store(s.EnqueuedTotalFileSize)
p.restoredCount.Store(s.RestoredFileCount + s.RestoredDirCount + s.RestoredSymlinkCount)
p.restoredTotalFileSize.Store(s.RestoredTotalFileSize)
p.skippedCount.Store(s.SkippedCount)
p.skippedTotalFileSize.Store(s.SkippedTotalFileSize)
p.ignoredErrorsCount.Store(s.IgnoredErrorCount)
p.maybeOutput()
}
func (p *cliRestoreProgress) Flush() {
p.outputThrottle.Reset()
p.output("\n")
}
func (p *cliRestoreProgress) maybeOutput() {
if p.outputThrottle.ShouldOutput(p.progressUpdateInterval) {
p.output("")
}
}
func (p *cliRestoreProgress) output(suffix string) {
if !p.enableProgress {
return
}
// ensure the counters are not going back in an output line compared to the previous one
p.outputMutex.Lock()
defer p.outputMutex.Unlock()
restoredCount := p.restoredCount.Load()
enqueuedCount := p.enqueuedCount.Load()
skippedCount := p.skippedCount.Load()
ignoredCount := p.ignoredErrorsCount.Load()
restoredSize := p.restoredTotalFileSize.Load()
enqueuedSize := p.enqueuedTotalFileSize.Load()
skippedSize := p.skippedTotalFileSize.Load()
if restoredSize == 0 {
return
}
var maybeRemaining, maybeSkipped, maybeErrors string
if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok {
maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
units.BytesPerSecondsString(est.SpeedPerSecond),
est.PercentComplete,
est.Remaining)
}
if skippedCount > 0 {
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize))
}
if ignoredCount > 0 {
maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount)
}
line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
restoredCount+skippedCount, units.BytesString(restoredSize),
enqueuedCount, units.BytesString(enqueuedSize),
maybeSkipped, maybeErrors, maybeRemaining,
)
var extraSpaces string
if len(line) < p.lastLineLength {
// add extra spaces to wipe over previous line if it was longer than current
extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
}
p.lastLineLength = len(line)
p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix)
}

View File

@@ -54,27 +54,16 @@ func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (strea
}, nil
}
// progressReportingReader is just a wrapper for fs.Reader which is used to capture and pass to cb number of bytes read.
// progressReportingReader wraps fs.Reader Read function to capture the and pass
// the number of bytes read to the callback cb.
type progressReportingReader struct {
r fs.Reader
fs.Reader
cb FileWriteProgress
}
func (r *progressReportingReader) Entry() (fs.Entry, error) {
return r.r.Entry() //nolint:wrapcheck
}
func (r *progressReportingReader) Seek(offset int64, whence int) (int64, error) {
return r.r.Seek(offset, whence) //nolint:wrapcheck
}
func (r *progressReportingReader) Close() error {
return r.r.Close() //nolint:wrapcheck
}
func (r *progressReportingReader) Read(p []byte) (int, error) {
bytesRead, err := r.r.Read(p)
bytesRead, err := r.Reader.Read(p)
if err == nil && r.cb != nil {
r.cb(int64(bytesRead))
}
@@ -399,10 +388,8 @@ func write(targetPath string, r fs.Reader, size int64, c streamCopier) error {
// close below, as close is idempotent.
defer f.Close() //nolint:errcheck
name := f.Name()
if _, err := c(f, r); err != nil {
return errors.Wrap(err, "cannot write data to file %q "+name)
return errors.Wrapf(err, "cannot write data to file %q", f.Name())
}
if err := f.Close(); err != nil {
@@ -431,9 +418,9 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin
}
defer r.Close() //nolint:errcheck
wr := &progressReportingReader{
r: r,
cb: progressCb,
rr := &progressReportingReader{
Reader: r,
cb: progressCb,
}
log(ctx).Debugf("copying file contents to: %v", targetPath)
@@ -441,10 +428,10 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin
if o.WriteFilesAtomically {
//nolint:wrapcheck
return atomicfile.Write(targetPath, wr)
return atomicfile.Write(targetPath, rr)
}
return write(targetPath, wr, f.Size(), o.copier)
return write(targetPath, rr, f.Size(), o.copier)
}
func isEmptyDirectory(name string) (bool, error) {

View File

@@ -1,10 +0,0 @@
package restore
// Progress is invoked by copier to report status of snapshot restoration.
type Progress interface {
SetCounters(
enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
enqueuedBytes, restoredBytes, skippedBytes int64,
)
Flush()
}

View File

@@ -145,10 +145,9 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
defer u.Progress.FinishedHashingFile(relativePath, f.Size())
if pf, ok := f.(snapshot.HasDirEntryOrNil); ok {
switch de, err := pf.DirEntryOrNil(ctx); {
case err != nil:
if de, err := pf.DirEntryOrNil(ctx); err != nil {
return nil, errors.Wrap(err, "can't read placeholder")
case err == nil && de != nil:
} else if de != nil {
// We have read sufficient information from the shallow file's extended
// attribute to construct DirEntry.
_, err := u.repo.VerifyObject(ctx, de.ObjectID)
@@ -1073,10 +1072,9 @@ type dirReadError struct {
func uploadShallowDirInternal(ctx context.Context, directory fs.Directory, u *Uploader) (*snapshot.DirEntry, error) {
if pf, ok := directory.(snapshot.HasDirEntryOrNil); ok {
switch de, err := pf.DirEntryOrNil(ctx); {
case err != nil:
if de, err := pf.DirEntryOrNil(ctx); err != nil {
return nil, errors.Wrapf(err, "error reading placeholder for %q", directory.Name())
case err == nil && de != nil:
} else if de != nil {
if _, err := u.repo.VerifyObject(ctx, de.ObjectID); err != nil {
return nil, errors.Wrapf(err, "invalid placeholder for %q contains foreign object.ID", directory.Name())
}

View File

@@ -29,6 +29,7 @@
"github.com/kopia/kopia/internal/stat"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/tests/clitestutil"
"github.com/kopia/kopia/tests/testdirtree"
"github.com/kopia/kopia/tests/testenv"
@@ -42,33 +43,19 @@
overriddenDirPermissions = 0o752
)
type restoreProgressInvocation struct {
enqueuedCount, restoredCount, skippedCount, ignoredErrors int32
enqueuedBytes, restoredBytes, skippedBytes int64
}
type fakeRestoreProgress struct {
mtx sync.Mutex
invocations []restoreProgressInvocation
invocations []restore.Stats
flushesCount int
invocationAfterFlush bool
}
func (p *fakeRestoreProgress) SetCounters(
enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
enqueuedBytes, restoredBytes, skippedBytes int64,
) {
func (p *fakeRestoreProgress) SetCounters(s restore.Stats) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.invocations = append(p.invocations, restoreProgressInvocation{
enqueuedCount: enqueuedCount,
restoredCount: restoredCount,
skippedCount: skippedCount,
ignoredErrors: ignoredErrors,
enqueuedBytes: enqueuedBytes,
restoredBytes: restoredBytes,
skippedBytes: skippedBytes,
})
p.invocations = append(p.invocations, s)
if p.flushesCount > 0 {
p.invocationAfterFlush = true
}