feature(general): maintenance stats for rewrite contents phase (#4961)

This commit is contained in:
lyndon-li
2025-11-07 02:22:58 +08:00
committed by GitHub
parent 1961ed4dc3
commit 3b7f193c29
8 changed files with 152 additions and 20 deletions

View File

@@ -48,8 +48,7 @@ func (c *commandContentRewrite) runContentRewriteCommand(ctx context.Context, re
return err
}
//nolint:wrapcheck
return maintenance.RewriteContents(ctx, rep, &maintenance.RewriteContentsOptions{
_, err = maintenance.RewriteContents(ctx, rep, &maintenance.RewriteContentsOptions{
ContentIDRange: c.contentRange.contentIDRange(),
ContentIDs: contentIDs,
FormatVersion: c.contentRewriteFormatVersion,
@@ -58,6 +57,8 @@ func (c *commandContentRewrite) runContentRewriteCommand(ctx context.Context, re
ShortPacks: c.contentRewriteShortPacks,
DryRun: c.contentRewriteDryRun,
}, c.contentRewriteSafety)
return errors.Wrap(err, "error rewriting contents")
}
func toContentIDs(s []string) ([]content.ID, error) {

View File

@@ -16,6 +16,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/maintenancestats"
)
const parallelContentRewritesCPUMultiplier = 2
@@ -42,14 +43,14 @@ type contentInfoOrError struct {
// blobs and index entries to point at them.
//
//nolint:funlen
func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions, safety SafetyParameters) error {
func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions, safety SafetyParameters) (*maintenancestats.RewriteContentsStats, error) {
ctx = contentlog.WithParams(ctx,
logparam.String("span:content-rewrite", contentlog.RandomSpanID()))
log := rep.LogManager().NewLogger("maintenance-content-rewrite")
if opt == nil {
return errors.New("missing options")
return nil, errors.New("missing options")
}
if opt.ShortPacks {
@@ -61,9 +62,14 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
cnt := getContentToRewrite(ctx, rep, opt)
var (
mu sync.Mutex
totalBytes int64
failedCount int
mu sync.Mutex
toRewriteBytes int64
toRewriteCount int
retainedBytes int64
retainedCount int
rewrittenBytes int64
rewrittenCount int
failedCount int
)
if opt.Parallel == 0 {
@@ -97,6 +103,11 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
logparam.Bool("deleted", c.Deleted),
logparam.Duration("age", age))
mu.Lock()
retainedBytes += int64(c.PackedLength)
retainedCount++
mu.Unlock()
continue
}
@@ -109,7 +120,8 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
logparam.Duration("age", age))
mu.Lock()
totalBytes += int64(c.PackedLength)
toRewriteBytes += int64(c.PackedLength)
toRewriteCount++
mu.Unlock()
if opt.DryRun {
@@ -134,6 +146,11 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
failedCount++
mu.Unlock()
}
} else {
mu.Lock()
rewrittenBytes += int64(c.PackedLength)
rewrittenCount++
mu.Unlock()
}
}
}()
@@ -141,16 +158,26 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
wg.Wait()
contentlog.Log1(ctx, log,
"Total bytes rewritten",
logparam.Int64("bytes", totalBytes))
if failedCount == 0 {
//nolint:wrapcheck
return rep.ContentManager().Flush(ctx)
result := &maintenancestats.RewriteContentsStats{
ToRewriteContentCount: toRewriteCount,
ToRewriteContentSize: toRewriteBytes,
RewrittenContentCount: rewrittenCount,
RewrittenContentSize: rewrittenBytes,
RetainedContentCount: retainedCount,
RetainedContentSize: retainedBytes,
}
return errors.Errorf("failed to rewrite %v contents", failedCount)
contentlog.Log1(ctx, log, "Rewritten contents", result)
if failedCount == 0 {
if err := rep.ContentManager().Flush(ctx); err != nil {
return nil, errors.Wrap(err, "error flushing repo")
}
return result, nil
}
return nil, errors.Errorf("failed to rewrite %v contents", failedCount)
}
func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *RewriteContentsOptions) <-chan contentInfoOrError {

View File

@@ -12,6 +12,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/maintenancestats"
"github.com/kopia/kopia/repo/object"
)
@@ -22,6 +23,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
opt *maintenance.RewriteContentsOptions
wantPDelta int
wantQDelta int
stats *maintenancestats.RewriteContentsStats
}{
{
numPContents: 2,
@@ -31,6 +33,12 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
},
wantPDelta: 1,
wantQDelta: 1,
stats: &maintenancestats.RewriteContentsStats{
ToRewriteContentCount: 5,
ToRewriteContentSize: 320,
RewrittenContentCount: 5,
RewrittenContentSize: 320,
},
},
{
numPContents: 2,
@@ -41,6 +49,10 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
},
wantPDelta: 0,
wantQDelta: 0,
stats: &maintenancestats.RewriteContentsStats{
ToRewriteContentCount: 5,
ToRewriteContentSize: 320,
},
},
{
numPContents: 2,
@@ -51,6 +63,12 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
},
wantPDelta: 1,
wantQDelta: 0,
stats: &maintenancestats.RewriteContentsStats{
ToRewriteContentCount: 2,
ToRewriteContentSize: 128,
RewrittenContentCount: 2,
RewrittenContentSize: 128,
},
},
{
numPContents: 1,
@@ -60,6 +78,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
},
wantPDelta: 0, // single pack won't get rewritten
wantQDelta: 0,
stats: &maintenancestats.RewriteContentsStats{},
},
{
numPContents: 1,
@@ -69,6 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
},
wantPDelta: 0,
wantQDelta: 0,
stats: &maintenancestats.RewriteContentsStats{},
},
}
@@ -103,8 +123,11 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
qBlobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "q")
require.NoError(t, err)
var stats *maintenancestats.RewriteContentsStats
require.NoError(t, repo.DirectWriteSession(ctx, env.RepositoryWriter, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.DirectRepositoryWriter) error {
return maintenance.RewriteContents(ctx, w, tc.opt, maintenance.SafetyNone)
stats, err = maintenance.RewriteContents(ctx, w, tc.opt, maintenance.SafetyNone)
return err
}))
pBlobsAfter, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "p")
@@ -115,6 +138,12 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
require.Equal(t, tc.wantPDelta, len(pBlobsAfter)-len(pBlobsBefore), "invalid p blob count delta")
require.Equal(t, tc.wantQDelta, len(qBlobsAfter)-len(qBlobsBefore), "invalid q blob count delta")
if tc.stats == nil {
require.Nil(t, stats)
} else {
require.Equal(t, *tc.stats, *stats)
}
})
}
}

