mirror of
https://github.com/kopia/kopia.git
synced 2026-03-30 12:03:39 -04:00
refactor(general): leverage stats.CountSum in maintenance stats (#4963)
Leverage stats.CountSum and atomics in content rewrite. Rationale: simplifies code Also, fix typos and update comments for consistency.
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -13,6 +14,7 @@
|
||||
"github.com/kopia/kopia/internal/contentlog"
|
||||
"github.com/kopia/kopia/internal/contentlog/logparam"
|
||||
"github.com/kopia/kopia/internal/contentparam"
|
||||
"github.com/kopia/kopia/internal/stats"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
@@ -62,14 +64,8 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
cnt := getContentToRewrite(ctx, rep, opt)
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
toRewriteBytes int64
|
||||
toRewriteCount int
|
||||
retainedBytes int64
|
||||
retainedCount int
|
||||
rewrittenBytes int64
|
||||
rewrittenCount int
|
||||
failedCount int
|
||||
toRewrite, retained, rewritten stats.CountSum
|
||||
failedCount atomic.Uint64
|
||||
)
|
||||
|
||||
if opt.Parallel == 0 {
|
||||
@@ -86,9 +82,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
|
||||
for c := range cnt {
|
||||
if c.err != nil {
|
||||
mu.Lock()
|
||||
failedCount++
|
||||
mu.Unlock()
|
||||
failedCount.Add(1)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -103,10 +97,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
logparam.Bool("deleted", c.Deleted),
|
||||
logparam.Duration("age", age))
|
||||
|
||||
mu.Lock()
|
||||
retainedBytes += int64(c.PackedLength)
|
||||
retainedCount++
|
||||
mu.Unlock()
|
||||
retained.Add(int64(c.PackedLength))
|
||||
|
||||
continue
|
||||
}
|
||||
@@ -119,10 +110,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
logparam.Bool("deleted", c.Deleted),
|
||||
logparam.Duration("age", age))
|
||||
|
||||
mu.Lock()
|
||||
toRewriteBytes += int64(c.PackedLength)
|
||||
toRewriteCount++
|
||||
mu.Unlock()
|
||||
toRewrite.Add(int64(c.PackedLength))
|
||||
|
||||
if opt.DryRun {
|
||||
continue
|
||||
@@ -142,15 +130,10 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
contentparam.ContentID("contentID", c.ContentID),
|
||||
logparam.Error("error", err))
|
||||
|
||||
mu.Lock()
|
||||
failedCount++
|
||||
mu.Unlock()
|
||||
failedCount.Add(1)
|
||||
}
|
||||
} else {
|
||||
mu.Lock()
|
||||
rewrittenBytes += int64(c.PackedLength)
|
||||
rewrittenCount++
|
||||
mu.Unlock()
|
||||
rewritten.Add(int64(c.PackedLength))
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -158,18 +141,22 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
|
||||
wg.Wait()
|
||||
|
||||
toRewriteCount, toRewriteBytes := toRewrite.Approximate()
|
||||
retainedCount, retainedBytes := retained.Approximate()
|
||||
rewrittenCount, rewrittenBytes := rewritten.Approximate()
|
||||
|
||||
result := &maintenancestats.RewriteContentsStats{
|
||||
ToRewriteContentCount: toRewriteCount,
|
||||
ToRewriteContentCount: int(toRewriteCount),
|
||||
ToRewriteContentSize: toRewriteBytes,
|
||||
RewrittenContentCount: rewrittenCount,
|
||||
RewrittenContentCount: int(rewrittenCount),
|
||||
RewrittenContentSize: rewrittenBytes,
|
||||
RetainedContentCount: retainedCount,
|
||||
RetainedContentCount: int(retainedCount),
|
||||
RetainedContentSize: retainedBytes,
|
||||
}
|
||||
|
||||
contentlog.Log1(ctx, log, "Rewritten contents", result)
|
||||
|
||||
if failedCount == 0 {
|
||||
if failedCount.Load() == 0 {
|
||||
if err := rep.ContentManager().Flush(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "error flushing repo")
|
||||
}
|
||||
@@ -177,7 +164,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
return result, nil
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("failed to rewrite %v contents", failedCount)
|
||||
return nil, errors.Errorf("failed to rewrite %v contents", failedCount.Load())
|
||||
}
|
||||
|
||||
func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *RewriteContentsOptions) <-chan contentInfoOrError {
|
||||
|
||||
@@ -293,15 +293,15 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
|
||||
// running full orphaned blob deletion, otherwise next quick maintenance will start a quick rewrite
|
||||
// and we'd never delete blobs orphaned by full rewrite.
|
||||
if hadRecentFullRewrite(s) {
|
||||
userLog(ctx).Debug("Had recent full rewrite - performing full blob deletion.")
|
||||
userLog(ctx).Debug("Had recent full rewrite - performing full pack deletion.")
|
||||
err = runTaskDeleteOrphanedPacksFull(ctx, runParams, s, safety)
|
||||
} else {
|
||||
userLog(ctx).Debug("Performing quick blob deletion.")
|
||||
userLog(ctx).Debug("Performing quick pack deletion.")
|
||||
err = runTaskDeleteOrphanedPacksQuick(ctx, runParams, s, safety)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced metadata blobs")
|
||||
return errors.Wrap(err, "error deleting unreferenced metadata packs")
|
||||
}
|
||||
} else {
|
||||
notDeletingOrphanedPacks(ctx, log, s, safety)
|
||||
@@ -327,7 +327,7 @@ func notRewritingContents(ctx context.Context, log *contentlog.Logger) {
|
||||
func notDeletingOrphanedPacks(ctx context.Context, log *contentlog.Logger, s *Schedule, safety SafetyParameters) {
|
||||
left := nextPackDeleteTime(s, safety).Sub(clock.Now()).Truncate(time.Second)
|
||||
|
||||
contentlog.Log1(ctx, log, "Skipping blob deletion because not enough time has passed yet", logparam.Duration("left", left))
|
||||
contentlog.Log1(ctx, log, "Skipping pack deletion because not enough time has passed yet", logparam.Duration("left", left))
|
||||
}
|
||||
|
||||
func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedule) error {
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
const rewriteContentsStatsKind = "rewriteContentsStats"
|
||||
|
||||
// RewriteContentsStats are the stats for rewritting contents.
|
||||
// RewriteContentsStats are the stats for rewriting contents.
|
||||
type RewriteContentsStats struct {
|
||||
ToRewriteContentCount int `json:"toRewriteContentCount"`
|
||||
ToRewriteContentSize int64 `json:"toRewriteContentSize"`
|
||||
@@ -32,7 +32,7 @@ func (rs *RewriteContentsStats) WriteValueTo(jw *contentlog.JSONWriter) {
|
||||
|
||||
// Summary generates a human readable summary for the stats.
|
||||
func (rs *RewriteContentsStats) Summary() string {
|
||||
return fmt.Sprintf("Found %v(%v) contents to rewrite, and rewritten %v(%v). Retained %v(%v) contents from rewrite",
|
||||
return fmt.Sprintf("Found %v(%v) contents to rewrite and rewrote %v(%v). Retained %v(%v) contents from rewrite",
|
||||
rs.ToRewriteContentCount, rs.ToRewriteContentSize, rs.RewrittenContentCount, rs.RewrittenContentSize, rs.RetainedContentCount, rs.RetainedContentSize)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user