From de840547e6b61579d46945f9d29d832fc0a20d07 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 13 Feb 2021 10:51:11 -0800 Subject: [PATCH] Improved upload reporting (#832) * blob: refactored upload reporting Instead of plumbing this through blob storage context, we are passing and explicit callback that reports uploads as they happen. * htmlui: improved counter presentation * nit: added missing UI route which fixes Reload behavior on the Tasks page --- cli/app.go | 16 +++----- cli/command_server_start.go | 1 + htmlui/src/App.css | 3 +- htmlui/src/SourcesTable.js | 2 +- htmlui/src/TaskDetails.js | 40 ++++++++++++++++++- internal/repotesting/repotesting.go | 6 +-- internal/repotesting/repotesting_test.go | 2 +- internal/server/source_manager.go | 10 +++++ repo/api_server_repository.go | 2 +- repo/blob/b2/b2_storage.go | 6 --- repo/blob/filesystem/filesystem_storage.go | 9 ----- repo/blob/gcs/gcs_storage.go | 14 ------- repo/blob/progress.go | 21 ---------- repo/blob/s3/s3_storage.go | 37 +---------------- repo/blob/sftp/sftp_storage.go | 8 ---- repo/content/content_cache_data.go | 7 +--- repo/content/content_cache_metadata.go | 8 +--- repo/content/content_manager.go | 10 +++++ repo/content/content_manager_lock_free.go | 1 + repo/content/sessions.go | 2 + repo/grpc_repository_client.go | 24 ++++++----- repo/repository.go | 18 +++++---- repo/repository_test.go | 6 +-- snapshot/snapshotfs/upload_progress.go | 22 +++++++--- snapshot/snapshotfs/upload_test.go | 2 +- .../snapshotmaintenance_test.go | 2 +- .../api_server_repository_test.go | 2 +- .../repository_stress_test.go | 2 +- 28 files changed, 128 insertions(+), 155 deletions(-) delete mode 100644 repo/blob/progress.go diff --git a/cli/app.go b/cli/app.go index d6e136cf5..29448828d 100644 --- a/cli/app.go +++ b/cli/app.go @@ -12,7 +12,6 @@ "github.com/kopia/kopia/internal/apiclient" "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/maintenance" @@ -99,7 +98,8 @@ func assertDirectRepository(act func(ctx context.Context, rep repo.DirectReposit func directRepositoryWriteAction(act func(ctx context.Context, rep repo.DirectRepositoryWriter) error) func(ctx *kingpin.ParseContext) error { return maybeRepositoryAction(assertDirectRepository(func(ctx context.Context, rep repo.DirectRepository) error { return repo.DirectWriteSession(ctx, rep, repo.WriteSessionOptions{ - Purpose: "directRepositoryWriteAction", + Purpose: "directRepositoryWriteAction", + OnUpload: progress.UploadedBytes, }, func(dw repo.DirectRepositoryWriter) error { return act(ctx, dw) }) }), repositoryAccessMode{ mustBeConnected: true, @@ -128,7 +128,8 @@ func repositoryReaderAction(act func(ctx context.Context, rep repo.Repository) e func repositoryWriterAction(act func(ctx context.Context, rep repo.RepositoryWriter) error) func(ctx *kingpin.ParseContext) error { return maybeRepositoryAction(func(ctx context.Context, rep repo.Repository) error { return repo.WriteSession(ctx, rep, repo.WriteSessionOptions{ - Purpose: "repositoryWriterAction", + Purpose: "repositoryWriterAction", + OnUpload: progress.UploadedBytes, }, func(w repo.RepositoryWriter) error { return act(ctx, w) }) @@ -141,12 +142,6 @@ func rootContext() context.Context { ctx := context.Background() ctx = content.UsingContentCache(ctx, *enableCaching) ctx = content.UsingListCache(ctx, *enableListCaching) - ctx = blob.WithUploadProgressCallback(ctx, func(desc string, bytesSent, totalBytes int64) { - if bytesSent >= totalBytes { - log(ctx).Debugf("Uploaded %v %v %v", desc, bytesSent, totalBytes) - progress.UploadedBytes(totalBytes) - } - }) return ctx } @@ -213,7 +208,8 @@ func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error { } err := repo.DirectWriteSession(ctx, dr, repo.WriteSessionOptions{ - Purpose: "maybeRunMaintenance", + Purpose: "maybeRunMaintenance", + OnUpload: progress.UploadedBytes, }, func(w repo.DirectRepositoryWriter) error { return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false) }) diff --git a/cli/command_server_start.go b/cli/command_server_start.go index cbe5eea02..76c512f7e 100644 --- a/cli/command_server_start.go +++ b/cli/command_server_start.go @@ -175,6 +175,7 @@ func stripProtocol(addr string) string { func isKnownUIRoute(path string) bool { return strings.HasPrefix(path, "/snapshots") || strings.HasPrefix(path, "/policies") || + strings.HasPrefix(path, "/tasks") || strings.HasPrefix(path, "/repo") } diff --git a/htmlui/src/App.css b/htmlui/src/App.css index fba568f45..f2c88a8a3 100644 --- a/htmlui/src/App.css +++ b/htmlui/src/App.css @@ -130,7 +130,8 @@ div.tab-body { .loglevel-5 { color: red; font-weight: bold; } /* fatal */ .counter-badge { - font-size: 100%; + font-size: 90%; + margin: 2px; } .page-title { diff --git a/htmlui/src/SourcesTable.js b/htmlui/src/SourcesTable.js index 4dc8cd3f1..b598828bd 100644 --- a/htmlui/src/SourcesTable.js +++ b/htmlui/src/SourcesTable.js @@ -214,7 +214,7 @@ export class SourcesTable extends Component { } return <> -  Uploading {totals} +  {totals}   {x.row.original.currentTask && Details} ; diff --git a/htmlui/src/TaskDetails.js b/htmlui/src/TaskDetails.js index 378ec69dc..b3e1d9d0f 100644 --- a/htmlui/src/TaskDetails.js +++ b/htmlui/src/TaskDetails.js @@ -117,7 +117,43 @@ export class TaskDetails extends Component { break; } - return {label}: {formatted} + return {label}: {formatted} + } + + counterLevelToSortOrder(l) { + switch (l) { + case "error": + return 30 + case "notice": + return 10; + case "warning": + return 5; + default: + return 0; + } + } + + sortedBadges(counters) { + let keys = Object.keys(counters); + + // sort keys by their level and the name alphabetically. + keys.sort((a, b) => { + if (counters[a].level !== counters[b].level) { + return this.counterLevelToSortOrder(counters[b].level) - this.counterLevelToSortOrder(counters[a].level); + } + + if (a < b) { + return -1; + } + + if (a > b) { + return 1; + } + + return 0; + }); + + return keys.map(c => (counters[c].value > 0) && this.counterBadge(c, counters[c])); } render() { @@ -163,7 +199,7 @@ export class TaskDetails extends Component { {task.counters && - {Object.keys(task.counters).map(c => (task.counters[c].value > 0) && <>{this.counterBadge(c, task.counters[c])} )} + {this.sortedBadges(task.counters)} }
diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index ce4da3840..2554f047b 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -89,7 +89,7 @@ func (e *Environment) Setup(t *testing.T, opts ...Options) *Environment { e.Repository = rep - e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, "test") + e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { t.Fatal(err) } @@ -137,7 +137,7 @@ func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options)) t.Fatalf("err: %v", err) } - e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, "test") + e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { t.Fatalf("err: %v", err) } @@ -158,7 +158,7 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter { rep2.Close(ctx) }) - w, err := rep2.NewWriter(ctx, "test") + w, err := rep2.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { t.Fatal(err) } diff --git a/internal/repotesting/repotesting_test.go b/internal/repotesting/repotesting_test.go index b18258c67..b6e3a5544 100644 --- a/internal/repotesting/repotesting_test.go +++ b/internal/repotesting/repotesting_test.go @@ -31,7 +31,7 @@ func TestTimeFuncWiring(t *testing.T) { r0 := rep.(repo.DirectRepository) - env.RepositoryWriter, err = r0.NewDirectWriter(ctx, "test") + env.RepositoryWriter, err = r0.NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { t.Fatal(err) } diff --git a/internal/server/source_manager.go b/internal/server/source_manager.go index b02066002..7a0a89049 100644 --- a/internal/server/source_manager.go +++ b/internal/server/source_manager.go @@ -256,8 +256,15 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro return errors.Wrap(err, "unable to create local filesystem") } + onUpload := func(int64) {} + return repo.WriteSession(ctx, s.server.rep, repo.WriteSessionOptions{ Purpose: "Source Manager Uploader", + OnUpload: func(numBytes int64) { + // extra indirection to allow changing onUpload function later + // once we have the uploader + onUpload(numBytes) + }, }, func(w repo.RepositoryWriter) error { log(ctx).Debugf("uploading %v", s.src) u := snapshotfs.NewUploader(w) @@ -272,6 +279,9 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro // set up progress that will keep counters and report to the uitask. prog := &uitaskProgress{s.progress, ctrl, 0} u.Progress = prog + onUpload = func(numBytes int64) { + u.Progress.UploadedBytes(numBytes) + } log(ctx).Debugf("starting upload of %v", s.src) s.setUploader(u) diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index bd66692b8..e8aedd4e8 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -131,7 +131,7 @@ func (r *apiServerRepository) Flush(ctx context.Context) error { return errors.Wrap(r.cli.Post(ctx, "flush", nil, nil), "Flush") } -func (r *apiServerRepository) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) { +func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) { // apiServerRepository is stateless except object manager. r2 := *r w := &r2 diff --git a/repo/blob/b2/b2_storage.go b/repo/blob/b2/b2_storage.go index 6a9addfc1..2835088e3 100644 --- a/repo/blob/b2/b2_storage.go +++ b/repo/blob/b2/b2_storage.go @@ -139,12 +139,6 @@ func translateError(err error) error { } func (s *b2Storage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) error { - progressCallback := blob.ProgressCallback(ctx) - if progressCallback != nil { - progressCallback(string(id), 0, int64(data.Length())) - defer progressCallback(string(id), int64(data.Length()), int64(data.Length())) - } - throttled, err := s.uploadThrottler.AddReader(ioutil.NopCloser(data.Reader())) if err != nil { return translateError(err) diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 53f281a42..194e3e7a8 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -150,15 +150,6 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data return errors.Wrap(err, "can't get random bytes") } - progressCallback := blob.ProgressCallback(ctx) - - combinedLength := data.Length() - - if progressCallback != nil { - progressCallback(path, 0, int64(combinedLength)) - defer progressCallback(path, int64(combinedLength), int64(combinedLength)) - } - tempFile := fmt.Sprintf("%s.tmp.%x", path, randSuffix) f, err := fs.createTempFileAndDir(tempFile) diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index 33f6544af..1fd3e7892 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -104,20 +104,6 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) writer.ChunkSize = writerChunkSize writer.ContentType = "application/x-kopia" - combinedLength := data.Length() - progressCallback := blob.ProgressCallback(ctx) - - if progressCallback != nil { - progressCallback(string(b), 0, int64(combinedLength)) - defer progressCallback(string(b), int64(combinedLength), int64(combinedLength)) - - writer.ProgressFunc = func(completed int64) { - if completed != int64(combinedLength) { - progressCallback(string(b), completed, int64(combinedLength)) - } - } - } - _, err := iocopy.Copy(writer, data.Reader()) if err != nil { // cancel context before closing the writer causes it to abandon the upload. diff --git a/repo/blob/progress.go b/repo/blob/progress.go deleted file mode 100644 index a27f123fe..000000000 --- a/repo/blob/progress.go +++ /dev/null @@ -1,21 +0,0 @@ -package blob - -import "context" - -type contextKey string - -const progressCallbackContextKey contextKey = "progress-callback" - -// ProgressFunc is used to report progress of a long-running storage operation. -type ProgressFunc func(desc string, completed, total int64) - -// WithUploadProgressCallback returns a context that passes callback function to be used storage upload progress. -func WithUploadProgressCallback(ctx context.Context, callback ProgressFunc) context.Context { - return context.WithValue(ctx, progressCallbackContextKey, callback) -} - -// ProgressCallback gets the progress callback function from the context. -func ProgressCallback(ctx context.Context) ProgressFunc { - pf, _ := ctx.Value(progressCallbackContextKey).(ProgressFunc) - return pf -} diff --git a/repo/blob/s3/s3_storage.go b/repo/blob/s3/s3_storage.go index 94a8f2d59..43794489e 100644 --- a/repo/blob/s3/s3_storage.go +++ b/repo/blob/s3/s3_storage.go @@ -115,17 +115,8 @@ func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) err return errors.Wrap(err, "AddReader") } - combinedLength := data.Length() - - progressCallback := blob.ProgressCallback(ctx) - if progressCallback != nil { - progressCallback(string(b), 0, int64(combinedLength)) - defer progressCallback(string(b), int64(combinedLength), int64(combinedLength)) - } - - uploadInfo, err := s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), throttled, int64(combinedLength), minio.PutObjectOptions{ + uploadInfo, err := s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), throttled, int64(data.Length()), minio.PutObjectOptions{ ContentType: "application/x-kopia", - Progress: newProgressReader(progressCallback, string(b), int64(combinedLength)), }) if errors.Is(err, io.EOF) && uploadInfo.Size == 0 { @@ -198,32 +189,6 @@ func (s *s3Storage) DisplayName() string { return fmt.Sprintf("S3: %v %v", s.Endpoint, s.BucketName) } -type progressReader struct { - cb blob.ProgressFunc - blobID string - completed int64 - totalLength int64 - lastReported int64 -} - -func (r *progressReader) Read(b []byte) (int, error) { - r.completed += int64(len(b)) - if r.completed >= r.lastReported+1000000 && r.completed < r.totalLength { - r.cb(r.blobID, r.completed, r.totalLength) - r.lastReported = r.completed - } - - return len(b), nil -} - -func newProgressReader(cb blob.ProgressFunc, blobID string, totalLength int64) io.Reader { - if cb == nil { - return nil - } - - return &progressReader{cb: cb, blobID: blobID, totalLength: totalLength} -} - func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth { if bytesPerSecond <= 0 { return iothrottler.Unlimited diff --git a/repo/blob/sftp/sftp_storage.go b/repo/blob/sftp/sftp_storage.go index 532e76401..ed52944fb 100644 --- a/repo/blob/sftp/sftp_storage.go +++ b/repo/blob/sftp/sftp_storage.go @@ -111,14 +111,6 @@ func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string, return errors.Wrap(err, "can't get random bytes") } - progressCallback := blob.ProgressCallback(ctx) - combinedLength := data.Length() - - if progressCallback != nil { - progressCallback(fullPath, 0, int64(combinedLength)) - defer progressCallback(fullPath, int64(combinedLength), int64(combinedLength)) - } - tempFile := fmt.Sprintf("%s.tmp.%x", fullPath, randSuffix) f, err := s.createTempFileAndDir(tempFile) diff --git a/repo/content/content_cache_data.go b/repo/content/content_cache_data.go index c1bb2cbe5..2ab8f0055 100644 --- a/repo/content/content_cache_data.go +++ b/repo/content/content_cache_data.go @@ -59,12 +59,7 @@ func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, } if err == nil && useCache { - // do not report cache writes as uploads. - if puterr := c.cacheStorage.PutBlob( - blob.WithUploadProgressCallback(ctx, nil), - blob.ID(cacheKey), - gather.FromSlice(hmac.Append(b, c.hmacSecret)), - ); puterr != nil { + if puterr := c.cacheStorage.PutBlob(ctx, blob.ID(cacheKey), gather.FromSlice(hmac.Append(b, c.hmacSecret))); puterr != nil { stats.Record(ctx, metricContentCacheStoreErrors.M(1)) log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) } diff --git a/repo/content/content_cache_metadata.go b/repo/content/content_cache_metadata.go index b76b40de5..ea33f62bb 100644 --- a/repo/content/content_cache_metadata.go +++ b/repo/content/content_cache_metadata.go @@ -91,12 +91,8 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache } if useCache { - // store the whole blob in the cache, do not report cache writes as uploads. - if puterr := c.cacheStorage.PutBlob( - blob.WithUploadProgressCallback(ctx, nil), - blobID, - gather.FromSlice(blobData), - ); puterr != nil { + // store the whole blob in the cache. + if puterr := c.cacheStorage.PutBlob(ctx, blobID, gather.FromSlice(blobData)); puterr != nil { stats.Record(ctx, metricContentCacheStoreErrors.M(1)) log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr) } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 76d101ed7..f463f8ea4 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -90,6 +90,8 @@ type WriteManager struct { disableIndexFlushCount int flushPackIndexesAfter time.Time // time when those indexes should be flushed + onUpload func(int64) + *SharedManager } @@ -352,6 +354,8 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { data := b.Bytes() dataCopy := append([]byte(nil), data...) + bm.onUpload(int64(len(data))) + indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data, bm.currentSessionInfo.ID) if err != nil { return errors.Wrap(err, "error writing index blob") @@ -781,6 +785,7 @@ func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, cach type SessionOptions struct { SessionUser string SessionHost string + OnUpload func(int64) } // NewWriteManager returns a session write manager. @@ -789,6 +794,10 @@ func NewWriteManager(sm *SharedManager, options SessionOptions) *WriteManager { sm.addRef() + if options.OnUpload == nil { + options.OnUpload = func(int64) {} + } + return &WriteManager{ SharedManager: sm, @@ -800,5 +809,6 @@ func NewWriteManager(sm *SharedManager, options SessionOptions) *WriteManager { packIndexBuilder: make(packIndexBuilder), sessionUser: options.SessionUser, sessionHost: options.SessionHost, + onUpload: options.OnUpload, } } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 4d27f18d8..2e4e70fc8 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -137,6 +137,7 @@ func getPackedContentIV(output []byte, contentID ID) ([]byte, error) { func (bm *WriteManager) writePackFileNotLocked(ctx context.Context, packFile blob.ID, data gather.Bytes) error { bm.Stats.wroteContent(data.Length()) + bm.onUpload(int64(data.Length())) return bm.st.PutBlob(ctx, packFile, data) } diff --git a/repo/content/sessions.go b/repo/content/sessions.go index a1dd80882..3a4d2d739 100644 --- a/repo/content/sessions.go +++ b/repo/content/sessions.go @@ -110,6 +110,8 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error { return errors.Wrap(err, "unable to encrypt session marker") } + bm.onUpload(int64(len(encrypted))) + if err := bm.st.PutBlob(ctx, sessionBlobID, gather.FromSlice(encrypted)); err != nil { return errors.Wrapf(err, "unable to write session marker: %v", string(sessionBlobID)) } diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 9e75cd828..e51fa84e2 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -46,7 +46,7 @@ type grpcRepositoryClient struct { innerSessionMutex sync.Mutex innerSession *grpcInnerSession - purpose string + opt WriteSessionOptions isReadOnly bool transparentRetries bool @@ -383,8 +383,8 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error { return errNoSessionResponse() } -func (r *grpcRepositoryClient) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) { - w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, purpose, false) +func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) { + w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, false) if err != nil { return nil, err } @@ -425,7 +425,7 @@ func (r *grpcRepositoryClient) retry(ctx context.Context, attempt sessionAttempt func (r *grpcRepositoryClient) inSessionWithoutRetry(ctx context.Context, attempt sessionAttemptFunc) (interface{}, error) { sess, err := r.getOrEstablishInnerSession(ctx) if err != nil { - return nil, errors.Wrapf(err, "unable to establish session for purpose=%v", r.purpose) + return nil, errors.Wrapf(err, "unable to establish session for purpose=%v", r.opt.Purpose) } return attempt(ctx, sess) @@ -525,6 +525,8 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) } func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) { + r.opt.OnUpload(int64(len(data))) + v, err := r.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) { return sess.WriteContent(ctx, data, prefix) }) @@ -636,7 +638,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien return nil, errors.Wrap(err, "dial error") } - rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, "", true) + rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, true) if err != nil { return nil, err } @@ -651,7 +653,7 @@ func (r *grpcRepositoryClient) getOrEstablishInnerSession(ctx context.Context) ( if r.innerSession == nil { cli := apipb.NewKopiaRepositoryClient(r.conn) - log(ctx).Debugf("establishing new GRPC streaming session (purpose=%v)", r.purpose) + log(ctx).Debugf("establishing new GRPC streaming session (purpose=%v)", r.opt.Purpose) retryPolicy := retry.Always if r.transparentRetries && r.innerSessionAttemptCount == 0 { @@ -676,7 +678,7 @@ func (r *grpcRepositoryClient) getOrEstablishInnerSession(ctx context.Context) ( go newSess.readLoop(ctx) - newSess.repoParams, err = newSess.initializeSession(ctx, r.purpose, r.isReadOnly) + newSess.repoParams, err = newSess.initializeSession(ctx, r.opt.Purpose, r.isReadOnly) if err != nil { return nil, errors.Wrap(err, "unable to initialize session") } @@ -704,13 +706,17 @@ func (r *grpcRepositoryClient) killInnerSession() { } // newGRPCAPIRepositoryForConnection opens GRPC-based repository connection. -func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, purpose string, transparentRetries bool) (*grpcRepositoryClient, error) { +func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, transparentRetries bool) (*grpcRepositoryClient, error) { + if opt.OnUpload == nil { + opt.OnUpload = func(i int64) {} + } + rr := &grpcRepositoryClient{ connRefCount: connRefCount, conn: conn, cliOpts: cliOpts, transparentRetries: transparentRetries, - purpose: purpose, + opt: opt, isReadOnly: cliOpts.ReadOnly, } diff --git a/repo/repository.go b/repo/repository.go index bb5f37c4e..29156b63d 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -24,7 +24,7 @@ type Repository interface { Time() time.Time ClientOptions() ClientOptions - NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) + NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) UpdateDescription(d string) @@ -52,7 +52,7 @@ type DirectRepository interface { ContentReader() content.Reader IndexBlobReader() content.IndexBlobReader - NewDirectWriter(ctx context.Context, purpose string) (DirectRepositoryWriter, error) + NewDirectWriter(ctx context.Context, opt WriteSessionOptions) (DirectRepositoryWriter, error) // misc UniqueID() []byte @@ -168,15 +168,16 @@ func (r *directRepository) UpdateDescription(d string) { } // NewWriter returns new RepositoryWriter session for repository. -func (r *directRepository) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) { - return r.NewDirectWriter(ctx, purpose) +func (r *directRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) { + return r.NewDirectWriter(ctx, opt) } // NewDirectWriter returns new DirectRepositoryWriter session for repository. -func (r *directRepository) NewDirectWriter(ctx context.Context, purpose string) (DirectRepositoryWriter, error) { +func (r *directRepository) NewDirectWriter(ctx context.Context, opt WriteSessionOptions) (DirectRepositoryWriter, error) { cmgr := content.NewWriteManager(r.sm, content.SessionOptions{ SessionUser: r.cliOpts.Username, SessionHost: r.cliOpts.Hostname, + OnUpload: opt.OnUpload, }) mmgr, err := manifest.NewManager(ctx, cmgr, manifest.ManagerOptions{ @@ -308,12 +309,13 @@ func (r *directRepository) Time() time.Time { // WriteSessionOptions describes options for a write session. type WriteSessionOptions struct { Purpose string - FlushOnFailure bool // whether to flush regardless of write sessionr result. + FlushOnFailure bool // whether to flush regardless of write sessionr result. + OnUpload func(int64) // invoke the provided function on each upload in the session } // WriteSession executes the provided callback in a repository writer created for the purpose and flushes writes. func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb func(w RepositoryWriter) error) error { - w, err := r.NewWriter(ctx, opt.Purpose) + w, err := r.NewWriter(ctx, opt) if err != nil { return errors.Wrap(err, "unable to create writer") } @@ -323,7 +325,7 @@ func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb // DirectWriteSession executes the provided callback in a DirectRepositoryWriter created for the purpose and flushes writes. func DirectWriteSession(ctx context.Context, r DirectRepository, opt WriteSessionOptions, cb func(dw DirectRepositoryWriter) error) error { - w, err := r.NewDirectWriter(ctx, opt.Purpose) + w, err := r.NewDirectWriter(ctx, opt) if err != nil { return errors.Wrap(err, "unable to create direct writer") } diff --git a/repo/repository_test.go b/repo/repository_test.go index 427e7f0a1..afb6f64e0 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -336,17 +336,17 @@ func TestWriterScope(t *testing.T) { lw := rep.(repo.RepositoryWriter) // w1, w2, w3 are indepdendent sessions. - w1, err := rep.NewWriter(ctx, "writer1") + w1, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer1"}) must(t, err) defer w1.Close(ctx) - w2, err := rep.NewWriter(ctx, "writer2") + w2, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer2"}) must(t, err) defer w2.Close(ctx) - w3, err := rep.NewWriter(ctx, "writer3") + w3, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer3"}) must(t, err) defer w3.Close(ctx) diff --git a/snapshot/snapshotfs/upload_progress.go b/snapshot/snapshotfs/upload_progress.go index 5a4731dd7..a4312e373 100644 --- a/snapshot/snapshotfs/upload_progress.go +++ b/snapshot/snapshotfs/upload_progress.go @@ -206,13 +206,23 @@ func (p *CountingUploadProgress) Snapshot() UploadCounters { // UITaskCounters returns UI task counters. func (p *CountingUploadProgress) UITaskCounters(final bool) map[string]uitask.CounterValue { - m := map[string]uitask.CounterValue{ - "Cached Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalCachedFiles))), - "Hashed Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalHashedFiles))), + cachedFiles := int64(atomic.LoadInt32(&p.counters.TotalCachedFiles)) + hashedFiles := int64(atomic.LoadInt32(&p.counters.TotalHashedFiles)) - "Cached Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalCachedBytes)), - "Hashed Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalHashedBytes)), - "Uploaded Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalUploadedBytes)), + cachedBytes := atomic.LoadInt64(&p.counters.TotalCachedBytes) + hashedBytes := atomic.LoadInt64(&p.counters.TotalHashedBytes) + + m := map[string]uitask.CounterValue{ + "Cached Files": uitask.SimpleCounter(cachedFiles), + "Hashed Files": uitask.SimpleCounter(hashedFiles), + "Processed Files": uitask.NoticeBytesCounter(hashedFiles + cachedFiles), + + "Cached Bytes": uitask.BytesCounter(cachedBytes), + "Hashed Bytes": uitask.BytesCounter(hashedBytes), + "Processed Bytes": uitask.NoticeBytesCounter(hashedBytes + cachedBytes), + + // bytes actually ploaded to the server (non-deduplicated) + "Uploaded Bytes": uitask.NoticeBytesCounter(atomic.LoadInt64(&p.counters.TotalUploadedBytes)), "Excluded Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalExcludedFiles))), "Excluded Directories": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalExcludedDirs))), diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index a20a5b3ec..8b370c0dd 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -94,7 +94,7 @@ func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness sourceDir.AddFile("d2/d1/f1", []byte{1, 2, 3}, defaultPermissions) sourceDir.AddFile("d2/d1/f2", []byte{1, 2, 3, 4}, defaultPermissions) - w, err := rep.NewWriter(ctx, "test") + w, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { panic("writer creation error: " + err.Error()) } diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go index ecbd505b4..e0a8c9609 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go @@ -195,7 +195,7 @@ func (th *testHarness) openAnother(t *testing.T) repo.RepositoryWriter { r.Close(ctx) }) - w, err := r.NewWriter(ctx, "test") + w, err := r.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { t.Fatal(err) } diff --git a/tests/end_to_end_test/api_server_repository_test.go b/tests/end_to_end_test/api_server_repository_test.go index 79d53dd19..ddf96132b 100644 --- a/tests/end_to_end_test/api_server_repository_test.go +++ b/tests/end_to_end_test/api_server_repository_test.go @@ -116,7 +116,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al // open new write session in repository client - writeSess, err := rep.NewWriter(ctx, "some writer") + writeSess, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "some writer"}) if err != nil { t.Fatal(err) } diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index fa580c27c..20929ee6a 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -141,7 +141,7 @@ func longLivedRepositoryTest(ctx context.Context, t *testing.T, cancel chan stru } defer rep.Close(ctx) - w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, "longLivedRepositoryTest") + w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "longLivedRepositoryTest"}) if err != nil { t.Errorf("error opening writer: %v", err) return