Files
kopia/cli/cli_progress.go
Eugene Sumin 2b92388286 refactor(general): Increase restore progress granularity (#3655)
When restoring huge file(s), the progress reporting is done in a
somewhat odd way:

```
kopia_test % kopia snapshot restore ka2084d263182164b6cf3456668e6b6da /Users/eugen.sumin/kopia_test/2
Restoring to local filesystem (/Users/eugen.sumin/kopia_test/2) with parallelism=8...
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Restored 5 files, 1 directories and 0 symbolic links (5.4 GB).
```
In fact, the amount of restored data is only reported once a particular
file has been completely restored.

This PR contains the least invasive change, which allows us to see
progress updates while a file is being downloaded from object storage.
```
Restoring to local filesystem (/Users/eugen.sumin/kopia_test/55) with parallelism=8...
Processed 2 (3.1 MB) of 5 (1.8 GB).
Processed 4 (459.6 MB) of 5 (1.8 GB) 270.3 MB/s (25.2%) remaining 4s.
Processed 4 (468.7 MB) of 5 (1.8 GB) 269 MB/s (25.7%) remaining 4s.
Processed 4 (741.6 MB) of 5 (1.8 GB) 269 MB/s (40.6%) remaining 3s.
Processed 4 (1.1 GB) of 5 (1.8 GB) 280 MB/s (57.6%) remaining 2s.
Processed 5 (1.4 GB) of 5 (1.8 GB) 291.1 MB/s (75.2%) remaining 1s.
Processed 5 (1.4 GB) of 5 (1.8 GB) 289.8 MB/s (75.6%) remaining 1s.
Processed 5 (1.6 GB) of 5 (1.8 GB) 270.2 MB/s (85.3%) remaining 0s.
Processed 5 (1.7 GB) of 5 (1.8 GB) 256.3 MB/s (95.0%) remaining 0s.
Processed 6 (1.8 GB) of 5 (1.8 GB) 251 MB/s (100.0%) remaining 0s.
Processed 6 (1.8 GB) of 5 (1.8 GB) 251 MB/s (100.0%) remaining 0s.
Restored 5 files, 1 directories and 0 symbolic links (1.8 GB).
```

---------

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
2024-05-10 09:47:13 -07:00

383 lines
8.8 KiB
Go

package cli
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/alecthomas/kingpin/v2"
"github.com/fatih/color"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
const (
	// spinner holds the characters cycled through to animate the progress line.
	spinner = `|/-\`
)
// progressFlags holds the CLI flags that control progress reporting,
// together with the output destination used to print progress lines.
type progressFlags struct {
	enableProgress         bool          // --progress: whether progress lines are printed at all
	progressUpdateInterval time.Duration // --progress-update-interval: minimum delay between updates
	out                    textOutput    // destination (stderr) for progress output
}
// setup registers the hidden progress-related flags on the application and
// wires the text output to the given app services.
func (p *progressFlags) setup(svc appServices, app *kingpin.Application) {
	app.Flag("progress", "Enable progress bar").Hidden().Default("true").BoolVar(&p.enableProgress)
	app.Flag("progress-update-interval", "How often to update progress information").Hidden().Default("300ms").DurationVar(&p.progressUpdateInterval)
	p.out.setup(svc)
}
// cliProgress reports snapshot upload progress on the terminal. Counters are
// updated atomically from uploader goroutines; the actual printing is
// serialized by outputMutex and rate-limited by outputThrottle.
type cliProgress struct {
	snapshotfs.NullUploadProgress

	// all int64 must precede all int32 due to alignment requirements on ARM
	uploadedBytes atomic.Int64 // bytes uploaded so far
	cachedBytes   atomic.Int64 // bytes accounted as cached
	hashedBytes   atomic.Int64 // bytes hashed so far

	outputThrottle timetrack.Throttle // rate-limits progress line updates

	cachedFiles       atomic.Int32
	inProgressHashing atomic.Int32 // number of files currently being hashed
	hashedFiles       atomic.Int32
	uploadedFiles     atomic.Int32
	ignoredErrorCount atomic.Int32
	fatalErrorCount   atomic.Int32
	uploading         atomic.Bool // true while an upload is active (gates maybeOutput)
	uploadFinished    atomic.Bool // switches the spinner to "*"

	outputMutex sync.Mutex
	// +checklocks:outputMutex
	lastLineLength int
	// +checklocks:outputMutex
	spinPhase int

	uploadStartTime timetrack.Estimator // +checklocksignore

	estimatedFileCount  int   // +checklocksignore
	estimatedTotalBytes int64 // +checklocksignore

	// indicates shared instance that does not reset counters at the beginning of upload.
	shared bool

	progressFlags
}
// HashingFile records that hashing of another file has started.
func (p *cliProgress) HashingFile(_ string) {
	p.inProgressHashing.Add(1)
}
// FinishedHashingFile records completion of hashing for one file and
// refreshes the progress line if the update interval has elapsed.
func (p *cliProgress) FinishedHashingFile(_ string, _ int64) {
	p.hashedFiles.Add(1)
	p.inProgressHashing.Add(-1)
	p.maybeOutput()
}
// UploadedBytes records numBytes of uploaded data and bumps the uploaded-file
// counter (presumably invoked once per completed object upload —
// TODO(review): confirm against the uploader call site).
func (p *cliProgress) UploadedBytes(numBytes int64) {
	p.uploadedBytes.Add(numBytes)
	p.uploadedFiles.Add(1)
	p.maybeOutput()
}
// HashedBytes records numBytes of newly hashed data and refreshes the
// progress line if the update interval has elapsed.
func (p *cliProgress) HashedBytes(numBytes int64) {
	p.hashedBytes.Add(numBytes)
	p.maybeOutput()
}
// Error reports an error for the given path: fatal errors are printed in the
// error color, ignored ones in the warning color, and the matching counter is
// incremented either way.
func (p *cliProgress) Error(path string, err error, isIgnored bool) {
	if !isIgnored {
		p.fatalErrorCount.Add(1)
		p.output(errorColor, fmt.Sprintf("Error when processing \"%v\": %v\n", path, err))

		return
	}

	p.ignoredErrorCount.Add(1)
	p.output(warningColor, fmt.Sprintf("Ignored error when processing \"%v\": %v\n", path, err))
}
// CachedFile records a file of numBytes accounted as cached and refreshes the
// progress line if the update interval has elapsed.
func (p *cliProgress) CachedFile(_ string, numBytes int64) {
	p.cachedBytes.Add(numBytes)
	p.cachedFiles.Add(1)
	p.maybeOutput()
}
// maybeOutput refreshes the progress line, but only while an upload is active
// and only when the configured update interval has elapsed since the last
// refresh (short-circuit keeps the throttle untouched when idle).
func (p *cliProgress) maybeOutput() {
	active := p.uploading.Load()
	if active && p.outputThrottle.ShouldOutput(p.progressUpdateInterval) {
		p.output(defaultColor, "")
	}
}
// output renders the current upload status line (and an optional message) to
// stderr. All terminal writes are serialized by outputMutex.
//
// When msg is non-empty it is printed in the given color, prefixed with
// "\n ! " when the progress bar is enabled so it does not collide with the
// in-place status line; the status line itself is rewritten using "\r".
func (p *cliProgress) output(col *color.Color, msg string) {
	p.outputMutex.Lock()
	defer p.outputMutex.Unlock()

	// Snapshot all counters up front so the printed line is internally consistent.
	hashedBytes := p.hashedBytes.Load()
	cachedBytes := p.cachedBytes.Load()
	uploadedBytes := p.uploadedBytes.Load()
	cachedFiles := p.cachedFiles.Load()
	inProgressHashing := p.inProgressHashing.Load()
	hashedFiles := p.hashedFiles.Load()
	ignoredErrorCount := p.ignoredErrorCount.Load()
	fatalErrorCount := p.fatalErrorCount.Load()

	line := fmt.Sprintf(
		" %v %v hashing, %v hashed (%v), %v cached (%v), uploaded %v",
		p.spinnerCharacter(),
		inProgressHashing,
		hashedFiles,
		units.BytesString(hashedBytes),
		cachedFiles,
		units.BytesString(cachedBytes),
		units.BytesString(uploadedBytes),
	)

	if fatalErrorCount > 0 {
		line += fmt.Sprintf(" (%v fatal errors)", fatalErrorCount)
	}

	if ignoredErrorCount > 0 {
		line += fmt.Sprintf(" (%v errors ignored)", ignoredErrorCount)
	}

	if msg != "" {
		prefix := "\n ! "
		if !p.enableProgress {
			// no in-place progress line to protect, print the message bare
			prefix = ""
		}

		col.Fprintf(p.out.stderr(), "%v%v", prefix, msg) //nolint:errcheck
	}

	if !p.enableProgress {
		return
	}

	// Progress toward the estimate counts hashed+cached bytes (not uploaded bytes).
	if est, ok := p.uploadStartTime.Estimate(float64(hashedBytes+cachedBytes), float64(p.estimatedTotalBytes)); ok {
		line += fmt.Sprintf(", estimated %v", units.BytesString(p.estimatedTotalBytes))
		line += fmt.Sprintf(" (%.1f%%)", est.PercentComplete)
		line += fmt.Sprintf(" %v left", est.Remaining)
	} else {
		line += ", estimating..."
	}

	var extraSpaces string

	if len(line) < p.lastLineLength {
		// add extra spaces to wipe over previous line if it was longer than current
		extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
	}

	p.lastLineLength = len(line)
	p.out.printStderr("\r%v%v", line, extraSpaces)
}
// spinnerCharacter returns the next character of the spinner animation,
// advancing the phase; once the upload has finished it always returns "*".
// +checklocks:p.outputMutex
func (p *cliProgress) spinnerCharacter() string {
	if p.uploadFinished.Load() {
		return "*"
	}

	phase := p.spinPhase % len(spinner)
	p.spinPhase = (phase + 1) % len(spinner)

	return string(spinner[phase])
}
// StartShared resets this (shared) progress instance for a new run while
// preserving the configured flags, then marks uploading as active.
//
// NOTE(review): this copies a struct containing sync.Mutex and atomic fields
// by value; safe only if no other goroutine is using p concurrently — confirm.
// +checklocksignore.
func (p *cliProgress) StartShared() {
	*p = cliProgress{
		uploadStartTime: timetrack.Start(),
		shared:          true, // makes Finish() and UploadStarted() no-ops
		progressFlags:   p.progressFlags,
	}

	p.uploading.Store(true)
}
// FinishShared marks a shared upload as finished and prints a final status
// line (the spinner character becomes "*").
func (p *cliProgress) FinishShared() {
	p.uploadFinished.Store(true)
	p.output(defaultColor, "")
}
// UploadStarted resets all counters for a fresh upload, unless this is a
// shared instance whose counters accumulate across uploads (reset only by
// StartShared).
//
// NOTE(review): copies a struct containing sync.Mutex and atomic fields by
// value; assumes no concurrent use at this point — confirm.
// +checklocksignore.
func (p *cliProgress) UploadStarted() {
	if p.shared {
		// do nothing
		return
	}

	*p = cliProgress{
		uploadStartTime: timetrack.Start(),
		progressFlags:   p.progressFlags,
	}

	p.uploading.Store(true)
}
// EstimatedDataSize records the estimated total number of files and bytes for
// the current upload; shared instances ignore the estimate entirely.
func (p *cliProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
	if p.shared {
		// shared instances do not track per-upload estimates
		return
	}

	p.outputMutex.Lock()
	defer p.outputMutex.Unlock()

	p.estimatedTotalBytes = totalBytes
	p.estimatedFileCount = fileCount
}
// UploadFinished is intentionally a no-op.
func (p *cliProgress) UploadFinished() {
	// do nothing here, we still want to report the files flushed after the Upload has completed.
	// instead, Finish() will be called.
}
// Finish terminates the progress display for a non-shared instance: it prints
// the final status line (spinner becomes "*") followed by a newline so later
// output starts on a fresh line. Shared instances are finalized by
// FinishShared instead.
func (p *cliProgress) Finish() {
	if p.shared {
		return
	}

	// Flip flags before the final output so it renders in the finished state.
	p.uploadFinished.Store(true)
	p.uploading.Store(false)

	p.output(defaultColor, "")

	if p.enableProgress {
		p.out.printStderr("\n")
	}
}
// cliRestoreProgress reports restore progress on the terminal. Unlike the
// upload path, counters are replaced wholesale via SetCounters rather than
// incremented; printing is throttled and serialized by outputMutex.
type cliRestoreProgress struct {
	restoredCount      atomic.Int32
	enqueuedCount      atomic.Int32
	skippedCount       atomic.Int32
	ignoredErrorsCount atomic.Int32

	restoredTotalFileSize atomic.Int64
	enqueuedTotalFileSize atomic.Int64
	skippedTotalFileSize  atomic.Int64

	// Copied from the shared progressFlags in setup.
	// NOTE(review): output/maybeOutput read these settings via
	// svc.getProgress() instead, so these copies appear unused — confirm.
	progressUpdateInterval time.Duration
	enableProgress         bool

	svc            appServices
	outputThrottle timetrack.Throttle // rate-limits progress line updates
	outputMutex    sync.Mutex
	out            textOutput
	eta            timetrack.Estimator // tracks restore speed / remaining time

	// +checklocks:outputMutex
	lastLineLength int
}
// setup copies progress configuration from the shared cliProgress flags and
// starts the ETA estimator; it does nothing when no progress instance exists.
func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) {
	parent := svc.getProgress()
	if parent == nil {
		return
	}

	p.svc = svc
	p.out = parent.out
	p.enableProgress = parent.enableProgress
	p.progressUpdateInterval = parent.progressUpdateInterval
	p.eta = timetrack.Start()
}
// SetCounters replaces all restore counters with the latest values pushed by
// the restore engine and refreshes the progress line if the update interval
// has elapsed. Stores are individual atomics, so a concurrent output may see
// a mix of old and new values; that is cosmetic only.
func (p *cliRestoreProgress) SetCounters(
	enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
	enqueuedBytes, restoredBytes, skippedBytes int64,
) {
	p.enqueuedCount.Store(enqueuedCount)
	p.enqueuedTotalFileSize.Store(enqueuedBytes)

	p.restoredCount.Store(restoredCount)
	p.restoredTotalFileSize.Store(restoredBytes)

	p.skippedCount.Store(skippedCount)
	p.skippedTotalFileSize.Store(skippedBytes)

	p.ignoredErrorsCount.Store(ignoredErrors)

	p.maybeOutput()
}
// Flush resets the throttle so the final state is printed unconditionally,
// then emits the status line followed by a newline.
func (p *cliRestoreProgress) Flush() {
	p.outputThrottle.Reset()
	p.output("\n")
}
// maybeOutput refreshes the progress line when the configured update interval
// (read from the shared progress settings) has elapsed.
func (p *cliRestoreProgress) maybeOutput() {
	interval := p.svc.getProgress().progressUpdateInterval
	if p.outputThrottle.ShouldOutput(interval) {
		p.output("")
	}
}
// output renders the restore status line to stderr, appending suffix (used by
// Flush to emit a trailing newline). It is a no-op when progress display is
// disabled or before any data has been restored.
func (p *cliRestoreProgress) output(suffix string) {
	if !p.svc.getProgress().enableProgress {
		return
	}

	p.outputMutex.Lock()
	defer p.outputMutex.Unlock()

	// Snapshot all counters so the printed line is internally consistent.
	restoredCount := p.restoredCount.Load()
	enqueuedCount := p.enqueuedCount.Load()
	skippedCount := p.skippedCount.Load()
	ignoredCount := p.ignoredErrorsCount.Load()

	restoredSize := p.restoredTotalFileSize.Load()
	enqueuedSize := p.enqueuedTotalFileSize.Load()
	skippedSize := p.skippedTotalFileSize.Load()

	if restoredSize == 0 {
		// nothing restored yet; suppress output rather than print an empty line
		return
	}

	var maybeRemaining, maybeSkipped, maybeErrors string

	if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok {
		maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
			units.BytesPerSecondsString(est.SpeedPerSecond),
			est.PercentComplete,
			est.Remaining)
	}

	if skippedCount > 0 {
		maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize))
	}

	if ignoredCount > 0 {
		maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount)
	}

	line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
		restoredCount+skippedCount, units.BytesString(restoredSize),
		enqueuedCount, units.BytesString(enqueuedSize),
		maybeSkipped, maybeErrors, maybeRemaining,
	)

	var extraSpaces string

	if len(line) < p.lastLineLength {
		// add extra spaces to wipe over previous line if it was longer than current
		extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
	}

	p.lastLineLength = len(line)
	p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix)
}
// Compile-time check that cliProgress implements snapshotfs.UploadProgress.
var _ snapshotfs.UploadProgress = (*cliProgress)(nil)