mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 22:54:55 -04:00
Improved upload reporting (#832)
* blob: refactored upload reporting Instead of plumbing this through blob storage context, we are passing and explicit callback that reports uploads as they happen. * htmlui: improved counter presentation * nit: added missing UI route which fixes Reload behavior on the Tasks page
This commit is contained in:
16
cli/app.go
16
cli/app.go
@@ -12,7 +12,6 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/apiclient"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
@@ -99,7 +98,8 @@ func assertDirectRepository(act func(ctx context.Context, rep repo.DirectReposit
|
||||
func directRepositoryWriteAction(act func(ctx context.Context, rep repo.DirectRepositoryWriter) error) func(ctx *kingpin.ParseContext) error {
|
||||
return maybeRepositoryAction(assertDirectRepository(func(ctx context.Context, rep repo.DirectRepository) error {
|
||||
return repo.DirectWriteSession(ctx, rep, repo.WriteSessionOptions{
|
||||
Purpose: "directRepositoryWriteAction",
|
||||
Purpose: "directRepositoryWriteAction",
|
||||
OnUpload: progress.UploadedBytes,
|
||||
}, func(dw repo.DirectRepositoryWriter) error { return act(ctx, dw) })
|
||||
}), repositoryAccessMode{
|
||||
mustBeConnected: true,
|
||||
@@ -128,7 +128,8 @@ func repositoryReaderAction(act func(ctx context.Context, rep repo.Repository) e
|
||||
func repositoryWriterAction(act func(ctx context.Context, rep repo.RepositoryWriter) error) func(ctx *kingpin.ParseContext) error {
|
||||
return maybeRepositoryAction(func(ctx context.Context, rep repo.Repository) error {
|
||||
return repo.WriteSession(ctx, rep, repo.WriteSessionOptions{
|
||||
Purpose: "repositoryWriterAction",
|
||||
Purpose: "repositoryWriterAction",
|
||||
OnUpload: progress.UploadedBytes,
|
||||
}, func(w repo.RepositoryWriter) error {
|
||||
return act(ctx, w)
|
||||
})
|
||||
@@ -141,12 +142,6 @@ func rootContext() context.Context {
|
||||
ctx := context.Background()
|
||||
ctx = content.UsingContentCache(ctx, *enableCaching)
|
||||
ctx = content.UsingListCache(ctx, *enableListCaching)
|
||||
ctx = blob.WithUploadProgressCallback(ctx, func(desc string, bytesSent, totalBytes int64) {
|
||||
if bytesSent >= totalBytes {
|
||||
log(ctx).Debugf("Uploaded %v %v %v", desc, bytesSent, totalBytes)
|
||||
progress.UploadedBytes(totalBytes)
|
||||
}
|
||||
})
|
||||
|
||||
return ctx
|
||||
}
|
||||
@@ -213,7 +208,8 @@ func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error {
|
||||
}
|
||||
|
||||
err := repo.DirectWriteSession(ctx, dr, repo.WriteSessionOptions{
|
||||
Purpose: "maybeRunMaintenance",
|
||||
Purpose: "maybeRunMaintenance",
|
||||
OnUpload: progress.UploadedBytes,
|
||||
}, func(w repo.DirectRepositoryWriter) error {
|
||||
return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false)
|
||||
})
|
||||
|
||||
@@ -175,6 +175,7 @@ func stripProtocol(addr string) string {
|
||||
func isKnownUIRoute(path string) bool {
|
||||
return strings.HasPrefix(path, "/snapshots") ||
|
||||
strings.HasPrefix(path, "/policies") ||
|
||||
strings.HasPrefix(path, "/tasks") ||
|
||||
strings.HasPrefix(path, "/repo")
|
||||
}
|
||||
|
||||
|
||||
@@ -130,7 +130,8 @@ div.tab-body {
|
||||
.loglevel-5 { color: red; font-weight: bold; } /* fatal */
|
||||
|
||||
.counter-badge {
|
||||
font-size: 100%;
|
||||
font-size: 90%;
|
||||
margin: 2px;
|
||||
}
|
||||
|
||||
.page-title {
|
||||
|
||||
@@ -214,7 +214,7 @@ export class SourcesTable extends Component {
|
||||
}
|
||||
|
||||
return <>
|
||||
<Spinner animation="border" variant="primary" size="sm" title={title} /> Uploading {totals}
|
||||
<Spinner animation="border" variant="primary" size="sm" title={title} /> {totals}
|
||||
|
||||
{x.row.original.currentTask && <Link to={"/tasks/"+x.row.original.currentTask}>Details</Link>}
|
||||
</>;
|
||||
|
||||
@@ -117,7 +117,43 @@ export class TaskDetails extends Component {
|
||||
break;
|
||||
}
|
||||
|
||||
return <Badge className="counter-badge" variant={variant}>{label}: {formatted}</Badge>
|
||||
return <Badge key={label} className="counter-badge" variant={variant}>{label}: {formatted}</Badge>
|
||||
}
|
||||
|
||||
counterLevelToSortOrder(l) {
|
||||
switch (l) {
|
||||
case "error":
|
||||
return 30
|
||||
case "notice":
|
||||
return 10;
|
||||
case "warning":
|
||||
return 5;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
sortedBadges(counters) {
|
||||
let keys = Object.keys(counters);
|
||||
|
||||
// sort keys by their level and the name alphabetically.
|
||||
keys.sort((a, b) => {
|
||||
if (counters[a].level !== counters[b].level) {
|
||||
return this.counterLevelToSortOrder(counters[b].level) - this.counterLevelToSortOrder(counters[a].level);
|
||||
}
|
||||
|
||||
if (a < b) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (a > b) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
return keys.map(c => (counters[c].value > 0) && this.counterBadge(c, counters[c]));
|
||||
}
|
||||
|
||||
render() {
|
||||
@@ -163,7 +199,7 @@ export class TaskDetails extends Component {
|
||||
</Form.Row>
|
||||
{task.counters && <Form.Row>
|
||||
<Col>
|
||||
{Object.keys(task.counters).map(c => (task.counters[c].value > 0) && <>{this.counterBadge(c, task.counters[c])} </>)}
|
||||
{this.sortedBadges(task.counters)}
|
||||
</Col>
|
||||
</Form.Row>}
|
||||
<hr/>
|
||||
|
||||
@@ -89,7 +89,7 @@ func (e *Environment) Setup(t *testing.T, opts ...Options) *Environment {
|
||||
|
||||
e.Repository = rep
|
||||
|
||||
e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, "test")
|
||||
e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -137,7 +137,7 @@ func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options))
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, "test")
|
||||
e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -158,7 +158,7 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter {
|
||||
rep2.Close(ctx)
|
||||
})
|
||||
|
||||
w, err := rep2.NewWriter(ctx, "test")
|
||||
w, err := rep2.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestTimeFuncWiring(t *testing.T) {
|
||||
|
||||
r0 := rep.(repo.DirectRepository)
|
||||
|
||||
env.RepositoryWriter, err = r0.NewDirectWriter(ctx, "test")
|
||||
env.RepositoryWriter, err = r0.NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -256,8 +256,15 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro
|
||||
return errors.Wrap(err, "unable to create local filesystem")
|
||||
}
|
||||
|
||||
onUpload := func(int64) {}
|
||||
|
||||
return repo.WriteSession(ctx, s.server.rep, repo.WriteSessionOptions{
|
||||
Purpose: "Source Manager Uploader",
|
||||
OnUpload: func(numBytes int64) {
|
||||
// extra indirection to allow changing onUpload function later
|
||||
// once we have the uploader
|
||||
onUpload(numBytes)
|
||||
},
|
||||
}, func(w repo.RepositoryWriter) error {
|
||||
log(ctx).Debugf("uploading %v", s.src)
|
||||
u := snapshotfs.NewUploader(w)
|
||||
@@ -272,6 +279,9 @@ func (s *sourceManager) snapshotInternal(ctx context.Context, ctrl uitask.Contro
|
||||
// set up progress that will keep counters and report to the uitask.
|
||||
prog := &uitaskProgress{s.progress, ctrl, 0}
|
||||
u.Progress = prog
|
||||
onUpload = func(numBytes int64) {
|
||||
u.Progress.UploadedBytes(numBytes)
|
||||
}
|
||||
|
||||
log(ctx).Debugf("starting upload of %v", s.src)
|
||||
s.setUploader(u)
|
||||
|
||||
@@ -131,7 +131,7 @@ func (r *apiServerRepository) Flush(ctx context.Context) error {
|
||||
return errors.Wrap(r.cli.Post(ctx, "flush", nil, nil), "Flush")
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) {
|
||||
func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) {
|
||||
// apiServerRepository is stateless except object manager.
|
||||
r2 := *r
|
||||
w := &r2
|
||||
|
||||
@@ -139,12 +139,6 @@ func translateError(err error) error {
|
||||
}
|
||||
|
||||
func (s *b2Storage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) error {
|
||||
progressCallback := blob.ProgressCallback(ctx)
|
||||
if progressCallback != nil {
|
||||
progressCallback(string(id), 0, int64(data.Length()))
|
||||
defer progressCallback(string(id), int64(data.Length()), int64(data.Length()))
|
||||
}
|
||||
|
||||
throttled, err := s.uploadThrottler.AddReader(ioutil.NopCloser(data.Reader()))
|
||||
if err != nil {
|
||||
return translateError(err)
|
||||
|
||||
@@ -150,15 +150,6 @@ func (fs *fsImpl) PutBlobInPath(ctx context.Context, dirPath, path string, data
|
||||
return errors.Wrap(err, "can't get random bytes")
|
||||
}
|
||||
|
||||
progressCallback := blob.ProgressCallback(ctx)
|
||||
|
||||
combinedLength := data.Length()
|
||||
|
||||
if progressCallback != nil {
|
||||
progressCallback(path, 0, int64(combinedLength))
|
||||
defer progressCallback(path, int64(combinedLength), int64(combinedLength))
|
||||
}
|
||||
|
||||
tempFile := fmt.Sprintf("%s.tmp.%x", path, randSuffix)
|
||||
|
||||
f, err := fs.createTempFileAndDir(tempFile)
|
||||
|
||||
@@ -104,20 +104,6 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes)
|
||||
writer.ChunkSize = writerChunkSize
|
||||
writer.ContentType = "application/x-kopia"
|
||||
|
||||
combinedLength := data.Length()
|
||||
progressCallback := blob.ProgressCallback(ctx)
|
||||
|
||||
if progressCallback != nil {
|
||||
progressCallback(string(b), 0, int64(combinedLength))
|
||||
defer progressCallback(string(b), int64(combinedLength), int64(combinedLength))
|
||||
|
||||
writer.ProgressFunc = func(completed int64) {
|
||||
if completed != int64(combinedLength) {
|
||||
progressCallback(string(b), completed, int64(combinedLength))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err := iocopy.Copy(writer, data.Reader())
|
||||
if err != nil {
|
||||
// cancel context before closing the writer causes it to abandon the upload.
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package blob
|
||||
|
||||
import "context"
|
||||
|
||||
type contextKey string
|
||||
|
||||
const progressCallbackContextKey contextKey = "progress-callback"
|
||||
|
||||
// ProgressFunc is used to report progress of a long-running storage operation.
|
||||
type ProgressFunc func(desc string, completed, total int64)
|
||||
|
||||
// WithUploadProgressCallback returns a context that passes callback function to be used storage upload progress.
|
||||
func WithUploadProgressCallback(ctx context.Context, callback ProgressFunc) context.Context {
|
||||
return context.WithValue(ctx, progressCallbackContextKey, callback)
|
||||
}
|
||||
|
||||
// ProgressCallback gets the progress callback function from the context.
|
||||
func ProgressCallback(ctx context.Context) ProgressFunc {
|
||||
pf, _ := ctx.Value(progressCallbackContextKey).(ProgressFunc)
|
||||
return pf
|
||||
}
|
||||
@@ -115,17 +115,8 @@ func (s *s3Storage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes) err
|
||||
return errors.Wrap(err, "AddReader")
|
||||
}
|
||||
|
||||
combinedLength := data.Length()
|
||||
|
||||
progressCallback := blob.ProgressCallback(ctx)
|
||||
if progressCallback != nil {
|
||||
progressCallback(string(b), 0, int64(combinedLength))
|
||||
defer progressCallback(string(b), int64(combinedLength), int64(combinedLength))
|
||||
}
|
||||
|
||||
uploadInfo, err := s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), throttled, int64(combinedLength), minio.PutObjectOptions{
|
||||
uploadInfo, err := s.cli.PutObject(ctx, s.BucketName, s.getObjectNameString(b), throttled, int64(data.Length()), minio.PutObjectOptions{
|
||||
ContentType: "application/x-kopia",
|
||||
Progress: newProgressReader(progressCallback, string(b), int64(combinedLength)),
|
||||
})
|
||||
|
||||
if errors.Is(err, io.EOF) && uploadInfo.Size == 0 {
|
||||
@@ -198,32 +189,6 @@ func (s *s3Storage) DisplayName() string {
|
||||
return fmt.Sprintf("S3: %v %v", s.Endpoint, s.BucketName)
|
||||
}
|
||||
|
||||
type progressReader struct {
|
||||
cb blob.ProgressFunc
|
||||
blobID string
|
||||
completed int64
|
||||
totalLength int64
|
||||
lastReported int64
|
||||
}
|
||||
|
||||
func (r *progressReader) Read(b []byte) (int, error) {
|
||||
r.completed += int64(len(b))
|
||||
if r.completed >= r.lastReported+1000000 && r.completed < r.totalLength {
|
||||
r.cb(r.blobID, r.completed, r.totalLength)
|
||||
r.lastReported = r.completed
|
||||
}
|
||||
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func newProgressReader(cb blob.ProgressFunc, blobID string, totalLength int64) io.Reader {
|
||||
if cb == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &progressReader{cb: cb, blobID: blobID, totalLength: totalLength}
|
||||
}
|
||||
|
||||
func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {
|
||||
if bytesPerSecond <= 0 {
|
||||
return iothrottler.Unlimited
|
||||
|
||||
@@ -111,14 +111,6 @@ func (s *sftpImpl) PutBlobInPath(ctx context.Context, dirPath, fullPath string,
|
||||
return errors.Wrap(err, "can't get random bytes")
|
||||
}
|
||||
|
||||
progressCallback := blob.ProgressCallback(ctx)
|
||||
combinedLength := data.Length()
|
||||
|
||||
if progressCallback != nil {
|
||||
progressCallback(fullPath, 0, int64(combinedLength))
|
||||
defer progressCallback(fullPath, int64(combinedLength), int64(combinedLength))
|
||||
}
|
||||
|
||||
tempFile := fmt.Sprintf("%s.tmp.%x", fullPath, randSuffix)
|
||||
|
||||
f, err := s.createTempFileAndDir(tempFile)
|
||||
|
||||
@@ -59,12 +59,7 @@ func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey,
|
||||
}
|
||||
|
||||
if err == nil && useCache {
|
||||
// do not report cache writes as uploads.
|
||||
if puterr := c.cacheStorage.PutBlob(
|
||||
blob.WithUploadProgressCallback(ctx, nil),
|
||||
blob.ID(cacheKey),
|
||||
gather.FromSlice(hmac.Append(b, c.hmacSecret)),
|
||||
); puterr != nil {
|
||||
if puterr := c.cacheStorage.PutBlob(ctx, blob.ID(cacheKey), gather.FromSlice(hmac.Append(b, c.hmacSecret))); puterr != nil {
|
||||
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
|
||||
log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr)
|
||||
}
|
||||
|
||||
@@ -91,12 +91,8 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache
|
||||
}
|
||||
|
||||
if useCache {
|
||||
// store the whole blob in the cache, do not report cache writes as uploads.
|
||||
if puterr := c.cacheStorage.PutBlob(
|
||||
blob.WithUploadProgressCallback(ctx, nil),
|
||||
blobID,
|
||||
gather.FromSlice(blobData),
|
||||
); puterr != nil {
|
||||
// store the whole blob in the cache.
|
||||
if puterr := c.cacheStorage.PutBlob(ctx, blobID, gather.FromSlice(blobData)); puterr != nil {
|
||||
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
|
||||
log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr)
|
||||
}
|
||||
|
||||
@@ -90,6 +90,8 @@ type WriteManager struct {
|
||||
disableIndexFlushCount int
|
||||
flushPackIndexesAfter time.Time // time when those indexes should be flushed
|
||||
|
||||
onUpload func(int64)
|
||||
|
||||
*SharedManager
|
||||
}
|
||||
|
||||
@@ -352,6 +354,8 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
|
||||
data := b.Bytes()
|
||||
dataCopy := append([]byte(nil), data...)
|
||||
|
||||
bm.onUpload(int64(len(data)))
|
||||
|
||||
indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data, bm.currentSessionInfo.ID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing index blob")
|
||||
@@ -781,6 +785,7 @@ func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, cach
|
||||
type SessionOptions struct {
|
||||
SessionUser string
|
||||
SessionHost string
|
||||
OnUpload func(int64)
|
||||
}
|
||||
|
||||
// NewWriteManager returns a session write manager.
|
||||
@@ -789,6 +794,10 @@ func NewWriteManager(sm *SharedManager, options SessionOptions) *WriteManager {
|
||||
|
||||
sm.addRef()
|
||||
|
||||
if options.OnUpload == nil {
|
||||
options.OnUpload = func(int64) {}
|
||||
}
|
||||
|
||||
return &WriteManager{
|
||||
SharedManager: sm,
|
||||
|
||||
@@ -800,5 +809,6 @@ func NewWriteManager(sm *SharedManager, options SessionOptions) *WriteManager {
|
||||
packIndexBuilder: make(packIndexBuilder),
|
||||
sessionUser: options.SessionUser,
|
||||
sessionHost: options.SessionHost,
|
||||
onUpload: options.OnUpload,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,6 +137,7 @@ func getPackedContentIV(output []byte, contentID ID) ([]byte, error) {
|
||||
|
||||
func (bm *WriteManager) writePackFileNotLocked(ctx context.Context, packFile blob.ID, data gather.Bytes) error {
|
||||
bm.Stats.wroteContent(data.Length())
|
||||
bm.onUpload(int64(data.Length()))
|
||||
|
||||
return bm.st.PutBlob(ctx, packFile, data)
|
||||
}
|
||||
|
||||
@@ -110,6 +110,8 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error {
|
||||
return errors.Wrap(err, "unable to encrypt session marker")
|
||||
}
|
||||
|
||||
bm.onUpload(int64(len(encrypted)))
|
||||
|
||||
if err := bm.st.PutBlob(ctx, sessionBlobID, gather.FromSlice(encrypted)); err != nil {
|
||||
return errors.Wrapf(err, "unable to write session marker: %v", string(sessionBlobID))
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ type grpcRepositoryClient struct {
|
||||
innerSessionMutex sync.Mutex
|
||||
innerSession *grpcInnerSession
|
||||
|
||||
purpose string
|
||||
opt WriteSessionOptions
|
||||
isReadOnly bool
|
||||
transparentRetries bool
|
||||
|
||||
@@ -383,8 +383,8 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error {
|
||||
return errNoSessionResponse()
|
||||
}
|
||||
|
||||
func (r *grpcRepositoryClient) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) {
|
||||
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, purpose, false)
|
||||
func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) {
|
||||
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -425,7 +425,7 @@ func (r *grpcRepositoryClient) retry(ctx context.Context, attempt sessionAttempt
|
||||
func (r *grpcRepositoryClient) inSessionWithoutRetry(ctx context.Context, attempt sessionAttemptFunc) (interface{}, error) {
|
||||
sess, err := r.getOrEstablishInnerSession(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to establish session for purpose=%v", r.purpose)
|
||||
return nil, errors.Wrapf(err, "unable to establish session for purpose=%v", r.opt.Purpose)
|
||||
}
|
||||
|
||||
return attempt(ctx, sess)
|
||||
@@ -525,6 +525,8 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID)
|
||||
}
|
||||
|
||||
func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) {
|
||||
r.opt.OnUpload(int64(len(data)))
|
||||
|
||||
v, err := r.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) {
|
||||
return sess.WriteContent(ctx, data, prefix)
|
||||
})
|
||||
@@ -636,7 +638,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
|
||||
return nil, errors.Wrap(err, "dial error")
|
||||
}
|
||||
|
||||
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, "", true)
|
||||
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -651,7 +653,7 @@ func (r *grpcRepositoryClient) getOrEstablishInnerSession(ctx context.Context) (
|
||||
if r.innerSession == nil {
|
||||
cli := apipb.NewKopiaRepositoryClient(r.conn)
|
||||
|
||||
log(ctx).Debugf("establishing new GRPC streaming session (purpose=%v)", r.purpose)
|
||||
log(ctx).Debugf("establishing new GRPC streaming session (purpose=%v)", r.opt.Purpose)
|
||||
|
||||
retryPolicy := retry.Always
|
||||
if r.transparentRetries && r.innerSessionAttemptCount == 0 {
|
||||
@@ -676,7 +678,7 @@ func (r *grpcRepositoryClient) getOrEstablishInnerSession(ctx context.Context) (
|
||||
|
||||
go newSess.readLoop(ctx)
|
||||
|
||||
newSess.repoParams, err = newSess.initializeSession(ctx, r.purpose, r.isReadOnly)
|
||||
newSess.repoParams, err = newSess.initializeSession(ctx, r.opt.Purpose, r.isReadOnly)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to initialize session")
|
||||
}
|
||||
@@ -704,13 +706,17 @@ func (r *grpcRepositoryClient) killInnerSession() {
|
||||
}
|
||||
|
||||
// newGRPCAPIRepositoryForConnection opens GRPC-based repository connection.
|
||||
func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, purpose string, transparentRetries bool) (*grpcRepositoryClient, error) {
|
||||
func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, transparentRetries bool) (*grpcRepositoryClient, error) {
|
||||
if opt.OnUpload == nil {
|
||||
opt.OnUpload = func(i int64) {}
|
||||
}
|
||||
|
||||
rr := &grpcRepositoryClient{
|
||||
connRefCount: connRefCount,
|
||||
conn: conn,
|
||||
cliOpts: cliOpts,
|
||||
transparentRetries: transparentRetries,
|
||||
purpose: purpose,
|
||||
opt: opt,
|
||||
isReadOnly: cliOpts.ReadOnly,
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ type Repository interface {
|
||||
Time() time.Time
|
||||
ClientOptions() ClientOptions
|
||||
|
||||
NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error)
|
||||
NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error)
|
||||
|
||||
UpdateDescription(d string)
|
||||
|
||||
@@ -52,7 +52,7 @@ type DirectRepository interface {
|
||||
ContentReader() content.Reader
|
||||
IndexBlobReader() content.IndexBlobReader
|
||||
|
||||
NewDirectWriter(ctx context.Context, purpose string) (DirectRepositoryWriter, error)
|
||||
NewDirectWriter(ctx context.Context, opt WriteSessionOptions) (DirectRepositoryWriter, error)
|
||||
|
||||
// misc
|
||||
UniqueID() []byte
|
||||
@@ -168,15 +168,16 @@ func (r *directRepository) UpdateDescription(d string) {
|
||||
}
|
||||
|
||||
// NewWriter returns new RepositoryWriter session for repository.
|
||||
func (r *directRepository) NewWriter(ctx context.Context, purpose string) (RepositoryWriter, error) {
|
||||
return r.NewDirectWriter(ctx, purpose)
|
||||
func (r *directRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) {
|
||||
return r.NewDirectWriter(ctx, opt)
|
||||
}
|
||||
|
||||
// NewDirectWriter returns new DirectRepositoryWriter session for repository.
|
||||
func (r *directRepository) NewDirectWriter(ctx context.Context, purpose string) (DirectRepositoryWriter, error) {
|
||||
func (r *directRepository) NewDirectWriter(ctx context.Context, opt WriteSessionOptions) (DirectRepositoryWriter, error) {
|
||||
cmgr := content.NewWriteManager(r.sm, content.SessionOptions{
|
||||
SessionUser: r.cliOpts.Username,
|
||||
SessionHost: r.cliOpts.Hostname,
|
||||
OnUpload: opt.OnUpload,
|
||||
})
|
||||
|
||||
mmgr, err := manifest.NewManager(ctx, cmgr, manifest.ManagerOptions{
|
||||
@@ -308,12 +309,13 @@ func (r *directRepository) Time() time.Time {
|
||||
// WriteSessionOptions describes options for a write session.
|
||||
type WriteSessionOptions struct {
|
||||
Purpose string
|
||||
FlushOnFailure bool // whether to flush regardless of write sessionr result.
|
||||
FlushOnFailure bool // whether to flush regardless of write sessionr result.
|
||||
OnUpload func(int64) // invoke the provided function on each upload in the session
|
||||
}
|
||||
|
||||
// WriteSession executes the provided callback in a repository writer created for the purpose and flushes writes.
|
||||
func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb func(w RepositoryWriter) error) error {
|
||||
w, err := r.NewWriter(ctx, opt.Purpose)
|
||||
w, err := r.NewWriter(ctx, opt)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to create writer")
|
||||
}
|
||||
@@ -323,7 +325,7 @@ func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb
|
||||
|
||||
// DirectWriteSession executes the provided callback in a DirectRepositoryWriter created for the purpose and flushes writes.
|
||||
func DirectWriteSession(ctx context.Context, r DirectRepository, opt WriteSessionOptions, cb func(dw DirectRepositoryWriter) error) error {
|
||||
w, err := r.NewDirectWriter(ctx, opt.Purpose)
|
||||
w, err := r.NewDirectWriter(ctx, opt)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to create direct writer")
|
||||
}
|
||||
|
||||
@@ -336,17 +336,17 @@ func TestWriterScope(t *testing.T) {
|
||||
lw := rep.(repo.RepositoryWriter)
|
||||
|
||||
// w1, w2, w3 are indepdendent sessions.
|
||||
w1, err := rep.NewWriter(ctx, "writer1")
|
||||
w1, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer1"})
|
||||
must(t, err)
|
||||
|
||||
defer w1.Close(ctx)
|
||||
|
||||
w2, err := rep.NewWriter(ctx, "writer2")
|
||||
w2, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer2"})
|
||||
must(t, err)
|
||||
|
||||
defer w2.Close(ctx)
|
||||
|
||||
w3, err := rep.NewWriter(ctx, "writer3")
|
||||
w3, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer3"})
|
||||
must(t, err)
|
||||
|
||||
defer w3.Close(ctx)
|
||||
|
||||
@@ -206,13 +206,23 @@ func (p *CountingUploadProgress) Snapshot() UploadCounters {
|
||||
|
||||
// UITaskCounters returns UI task counters.
|
||||
func (p *CountingUploadProgress) UITaskCounters(final bool) map[string]uitask.CounterValue {
|
||||
m := map[string]uitask.CounterValue{
|
||||
"Cached Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalCachedFiles))),
|
||||
"Hashed Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalHashedFiles))),
|
||||
cachedFiles := int64(atomic.LoadInt32(&p.counters.TotalCachedFiles))
|
||||
hashedFiles := int64(atomic.LoadInt32(&p.counters.TotalHashedFiles))
|
||||
|
||||
"Cached Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalCachedBytes)),
|
||||
"Hashed Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalHashedBytes)),
|
||||
"Uploaded Bytes": uitask.BytesCounter(atomic.LoadInt64(&p.counters.TotalUploadedBytes)),
|
||||
cachedBytes := atomic.LoadInt64(&p.counters.TotalCachedBytes)
|
||||
hashedBytes := atomic.LoadInt64(&p.counters.TotalHashedBytes)
|
||||
|
||||
m := map[string]uitask.CounterValue{
|
||||
"Cached Files": uitask.SimpleCounter(cachedFiles),
|
||||
"Hashed Files": uitask.SimpleCounter(hashedFiles),
|
||||
"Processed Files": uitask.NoticeBytesCounter(hashedFiles + cachedFiles),
|
||||
|
||||
"Cached Bytes": uitask.BytesCounter(cachedBytes),
|
||||
"Hashed Bytes": uitask.BytesCounter(hashedBytes),
|
||||
"Processed Bytes": uitask.NoticeBytesCounter(hashedBytes + cachedBytes),
|
||||
|
||||
// bytes actually ploaded to the server (non-deduplicated)
|
||||
"Uploaded Bytes": uitask.NoticeBytesCounter(atomic.LoadInt64(&p.counters.TotalUploadedBytes)),
|
||||
|
||||
"Excluded Files": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalExcludedFiles))),
|
||||
"Excluded Directories": uitask.SimpleCounter(int64(atomic.LoadInt32(&p.counters.TotalExcludedDirs))),
|
||||
|
||||
@@ -94,7 +94,7 @@ func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness
|
||||
sourceDir.AddFile("d2/d1/f1", []byte{1, 2, 3}, defaultPermissions)
|
||||
sourceDir.AddFile("d2/d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
||||
|
||||
w, err := rep.NewWriter(ctx, "test")
|
||||
w, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
panic("writer creation error: " + err.Error())
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ func (th *testHarness) openAnother(t *testing.T) repo.RepositoryWriter {
|
||||
r.Close(ctx)
|
||||
})
|
||||
|
||||
w, err := r.NewWriter(ctx, "test")
|
||||
w, err := r.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
|
||||
|
||||
// open new write session in repository client
|
||||
|
||||
writeSess, err := rep.NewWriter(ctx, "some writer")
|
||||
writeSess, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "some writer"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ func longLivedRepositoryTest(ctx context.Context, t *testing.T, cancel chan stru
|
||||
}
|
||||
defer rep.Close(ctx)
|
||||
|
||||
w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, "longLivedRepositoryTest")
|
||||
w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "longLivedRepositoryTest"})
|
||||
if err != nil {
|
||||
t.Errorf("error opening writer: %v", err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user