From 08d62d60f1dfd82cb3cac7dc6cf1b48c6bda6dc4 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 13 Sep 2018 21:41:11 -0700 Subject: [PATCH] revamped progress bar by including hashing and uploading part separately --- cli/app.go | 2 +- cli/cli_progress.go | 123 ++++++++++++++++++++++++++++++ cli/command_repository_migrate.go | 2 +- cli/command_snapshot_create.go | 2 +- cli/progress.go | 63 --------------- repo/storage/gcs/gcs_storage.go | 7 +- repo/storage/s3/s3_storage.go | 7 +- 7 files changed, 137 insertions(+), 69 deletions(-) create mode 100644 cli/cli_progress.go delete mode 100644 cli/progress.go diff --git a/cli/app.go b/cli/app.go index c71a9d592..e41363793 100644 --- a/cli/app.go +++ b/cli/app.go @@ -60,7 +60,7 @@ func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error) ctx = block.UsingBlockCache(ctx, *enableCaching) ctx = block.UsingListCache(ctx, *enableListCaching) ctx = storage.WithUploadProgressCallback(ctx, func(desc string, progress, total int64) { - log.Infof("block upload progress %q: %v/%v", desc, progress, total) + cliProgress.Report("upload '"+desc+"'", progress, total) }) t0 := time.Now() diff --git a/cli/cli_progress.go b/cli/cli_progress.go new file mode 100644 index 000000000..183be54ed --- /dev/null +++ b/cli/cli_progress.go @@ -0,0 +1,123 @@ +package cli + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/snapshot" +) + +type singleProgress struct { + desc string + startTime time.Time + progress int64 + total int64 +} + +func (p *singleProgress) update(progress int64, total int64) { + p.total = total + p.progress = progress +} + +func (p *singleProgress) toString(details bool) string { + if p.total == 0 { + return fmt.Sprintf("empty %v", p.desc) + } + + dur := time.Since(p.startTime) + extraInfo := "" + if dur > 1*time.Second && details { + extraInfo = " " + units.BitsPerSecondsString(8*float64(p.progress)/time.Since(p.startTime).Seconds()) + } + + if p.progress == p.total { + return fmt.Sprintf("completed %v %v", + p.desc, + units.BytesStringBase10(p.progress), + ) + } + + return fmt.Sprintf("processing %v %v of %v (%v%%)%v", + p.desc, + units.BytesStringBase10(p.progress), + units.BytesStringBase10(p.total), + 100*p.progress/p.total, + extraInfo, + ) +} + +type multiProgress struct { + mu sync.Mutex + items []*singleProgress +} + +func (mp *multiProgress) findLocked(desc string) (*singleProgress, int) { + for i, p := range mp.items { + if p.desc == desc { + return p, i + } + } + + return nil, 0 +} + +func (mp *multiProgress) Report(desc string, progress int64, total int64) { + mp.mu.Lock() + defer mp.mu.Unlock() + + found, foundPos := mp.findLocked(desc) + + if found != nil && found.progress == progress && found.total == total { + // do not print redundant progress + return + } + + if found == nil { + found = &singleProgress{ + desc: desc, + startTime: time.Now(), + } + foundPos = len(mp.items) + mp.items = append(mp.items, found) + } + + found.update(progress, total) + + var segments []string + for i, p := range mp.items { + segments = append(segments, p.toString(i > 0)) + } + if found.progress >= found.total && foundPos == len(segments)-1 { + mp.items = append(mp.items[0:foundPos], mp.items[foundPos+1:]...) + if len(segments) > 0 { + log.Notice(segments[len(segments)-1]) + } + } else { + if len(segments) > 0 { + log.Info(segments[len(segments)-1]) + } + } +} + +func (mp *multiProgress) Progress(path string, numFiles int, dirCompleted, dirTotal int64, stats *snapshot.Stats) { + mp.Report( + fmt.Sprintf("directory '%v' (%v files)", shortenPath(strings.TrimPrefix(path, "./")), numFiles), + dirCompleted, + dirTotal) +} + +func (mp *multiProgress) UploadFinished() { +} + +func shortenPath(s string) string { + if len(s) < 60 { + return s + } + + return s[0:30] + "..." + s[len(s)-27:] +} + +var cliProgress = &multiProgress{} diff --git a/cli/command_repository_migrate.go b/cli/command_repository_migrate.go index 4b3341927..b5aa5c225 100644 --- a/cli/command_repository_migrate.go +++ b/cli/command_repository_migrate.go @@ -23,7 +23,7 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { uploader := upload.NewUploader(destRepo) - uploader.Progress = &uploadProgress{} + uploader.Progress = cliProgress uploader.IgnoreFileErrors = *migrateIgnoreErrors onCtrlC(uploader.Cancel) diff --git a/cli/command_snapshot_create.go b/cli/command_snapshot_create.go index 886fda8d6..a8daf3314 100644 --- a/cli/command_snapshot_create.go +++ b/cli/command_snapshot_create.go @@ -55,7 +55,7 @@ func runBackupCommand(ctx context.Context, rep *repo.Repository) error { u.ParallelUploads = *snapshotCreateParallelUploads onCtrlC(u.Cancel) - u.Progress = &uploadProgress{} + u.Progress = cliProgress if len(*snapshotCreateDescription) > maxSnapshotDescriptionLength { return fmt.Errorf("description too long") diff --git a/cli/progress.go b/cli/progress.go deleted file mode 100644 index a05992c55..000000000 --- a/cli/progress.go +++ /dev/null @@ -1,63 +0,0 @@ -package cli - -import ( - "fmt" - "os" - "strings" - "sync" - "time" - - "github.com/kopia/kopia/internal/upload" - "github.com/kopia/kopia/snapshot" - - pb "gopkg.in/cheggaaa/pb.v1" -) - -type uploadProgress struct { - currentDir string - mu sync.Mutex - bar *pb.ProgressBar -} - -func (p *uploadProgress) Progress(path string, numFiles int, dirCompleted, dirTotal int64, stats *snapshot.Stats) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.currentDir != path || p.bar == nil { - p.currentDir = path - if p.bar != nil { - p.bar.Finish() - p.bar = nil - } - - p.bar = pb.New64(dirTotal).Prefix(fmt.Sprintf("%4v files in '%v'", numFiles, shortenPath(strings.TrimPrefix(path, "./")))) - p.bar.Output = os.Stderr - p.bar.SetRefreshRate(time.Second) - p.bar.ShowSpeed = true - p.bar.ShowTimeLeft = true - p.bar.SetUnits(pb.U_BYTES) - p.bar.Start() - } - - p.bar.Set64(dirCompleted) -} - -func (p *uploadProgress) UploadFinished() { - p.mu.Lock() - defer p.mu.Unlock() - - if p.bar != nil { - p.bar.Finish() - p.bar = nil - } -} - -func shortenPath(s string) string { - if len(s) < 60 { - return s - } - - return s[0:30] + "..." + s[len(s)-27:] -} - -var _ upload.Progress = &uploadProgress{} diff --git a/repo/storage/gcs/gcs_storage.go b/repo/storage/gcs/gcs_storage.go index 0164a9894..d7cfafb81 100644 --- a/repo/storage/gcs/gcs_storage.go +++ b/repo/storage/gcs/gcs_storage.go @@ -95,8 +95,13 @@ func (gcs *gcsStorage) PutBlock(ctx context.Context, b string, data []byte) erro progressCallback := storage.ProgressCallback(ctx) if progressCallback != nil { + progressCallback(b, 0, int64(len(data))) + defer progressCallback(b, int64(len(data)), int64(len(data))) + writer.ProgressFunc = func(completed int64) { - progressCallback(b, completed, int64(len(data))) + if completed != int64(len(data)) { + progressCallback(b, completed, int64(len(data))) + } } } diff --git a/repo/storage/s3/s3_storage.go b/repo/storage/s3/s3_storage.go index 974190f86..f73c6637b 100644 --- a/repo/storage/s3/s3_storage.go +++ b/repo/storage/s3/s3_storage.go @@ -104,6 +104,10 @@ func (s *s3Storage) PutBlock(ctx context.Context, b string, data []byte) error { } progressCallback := storage.ProgressCallback(ctx) + if progressCallback != nil { + progressCallback(b, 0, int64(len(data))) + defer progressCallback(b, int64(len(data)), int64(len(data))) + } n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{ ContentType: "application/x-kopia", Progress: newProgressReader(progressCallback, b, int64(len(data))), @@ -177,7 +181,7 @@ type progressReader struct { func (r *progressReader) Read(b []byte) (int, error) { r.completed += int64(len(b)) - if r.completed >= r.lastReported+1000000 || r.completed == r.totalLength { + if r.completed >= r.lastReported+1000000 && r.completed < r.totalLength { r.cb(r.blockID, r.completed, r.totalLength) r.lastReported = r.completed } @@ -190,7 +194,6 @@ func newProgressReader(cb storage.ProgressFunc, blockID string, totalLength int6 } return &progressReader{cb: cb, blockID: blockID, totalLength: totalLength} - } func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {