Mirror of https://github.com/kopia/kopia.git
upload: scan the directory to be uploaded in parallel to estimate the amount of data to be uploaded (#622)
This allows a better progress indicator in the CLI and UI. The percentage completed is not displayed until an estimate is available. Quick demo: https://asciinema.org/a/O7ktcWSgaGUPfJwhzc65mMWM1
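The core of the indicator is simple arithmetic: once the background scan reports an estimated total, progress is the ratio of processed bytes (hashed plus cached) to that estimate, and remaining time is extrapolated from the elapsed time. Below is a minimal, self-contained sketch of that calculation; the function and field names are illustrative and are not the identifiers used in the diff that follows.

	package main

	import (
		"fmt"
		"time"
	)

	// progressEstimate extrapolates completion from bytes processed so far
	// against an estimated total, mirroring the ratio/ETA logic in this change.
	func progressEstimate(processedBytes, estimatedTotalBytes int64, elapsed time.Duration) (percent float64, remaining time.Duration) {
		if estimatedTotalBytes <= 0 || processedBytes <= 0 {
			return 0, 0 // no estimate yet; a caller would print "estimating..."
		}

		ratio := float64(processedBytes) / float64(estimatedTotalBytes)
		if ratio > 1 {
			ratio = 1 // the estimate may undershoot; cap at 100%
		}

		// total time is elapsed time scaled by the inverse of the completed ratio.
		total := time.Duration(float64(elapsed) / ratio)

		return ratio * 100, (total - elapsed).Round(time.Second)
	}

	func main() {
		// e.g. 2 GB of an estimated 10 GB done after 60s -> 20.0%, 4m0s left.
		pct, left := progressEstimate(2e9, 10e9, 60*time.Second)
		fmt.Printf("%.1f%% done, %v left\n", pct, left)
	}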
@@ -47,8 +47,8 @@ type cliProgress struct {
 	spinPhase       int
 	uploadStartTime time.Time

-	previousFileCount int
-	previousTotalSize int64
+	estimatedFileCount  int
+	estimatedTotalBytes int64

 	// indicates shared instance that does not reset counters at the beginning of upload.
 	shared bool
@@ -152,13 +152,29 @@ func (p *cliProgress) output(col *color.Color, msg string) {
 		return
 	}

-	if p.previousTotalSize > 0 {
-		percent := (float64(hashedBytes+cachedBytes) * hundredPercent / float64(p.previousTotalSize))
-		if percent > hundredPercent {
-			percent = hundredPercent
+	if p.estimatedTotalBytes > 0 {
+		line += fmt.Sprintf(", estimated %v", units.BytesStringBase10(p.estimatedTotalBytes))
+
+		ratio := float64(hashedBytes+cachedBytes) / float64(p.estimatedTotalBytes)
+		if ratio > 1 {
+			ratio = 1
 		}

-		line += fmt.Sprintf(" %.1f%%", percent)
+		timeSoFarSeconds := clock.Since(p.uploadStartTime).Seconds()
+		estimatedTotalTime := time.Second * time.Duration(timeSoFarSeconds/ratio)
+		estimatedEndTime := p.uploadStartTime.Add(estimatedTotalTime)
+
+		remaining := clock.Until(estimatedEndTime)
+		if remaining < 0 {
+			remaining = 0
+		}
+
+		remaining = remaining.Round(time.Second)
+
+		line += fmt.Sprintf(" (%.1f%%)", ratio*hundredPercent)
+		line += fmt.Sprintf(" %v left", remaining)
+	} else {
+		line += ", estimating..."
 	}

 	var extraSpaces string
@@ -197,20 +213,31 @@ func (p *cliProgress) FinishShared() {
 	p.output(defaultColor, "")
 }

-func (p *cliProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {
+func (p *cliProgress) UploadStarted() {
 	if p.shared {
 		// do nothing
 		return
 	}

 	*p = cliProgress{
-		uploading:         1,
-		uploadStartTime:   clock.Now(),
-		previousFileCount: previousFileCount,
-		previousTotalSize: previousTotalSize,
+		uploading:       1,
+		uploadStartTime: clock.Now(),
 	}
 }

+func (p *cliProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
+	if p.shared {
+		// do nothing
+		return
+	}
+
+	p.outputMutex.Lock()
+	defer p.outputMutex.Unlock()
+
+	p.estimatedFileCount = fileCount
+	p.estimatedTotalBytes = totalBytes
+}
+
 func (p *cliProgress) UploadFinished() {
 	// do nothing here, we still want to report the files flushed after the Upload has completed.
 	// instead, Finish() will be called.
@@ -203,8 +203,10 @@ export class SourcesTable extends Component {
            const totalBytes = u.hashedBytes + u.cachedBytes;

            totals = sizeDisplayName(totalBytes);
-            if (x.row.original.lastSnapshot) {
-                const percent = Math.round(totalBytes * 1000.0 / x.row.original.lastSnapshot.stats.totalSize) / 10.0;
+            if (u.estimatedBytes) {
+                totals += "/" + sizeDisplayName(u.estimatedBytes);
+
+                const percent = Math.round(totalBytes * 1000.0 / u.estimatedBytes) / 10.0;
                 if (percent <= 100) {
                     totals += " " + percent + "%";
                 }
@@ -212,7 +214,7 @@ export class SourcesTable extends Component {
            }

            return <>
-                <Spinner animation="border" variant="primary" size="sm" title={title} /> Snapshotting {totals}
+                <Spinner animation="border" variant="primary" size="sm" title={title} /> Uploading {totals}

                <Button variant="danger" size="sm" onClick={() => {
                    parent.cancelSnapshot(x.row.original.source);
@@ -78,6 +78,9 @@ type Uploader struct {

 	// for testing only, when set will write to a given channel whenever checkpoint completes
 	checkpointFinished chan struct{}
+
+	// disable snapshot size estimation
+	disableEstimation bool
 }

 // IsCanceled returns true if the upload is canceled.
@@ -694,6 +697,7 @@ func (u *Uploader) processNonDirectories(ctx context.Context, parentCheckpointRe
 		// See if we had this name during either of previous passes.
 		if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entry, prevEntries)); cachedEntry != nil {
 			atomic.AddInt32(&u.stats.CachedFiles, 1)
 			atomic.AddInt64(&u.stats.TotalFileSize, entry.Size())
+			u.Progress.CachedFile(filepath.Join(dirRelativePath, entry.Name()), entry.Size())

 			// compute entryResult now, cachedEntry is short-lived
@@ -943,20 +947,8 @@ func (u *Uploader) Upload(
 		Source: sourceInfo,
 	}

-	maxPreviousTotalFileSize := int64(0)
-	maxPreviousFileCount := 0
-
-	for _, m := range previousManifests {
-		if s := m.Stats.TotalFileSize; s > maxPreviousTotalFileSize {
-			maxPreviousTotalFileSize = s
-		}
-
-		if s := int(m.Stats.TotalFileCount); s > maxPreviousFileCount {
-			maxPreviousFileCount = s
-		}
-	}
-
-	u.Progress.UploadStarted(maxPreviousFileCount, maxPreviousTotalFileSize)
+	u.Progress.UploadStarted()
+
 	defer u.Progress.UploadFinished()

 	u.stats = snapshot.Stats{}
@@ -966,6 +958,9 @@ func (u *Uploader) Upload(

 	s.StartTime = u.repo.Time()

+	scanctx, cancelScan := context.WithCancel(ctx)
+	defer cancelScan()
+
 	switch entry := source.(type) {
 	case fs.Directory:
 		var previousDirs []fs.Directory
@@ -979,9 +974,17 @@ func (u *Uploader) Upload(
 		entry = ignorefs.New(entry, policyTree, ignorefs.ReportIgnoredFiles(func(_ string, md fs.Entry) {
 			u.stats.AddExcluded(md)
 		}))
+
+		go func() {
+			ds, _ := u.scanDirectory(scanctx, entry)
+
+			u.Progress.EstimatedDataSize(ds.numFiles, ds.totalFileSize)
+		}()
+
 		s.RootEntry, err = u.uploadDirWithCheckpointing(ctx, entry, policyTree, previousDirs, sourceInfo)

 	case fs.File:
+		u.Progress.EstimatedDataSize(1, entry.Size())
 		s.RootEntry, err = u.uploadFileWithCheckpointing(ctx, entry.Name(), entry, policyTree.EffectivePolicy(), sourceInfo)

 	default:
@@ -8,7 +8,7 @@
 // UploadProgress is invoked by uploader to report status of file and directory uploads.
 type UploadProgress interface {
 	// UploadStarted is emitted once at the start of an upload
-	UploadStarted(previousFileCount int, previousTotalSize int64)
+	UploadStarted()

 	// UploadFinished is emitted once at the end of an upload
 	UploadFinished()
@@ -36,6 +36,9 @@ type UploadProgress interface {

 	// FinishedDirectory is emitted whenever a directory is finished uploading.
 	FinishedDirectory(dirname string)
+
+	// EstimatedDataSize is emitted whenever the size of upload is estimated.
+	EstimatedDataSize(fileCount int, totalBytes int64)
 }

 // NullUploadProgress is an implementation of UploadProgress that does not produce any output.
@@ -43,7 +46,10 @@ type NullUploadProgress struct {
 }

 // UploadStarted implements UploadProgress.
-func (p *NullUploadProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {}
+func (p *NullUploadProgress) UploadStarted() {}
+
+// EstimatedDataSize implements UploadProgress.
+func (p *NullUploadProgress) EstimatedDataSize(fileCount int, totalBytes int64) {}

 // UploadFinished implements UploadProgress.
 func (p *NullUploadProgress) UploadFinished() {}
@@ -79,10 +85,13 @@ type UploadCounters struct {
 	TotalCachedBytes int64 `json:"cachedBytes"`
 	TotalHashedBytes int64 `json:"hashedBytes"`

+	EstimatedBytes int64 `json:"estimatedBytes"`
+
 	TotalCachedFiles int32 `json:"cachedFiles"`
 	TotalHashedFiles int32 `json:"hashedFiles"`

 	TotalIgnoredErrors int32 `json:"ignoredErrors"`
+	EstimatedFiles     int32 `json:"estimatedFiles"`

 	CurrentDirectory string `json:"directory"`
@@ -100,11 +109,17 @@ type CountingUploadProgress struct {
 }

 // UploadStarted implements UploadProgress.
-func (p *CountingUploadProgress) UploadStarted(previousFileCount int, previousTotalFileSize int64) {
+func (p *CountingUploadProgress) UploadStarted() {
 	// reset counters to all-zero values.
 	p.counters = UploadCounters{}
 }

+// EstimatedDataSize implements UploadProgress.
+func (p *CountingUploadProgress) EstimatedDataSize(numFiles int, numBytes int64) {
+	atomic.StoreInt64(&p.counters.EstimatedBytes, numBytes)
+	atomic.StoreInt32(&p.counters.EstimatedFiles, int32(numFiles))
+}
+
 // HashedBytes implements UploadProgress.
 func (p *CountingUploadProgress) HashedBytes(numBytes int64) {
 	atomic.AddInt64(&p.counters.TotalHashedBytes, numBytes)
@@ -149,6 +164,8 @@ func (p *CountingUploadProgress) Snapshot() UploadCounters {
 		TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles),
 		TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes),
 		TotalHashedBytes: atomic.LoadInt64(&p.counters.TotalHashedBytes),
+		EstimatedBytes:   atomic.LoadInt64(&p.counters.EstimatedBytes),
+		EstimatedFiles:   atomic.LoadInt32(&p.counters.EstimatedFiles),
 		CurrentDirectory: p.counters.CurrentDirectory,
 		LastErrorPath:    p.counters.LastErrorPath,
 		LastError:        p.counters.LastError,
snapshot/snapshotfs/upload_scan.go (new file, 51 lines)
@@ -0,0 +1,51 @@
+package snapshotfs
+
+import (
+	"context"
+
+	"github.com/kopia/kopia/fs"
+)
+
+type scanResults struct {
+	numFiles      int
+	totalFileSize int64
+}
+
+// scanDirectory computes the number of files and their total size in a given directory recursively descending
+// into subdirectories. The scan terminates early as soon as the provided context is canceled.
+func (u *Uploader) scanDirectory(ctx context.Context, dir fs.Directory) (scanResults, error) {
+	var res scanResults
+
+	if u.disableEstimation {
+		return res, nil
+	}
+
+	entries, err := dir.Readdir(ctx)
+	if err != nil {
+		return res, err
+	}
+
+	for _, e := range entries {
+		if err := ctx.Err(); err != nil {
+			// terminate early if context got canceled
+			return res, err
+		}
+
+		switch e := e.(type) {
+		case fs.Directory:
+			dr, err := u.scanDirectory(ctx, e)
+			res.numFiles += dr.numFiles
+			res.totalFileSize += dr.totalFileSize
+
+			if err != nil {
+				return res, err
+			}
+
+		case fs.File:
+			res.numFiles++
+			res.totalFileSize += e.Size()
+		}
+	}
+
+	return res, nil
+}
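For readers outside the kopia codebase, the pattern above — kick off a size-estimation walk in a goroutine, feed its result to a progress sink, and cancel it via context when the upload finishes first — can be reproduced with nothing but the standard library. The sketch below is an illustration under that assumption; it does not use kopia's fs or UploadProgress types, and all names in it are hypothetical.

	package main

	import (
		"context"
		"fmt"
		"io/fs"
		"os"
		"path/filepath"
		"time"
	)

	// estimate walks root, counting regular files and their total size,
	// and stops as soon as ctx is canceled (mirroring scanDirectory's early exit).
	func estimate(ctx context.Context, root string) (files int, bytes int64, err error) {
		err = filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
			if walkErr != nil {
				return walkErr
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr // terminate early if the upload already finished
			}
			if d.Type().IsRegular() {
				if info, infoErr := d.Info(); infoErr == nil {
					files++
					bytes += info.Size()
				}
			}
			return nil
		})
		return files, bytes, err
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // stops the estimator once the "upload" below returns

		go func() {
			if files, bytes, err := estimate(ctx, os.TempDir()); err == nil {
				fmt.Printf("estimated %v files, %v bytes\n", files, bytes)
			}
		}()

		time.Sleep(100 * time.Millisecond) // stand-in for the actual upload work
	}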
@@ -299,6 +299,7 @@ func TestUploadWithCheckpointing(t *testing.T) {

 	// create a channel that will be sent to whenever checkpoint completes.
 	u.checkpointFinished = make(chan struct{})
+	u.disableEstimation = true

 	policyTree := policy.BuildTree(nil, policy.DefaultPolicy)