View File

@@ -450,7 +450,7 @@ func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters
func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() (maintenancestats.Kind, error) {
return nil, RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: index.AllPrefixedIDs,
PackPrefix: content.PackBlobIDPrefixSpecial,
ShortPacks: true,
@@ -460,7 +460,7 @@ func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s
func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsFull, s, func() (maintenancestats.Kind, error) {
return nil, RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: index.AllIDs,
ShortPacks: true,
}, safety)

View File

@@ -66,6 +66,8 @@ func BuildFromExtra(stats Extra) (Summarizer, error) {
result = &ExtendBlobRetentionStats{}
case cleanupLogsStatsKind:
result = &CleanupLogsStats{}
case rewriteContentsStatsKind:
result = &RewriteContentsStats{}
default:
return nil, errors.Wrapf(ErrUnSupportedStatKindError, "invalid kind for stats %v", stats)
}

View File

@@ -119,6 +119,21 @@ func TestBuildExtraSuccess(t *testing.T) {
Data: []byte(`{"toDeleteBlobCount":10,"toDeleteBlobSize":1024,"deletedBlobCount":5,"deletedBlobSize":512,"retainedBlobCount":20,"retainedBlobSize":2048}`),
},
},
{
name: "RewriteContentsStats",
stats: &RewriteContentsStats{
ToRewriteContentCount: 30,
ToRewriteContentSize: 3092,
RewrittenContentCount: 10,
RewrittenContentSize: 1024,
RetainedContentCount: 20,
RetainedContentSize: 2048,
},
expected: Extra{
Kind: rewriteContentsStatsKind,
Data: []byte(`{"toRewriteContentCount":30,"toRewriteContentSize":3092,"rewrittenContentCount":10,"rewrittenContentSize":1024,"retainedContentCount":20,"retainedContentSize":2048}`),
},
},
}
for _, tc := range cases {
@@ -273,6 +288,21 @@ func TestBuildFromExtraSuccess(t *testing.T) {
DeletedBlobSize: 512,
},
},
{
name: "RewriteContentsStats",
stats: Extra{
Kind: rewriteContentsStatsKind,
Data: []byte(`{"toRewriteContentCount":30,"toRewriteContentSize":3092,"rewrittenContentCount":10,"rewrittenContentSize":1024,"retainedContentCount":20,"retainedContentSize":2048}`),
},
expected: &RewriteContentsStats{
ToRewriteContentCount: 30,
ToRewriteContentSize: 3092,
RewrittenContentCount: 10,
RewrittenContentSize: 1024,
RetainedContentCount: 20,
RetainedContentSize: 2048,
},
},
}
for _, tc := range cases {

View File

@@ -32,7 +32,8 @@ func (ds *DeleteUnreferencedPacksStats) WriteValueTo(jw *contentlog.JSONWriter)
// Summary generates a human readable summary for the stats.
func (ds *DeleteUnreferencedPacksStats) Summary() string {
return fmt.Sprintf("Found %v(%v) unreferenced pack blobs, deleted %v(%v) and retained %v(%v).", ds.UnreferencedPackCount, ds.UnreferencedTotalSize, ds.DeletedPackCount, ds.DeletedTotalSize, ds.RetainedPackCount, ds.RetainedTotalSize)
return fmt.Sprintf("Found %v(%v) unreferenced pack blobs to delete and deleted %v(%v). Retained %v(%v) unreferenced pack blobs.",
ds.UnreferencedPackCount, ds.UnreferencedTotalSize, ds.DeletedPackCount, ds.DeletedTotalSize, ds.RetainedPackCount, ds.RetainedTotalSize)
}
// Kind returns the kind name for the stats.

View File

@@ -0,0 +1,42 @@
package maintenancestats
import (
"fmt"
"github.com/kopia/kopia/internal/contentlog"
)
const rewriteContentsStatsKind = "rewriteContentsStats"
// RewriteContentsStats are the stats for rewritting contents.
type RewriteContentsStats struct {
ToRewriteContentCount int `json:"toRewriteContentCount"`
ToRewriteContentSize int64 `json:"toRewriteContentSize"`
RewrittenContentCount int `json:"rewrittenContentCount"`
RewrittenContentSize int64 `json:"rewrittenContentSize"`
RetainedContentCount int `json:"retainedContentCount"`
RetainedContentSize int64 `json:"retainedContentSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (rs *RewriteContentsStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(rs.Kind())
jw.IntField("toRewriteContentCount", rs.ToRewriteContentCount)
jw.Int64Field("toRewriteContentSize", rs.ToRewriteContentSize)
jw.IntField("rewrittenContentCount", rs.RewrittenContentCount)
jw.Int64Field("rewrittenContentSize", rs.RewrittenContentSize)
jw.IntField("retainedContentCount", rs.RetainedContentCount)
jw.Int64Field("retainedContentSize", rs.RetainedContentSize)
jw.EndObject()
}
// Summary generates a human readable summary for the stats.
func (rs *RewriteContentsStats) Summary() string {
return fmt.Sprintf("Found %v(%v) contents to rewrite, and rewritten %v(%v). Retained %v(%v) contents from rewrite",
rs.ToRewriteContentCount, rs.ToRewriteContentSize, rs.RewrittenContentCount, rs.RewrittenContentSize, rs.RetainedContentCount, rs.RetainedContentSize)
}
// Kind returns the kind name for the stats.
func (rs *RewriteContentsStats) Kind() string {
return rewriteContentsStatsKind
}