revamped progress bar by including hashing and uploading part separately

This commit is contained in:
Jarek Kowalski
2018-09-13 21:41:11 -07:00
parent 636837317b
commit 08d62d60f1
7 changed files with 137 additions and 69 deletions

View File

@@ -60,7 +60,7 @@ func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error)
ctx = block.UsingBlockCache(ctx, *enableCaching)
ctx = block.UsingListCache(ctx, *enableListCaching)
ctx = storage.WithUploadProgressCallback(ctx, func(desc string, progress, total int64) {
log.Infof("block upload progress %q: %v/%v", desc, progress, total)
cliProgress.Report("upload '"+desc+"'", progress, total)
})
t0 := time.Now()

123
cli/cli_progress.go Normal file
View File

@@ -0,0 +1,123 @@
package cli
import (
"fmt"
"strings"
"sync"
"time"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot"
)
type singleProgress struct {
desc string
startTime time.Time
progress int64
total int64
}
func (p *singleProgress) update(progress int64, total int64) {
p.total = total
p.progress = progress
}
func (p *singleProgress) toString(details bool) string {
if p.total == 0 {
return fmt.Sprintf("empty %v", p.desc)
}
dur := time.Since(p.startTime)
extraInfo := ""
if dur > 1*time.Second && details {
extraInfo = " " + units.BitsPerSecondsString(8*float64(p.progress)/time.Since(p.startTime).Seconds())
}
if p.progress == p.total {
return fmt.Sprintf("completed %v %v",
p.desc,
units.BytesStringBase10(p.progress),
)
}
return fmt.Sprintf("processing %v %v of %v (%v%%)%v",
p.desc,
units.BytesStringBase10(p.progress),
units.BytesStringBase10(p.total),
100*p.progress/p.total,
extraInfo,
)
}
type multiProgress struct {
mu sync.Mutex
items []*singleProgress
}
func (mp *multiProgress) findLocked(desc string) (*singleProgress, int) {
for i, p := range mp.items {
if p.desc == desc {
return p, i
}
}
return nil, 0
}
func (mp *multiProgress) Report(desc string, progress int64, total int64) {
mp.mu.Lock()
defer mp.mu.Unlock()
found, foundPos := mp.findLocked(desc)
if found != nil && found.progress == progress && found.total == total {
// do not print redundant progress
return
}
if found == nil {
found = &singleProgress{
desc: desc,
startTime: time.Now(),
}
foundPos = len(mp.items)
mp.items = append(mp.items, found)
}
found.update(progress, total)
var segments []string
for i, p := range mp.items {
segments = append(segments, p.toString(i > 0))
}
if found.progress >= found.total && foundPos == len(segments)-1 {
mp.items = append(mp.items[0:foundPos], mp.items[foundPos+1:]...)
if len(segments) > 0 {
log.Notice(segments[len(segments)-1])
}
} else {
if len(segments) > 0 {
log.Info(segments[len(segments)-1])
}
}
}
func (mp *multiProgress) Progress(path string, numFiles int, dirCompleted, dirTotal int64, stats *snapshot.Stats) {
mp.Report(
fmt.Sprintf("directory '%v' (%v files)", shortenPath(strings.TrimPrefix(path, "./")), numFiles),
dirCompleted,
dirTotal)
}
func (mp *multiProgress) UploadFinished() {
}
func shortenPath(s string) string {
if len(s) < 60 {
return s
}
return s[0:30] + "..." + s[len(s)-27:]
}
var cliProgress = &multiProgress{}

View File

@@ -23,7 +23,7 @@
func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
uploader := upload.NewUploader(destRepo)
uploader.Progress = &uploadProgress{}
uploader.Progress = cliProgress
uploader.IgnoreFileErrors = *migrateIgnoreErrors
onCtrlC(uploader.Cancel)

View File

@@ -55,7 +55,7 @@ func runBackupCommand(ctx context.Context, rep *repo.Repository) error {
u.ParallelUploads = *snapshotCreateParallelUploads
onCtrlC(u.Cancel)
u.Progress = &uploadProgress{}
u.Progress = cliProgress
if len(*snapshotCreateDescription) > maxSnapshotDescriptionLength {
return fmt.Errorf("description too long")

View File

@@ -1,63 +0,0 @@
package cli
import (
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/kopia/kopia/internal/upload"
"github.com/kopia/kopia/snapshot"
pb "gopkg.in/cheggaaa/pb.v1"
)
type uploadProgress struct {
currentDir string
mu sync.Mutex
bar *pb.ProgressBar
}
func (p *uploadProgress) Progress(path string, numFiles int, dirCompleted, dirTotal int64, stats *snapshot.Stats) {
p.mu.Lock()
defer p.mu.Unlock()
if p.currentDir != path || p.bar == nil {
p.currentDir = path
if p.bar != nil {
p.bar.Finish()
p.bar = nil
}
p.bar = pb.New64(dirTotal).Prefix(fmt.Sprintf("%4v files in '%v'", numFiles, shortenPath(strings.TrimPrefix(path, "./"))))
p.bar.Output = os.Stderr
p.bar.SetRefreshRate(time.Second)
p.bar.ShowSpeed = true
p.bar.ShowTimeLeft = true
p.bar.SetUnits(pb.U_BYTES)
p.bar.Start()
}
p.bar.Set64(dirCompleted)
}
func (p *uploadProgress) UploadFinished() {
p.mu.Lock()
defer p.mu.Unlock()
if p.bar != nil {
p.bar.Finish()
p.bar = nil
}
}
func shortenPath(s string) string {
if len(s) < 60 {
return s
}
return s[0:30] + "..." + s[len(s)-27:]
}
var _ upload.Progress = &uploadProgress{}

View File

@@ -95,8 +95,13 @@ func (gcs *gcsStorage) PutBlock(ctx context.Context, b string, data []byte) erro
progressCallback := storage.ProgressCallback(ctx)
if progressCallback != nil {
progressCallback(b, 0, int64(len(data)))
defer progressCallback(b, int64(len(data)), int64(len(data)))
writer.ProgressFunc = func(completed int64) {
progressCallback(b, completed, int64(len(data)))
if completed != int64(len(data)) {
progressCallback(b, completed, int64(len(data)))
}
}
}

View File

@@ -104,6 +104,10 @@ func (s *s3Storage) PutBlock(ctx context.Context, b string, data []byte) error {
}
progressCallback := storage.ProgressCallback(ctx)
if progressCallback != nil {
progressCallback(b, 0, int64(len(data)))
defer progressCallback(b, int64(len(data)), int64(len(data)))
}
n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{
ContentType: "application/x-kopia",
Progress: newProgressReader(progressCallback, b, int64(len(data))),
@@ -177,7 +181,7 @@ type progressReader struct {
func (r *progressReader) Read(b []byte) (int, error) {
r.completed += int64(len(b))
if r.completed >= r.lastReported+1000000 || r.completed == r.totalLength {
if r.completed >= r.lastReported+1000000 && r.completed < r.totalLength {
r.cb(r.blockID, r.completed, r.totalLength)
r.lastReported = r.completed
}
@@ -190,7 +194,6 @@ func newProgressReader(cb storage.ProgressFunc, blockID string, totalLength int6
}
return &progressReader{cb: cb, blockID: blockID, totalLength: totalLength}
}
func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {