diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 325f62e78..684a6a1f9 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -588,6 +588,9 @@ func uploadDirInternal( ) (object.ID, fs.DirectorySummary, error) { u.stats.TotalDirectoryCount++ + u.Progress.StartedDirectory(dirRelativePath) + defer u.Progress.FinishedDirectory(dirRelativePath) + var summ fs.DirectorySummary summ.TotalDirCount = 1 diff --git a/snapshot/snapshotfs/upload_progress.go b/snapshot/snapshotfs/upload_progress.go index 0a9efd491..0c8372578 100644 --- a/snapshot/snapshotfs/upload_progress.go +++ b/snapshot/snapshotfs/upload_progress.go @@ -1,5 +1,10 @@ package snapshotfs +import ( + "sync" + "sync/atomic" +) + // UploadProgress is invoked by by uploader to report status of file and directory uploads. type UploadProgress interface { // UploadStarted is emitted once at the start of an upload @@ -22,6 +27,12 @@ type UploadProgress interface { // UploadedBytes is emitted whenever bytes are written to the blob storage. UploadedBytes(numBytes int64) + + // StartedDirectory is emitted whenever a directory starts being uploaded. + StartedDirectory(dirname string) + + // FinishedDirectory is emitted whenever a directory is finished uploading. + FinishedDirectory(dirname string) } // NullUploadProgress is an implementation of UploadProgress that does not produce any output. @@ -49,4 +60,76 @@ func (p *NullUploadProgress) HashingFile(fname string) {} // FinishedHashingFile implements UploadProgress func (p *NullUploadProgress) FinishedHashingFile(fname string, numBytes int64) {} +// StartedDirectory implements UploadProgress +func (p *NullUploadProgress) StartedDirectory(dirname string) {} + +// FinishedDirectory implements UploadProgress +func (p *NullUploadProgress) FinishedDirectory(dirname string) {} + var _ UploadProgress = (*NullUploadProgress)(nil) + +// UploadCounters represents a snapshot of upload counters. +type UploadCounters struct { + TotalCachedBytes int64 + TotalHashedBytes int64 + + TotalCachedFiles int32 + TotalHashedFiles int32 + + CurrentDirectory string +} + +// CountingUploadProgress is an implementation of UploadProgress that accumulates counters. +type CountingUploadProgress struct { + NullUploadProgress + + mu sync.Mutex + + counters UploadCounters +} + +// UploadStarted implements UploadProgress +func (p *CountingUploadProgress) UploadStarted() { + // reset counters to all-zero values. + p.counters = UploadCounters{} +} + +// HashedBytes implements UploadProgress +func (p *CountingUploadProgress) HashedBytes(numBytes int64) { + atomic.AddInt64(&p.counters.TotalHashedBytes, numBytes) +} + +// CachedFile implements UploadProgress +func (p *CountingUploadProgress) CachedFile(fname string, numBytes int64) { + atomic.AddInt32(&p.counters.TotalCachedFiles, 1) + atomic.AddInt64(&p.counters.TotalCachedBytes, numBytes) +} + +// FinishedHashingFile implements UploadProgress +func (p *CountingUploadProgress) FinishedHashingFile(fname string, numBytes int64) { + atomic.AddInt32(&p.counters.TotalHashedFiles, 1) +} + +// StartedDirectory implements UploadProgress +func (p *CountingUploadProgress) StartedDirectory(dirname string) { + p.mu.Lock() + defer p.mu.Unlock() + + p.counters.CurrentDirectory = dirname +} + +// Snapshot captures current snapshot of the upload. +func (p *CountingUploadProgress) Snapshot() UploadCounters { + p.mu.Lock() + defer p.mu.Unlock() + + return UploadCounters{ + TotalCachedFiles: atomic.LoadInt32(&p.counters.TotalCachedFiles), + TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles), + TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes), + TotalHashedBytes: atomic.LoadInt64(&p.counters.TotalHashedBytes), + CurrentDirectory: p.counters.CurrentDirectory, + } +} + +var _ UploadProgress = (*CountingUploadProgress)(nil)