refactor(general): make counters and size stats uint64 for consistency (#5262)

- make counters and size stats uint64 for consistency
- pass `deleteParallelism` as a parameter
This commit is contained in:
Julio López
2026-04-10 21:34:17 -07:00
committed by GitHub
parent e1dce8a049
commit 2a8ac65be4
21 changed files with 223 additions and 164 deletions

View File

@@ -320,18 +320,18 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin)
var deletedEpochMarkersCount, deletedWatermarksCount atomic.Int64
var deletedEpochMarkersCount, deletedWatermarksCount atomic.Uint64
eg.Go(func() error {
deleted, err := e.cleanupEpochMarkers(ctx, cs)
deletedEpochMarkersCount.Store(int64(deleted))
deleted, err := e.cleanupEpochMarkers(ctx, cs, p.DeleteParallelism)
deletedEpochMarkersCount.Store(deleted)
return err
})
eg.Go(func() error {
deleted, err := e.cleanupWatermarks(ctx, cs, p, maxReplacementTime)
deletedWatermarksCount.Store(int64(deleted))
deleted, err := e.cleanupWatermarks(ctx, cs, maxReplacementTime, p.DeleteParallelism)
deletedWatermarksCount.Store(deleted)
return err
})
@@ -341,14 +341,14 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa
}
result := &maintenancestats.CleanupMarkersStats{
DeletedEpochMarkerBlobCount: int(deletedEpochMarkersCount.Load()),
DeletedWatermarkBlobCount: int(deletedWatermarksCount.Load()),
DeletedEpochMarkerBlobCount: deletedEpochMarkersCount.Load(),
DeletedWatermarkBlobCount: deletedWatermarksCount.Load(),
}
return result, nil
}
func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) (int, error) {
func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot, deleteParallelism int) (uint64, error) {
// delete epoch markers for epoch < current-1
var toDelete []blob.ID
@@ -360,15 +360,10 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) (
}
}
p, err := e.getParameters(ctx)
if err != nil {
return 0, err
}
return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker")
return uint64(len(toDelete)), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, deleteParallelism), "error deleting index blob marker")
}
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) (int, error) {
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time, deleteParallelism int) (uint64, error) {
var toDelete []blob.ID
for _, bm := range cs.DeletionWatermarkBlobs {
@@ -386,7 +381,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *
}
}
return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs")
return uint64(len(toDelete)), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, deleteParallelism), "error deleting watermark blobs")
}
// CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones.
@@ -426,13 +421,13 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) (*maintenancesta
var toDelete []blob.ID
var deletedTotalSize int64
var deletedTotalSize uint64
for _, bm := range blobs {
if epoch, ok := epochNumberFromBlobID(bm.BlobID); ok {
if blobSetWrittenEarlyEnough(cs.SingleEpochCompactionSets[epoch], maxReplacementTime) {
toDelete = append(toDelete, bm.BlobID)
deletedTotalSize += bm.Length
deletedTotalSize += maintenancestats.ToUint64(bm.Length)
}
}
}
@@ -443,7 +438,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) (*maintenancesta
return &maintenancestats.CleanupSupersededIndexesStats{
MaxReplacementTime: maxReplacementTime,
DeletedBlobCount: len(toDelete),
DeletedBlobCount: uint64(len(toDelete)),
DeletedTotalSize: deletedTotalSize,
}, nil
}
@@ -613,8 +608,8 @@ func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) (*maintenanc
}
return &maintenancestats.GenerateRangeCheckpointStats{
RangeMinEpoch: firstNonRangeCompacted,
RangeMaxEpoch: latestSettled,
RangeMinEpoch: maintenancestats.ToUint64(firstNonRangeCompacted),
RangeMaxEpoch: maintenancestats.ToUint64(latestSettled),
}, nil
}
@@ -751,7 +746,7 @@ func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) (*maintenancestats
e.mu.Unlock()
result := &maintenancestats.AdvanceEpochStats{
CurrentEpoch: cs.WriteEpoch,
CurrentEpoch: maintenancestats.ToUint64(cs.WriteEpoch),
}
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) {
@@ -759,7 +754,7 @@ func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) (*maintenancestats
return nil, errors.Wrap(err, "error advancing epoch")
}
result.CurrentEpoch = cs.WriteEpoch + 1
result.CurrentEpoch = maintenancestats.ToUint64(cs.WriteEpoch + 1)
result.WasAdvanced = true
}
@@ -1043,15 +1038,15 @@ func (e *Manager) MaybeCompactSingleEpoch(ctx context.Context) (*maintenancestat
uncompactedBlobs = ue
}
var uncompactedSize int64
var uncompactedSize uint64
for _, b := range uncompactedBlobs {
uncompactedSize += b.Length
uncompactedSize += maintenancestats.ToUint64(b.Length)
}
result := &maintenancestats.CompactSingleEpochStats{
SupersededIndexBlobCount: len(uncompactedBlobs),
SupersededIndexBlobCount: uint64(len(uncompactedBlobs)),
SupersededIndexTotalSize: uncompactedSize,
Epoch: uncompacted,
Epoch: maintenancestats.ToUint64(uncompacted),
}
contentlog.Log1(ctx, e.log, "starting single-epoch compaction for epoch", result)

View File

@@ -624,7 +624,7 @@ func TestMaybeAdvanceEpoch_Empty(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, 0, stats.CurrentEpoch)
require.EqualValues(t, 0, stats.CurrentEpoch)
require.False(t, stats.WasAdvanced)
// check current epoch again
@@ -674,7 +674,7 @@ func TestMaybeAdvanceEpoch(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, 1, stats.CurrentEpoch)
require.EqualValues(t, 1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx) // force state refresh
@@ -947,7 +947,7 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, j+1, stats.CurrentEpoch)
require.EqualValues(t, j+1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx) // force state refresh
@@ -1000,7 +1000,7 @@ func TestMaybeCompactSingleEpoch(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, j+1, stats.CurrentEpoch)
require.EqualValues(t, j+1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx) // force state refresh
@@ -1024,8 +1024,8 @@ func TestMaybeCompactSingleEpoch(t *testing.T) {
for j := range newestEpochToCompact {
stats, err := te.mgr.MaybeCompactSingleEpoch(ctx)
require.NoError(t, err)
require.Equal(t, idxCount, stats.SupersededIndexBlobCount)
require.Equal(t, j, stats.Epoch)
require.EqualValues(t, idxCount, stats.SupersededIndexBlobCount)
require.EqualValues(t, j, stats.Epoch)
err = te.mgr.Refresh(ctx) // force state refresh
require.NoError(t, err)
@@ -1127,7 +1127,7 @@ func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, j+1, stats.CurrentEpoch)
require.EqualValues(t, j+1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx)
@@ -1178,7 +1178,7 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, j+1, stats.CurrentEpoch)
require.EqualValues(t, j+1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx)
@@ -1193,8 +1193,8 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) {
stats, err := te.mgr.MaybeGenerateRangeCheckpoint(ctx)
require.NoError(t, err)
require.Equal(t, 0, stats.RangeMinEpoch)
require.Equal(t, 8, stats.RangeMaxEpoch)
require.EqualValues(t, 0, stats.RangeMinEpoch)
require.EqualValues(t, 8, stats.RangeMaxEpoch)
err = te.mgr.Refresh(ctx)
require.NoError(t, err)
@@ -1233,7 +1233,7 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
require.Equal(t, j+1, stats.CurrentEpoch)
require.EqualValues(t, j+1, stats.CurrentEpoch)
require.True(t, stats.WasAdvanced)
err = te.mgr.Refresh(ctx)
@@ -1250,8 +1250,8 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
for j := range newestEpochToCompact {
stats, err := te.mgr.MaybeCompactSingleEpoch(ctx)
require.NoError(t, err)
require.Equal(t, idxCount, stats.SupersededIndexBlobCount)
require.Equal(t, j, stats.Epoch)
require.EqualValues(t, idxCount, stats.SupersededIndexBlobCount)
require.EqualValues(t, j, stats.Epoch)
err = te.mgr.Refresh(ctx) // force state refresh
require.NoError(t, err)
@@ -1270,8 +1270,8 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
stats, err := te.mgr.MaybeGenerateRangeCheckpoint(ctx)
require.NoError(t, err)
require.Equal(t, 0, stats.RangeMinEpoch)
require.Equal(t, 8, stats.RangeMaxEpoch)
require.EqualValues(t, 0, stats.RangeMinEpoch)
require.EqualValues(t, 8, stats.RangeMaxEpoch)
err = te.mgr.Refresh(ctx)
require.NoError(t, err)

View File

@@ -56,7 +56,7 @@ func extendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWrite
var (
wg errgroup.Group
extendedCount, toExtend, failedCount atomic.Uint32
extendedCount, toExtend, failedCount atomic.Uint64
)
if opt.Parallel == 0 {
@@ -79,7 +79,7 @@ func extendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWrite
}
if currentCount := extendedCount.Add(1); currentCount%100 == 0 {
contentlog.Log1(ctx, log, "extended blobs", logparam.UInt32("count", currentCount))
contentlog.Log1(ctx, log, "extended blobs", logparam.UInt64("count", currentCount))
}
}
@@ -100,7 +100,7 @@ func extendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWrite
close(extend)
contentlog.Log1(ctx, log, "Found blobs to extend", logparam.UInt32("count", toExtend.Load()))
contentlog.Log1(ctx, log, "Found blobs to extend", logparam.UInt64("count", toExtend.Load()))
errWait := wg.Wait() // wait for all extend workers to finish.
impossible.PanicOnError(errWait)

View File

@@ -73,8 +73,8 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
// extend retention time of all blobs
stats, err := maintenance.ExtendBlobRetentionTime(ctx, env.RepositoryWriter, maintenance.ExtendBlobRetentionTimeOptions{})
require.NoError(t, err)
require.Equal(t, uint32(4), stats.ToExtendBlobCount)
require.Equal(t, uint32(4), stats.ExtendedBlobCount)
require.EqualValues(t, 4, stats.ToExtendBlobCount)
require.EqualValues(t, 4, stats.ExtendedBlobCount)
require.Equal(t, "24h0m0s", stats.RetentionPeriod)
gotMode, expiry, err = st.GetRetention(ctx, blobsBefore[lastBlobIdx].BlobID)

View File

@@ -64,14 +64,14 @@ func CleanupLogs(ctx context.Context, rep repo.DirectRepositoryWriter, opt LogRe
return allLogBlobs[i].Timestamp.After(allLogBlobs[j].Timestamp)
})
var retainedSize int64
var retainedSize uint64
deletePosition := len(allLogBlobs)
for i, bm := range allLogBlobs {
retainedSize += bm.Length
retainedSize += maintenancestats.ToUint64(bm.Length)
if retainedSize > opt.MaxTotalSize && opt.MaxTotalSize > 0 {
if opt.MaxTotalSize > 0 && retainedSize > uint64(opt.MaxTotalSize) {
deletePosition = i
break
}
@@ -89,15 +89,15 @@ func CleanupLogs(ctx context.Context, rep repo.DirectRepositoryWriter, opt LogRe
toDelete := allLogBlobs[deletePosition:]
var toDeleteSize int64
var toDeleteSize uint64
for _, bm := range toDelete {
toDeleteSize += bm.Length
toDeleteSize += maintenancestats.ToUint64(bm.Length)
}
result := &maintenancestats.CleanupLogsStats{
RetainedBlobCount: deletePosition,
RetainedBlobCount: maintenancestats.ToUint64(deletePosition),
RetainedBlobSize: retainedSize,
ToDeleteBlobCount: len(toDelete),
ToDeleteBlobCount: maintenancestats.ToUint64(len(toDelete)),
ToDeleteBlobSize: toDeleteSize,
DeletedBlobCount: 0,
DeletedBlobSize: 0,

View File

@@ -142,12 +142,12 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
rewrittenCount, rewrittenBytes := rewritten.Approximate()
result := &maintenancestats.RewriteContentsStats{
ToRewriteContentCount: int(toRewriteCount),
ToRewriteContentSize: toRewriteBytes,
RewrittenContentCount: int(rewrittenCount),
RewrittenContentSize: rewrittenBytes,
RetainedContentCount: int(retainedCount),
RetainedContentSize: retainedBytes,
ToRewriteContentCount: uint64(toRewriteCount),
ToRewriteContentSize: maintenancestats.ToUint64(toRewriteBytes),
RewrittenContentCount: uint64(rewrittenCount),
RewrittenContentSize: maintenancestats.ToUint64(rewrittenBytes),
RetainedContentCount: uint64(retainedCount),
RetainedContentSize: maintenancestats.ToUint64(retainedBytes),
}
contentlog.Log1(ctx, log, "Rewritten contents", result)

View File

@@ -334,7 +334,7 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
return ReportRun(ctx, runParams.rep, TaskCleanupLogs, s, func() (maintenancestats.Kind, error) {
stats, err := CleanupLogs(ctx, runParams.rep, runParams.Params.LogRetention.OrDefault())
var deletedLogCount int
var deletedLogCount uint64
if stats != nil {
deletedLogCount = stats.DeletedBlobCount
}

View File

@@ -147,10 +147,10 @@ func DeleteUnreferencedPacks(ctx context.Context, rep repo.DirectRepositoryWrite
retainedCount, retainedSize := retained.Approximate()
result := &maintenancestats.DeleteUnreferencedPacksStats{
UnreferencedPackCount: unreferencedCount,
UnreferencedTotalSize: unreferencedSize,
RetainedPackCount: retainedCount,
RetainedTotalSize: retainedSize,
UnreferencedPackCount: uint64(unreferencedCount),
UnreferencedTotalSize: maintenancestats.ToUint64(unreferencedSize),
RetainedPackCount: uint64(retainedCount),
RetainedTotalSize: maintenancestats.ToUint64(retainedSize),
DeletedPackCount: 0,
DeletedTotalSize: 0,
}
@@ -167,8 +167,8 @@ func DeleteUnreferencedPacks(ctx context.Context, rep repo.DirectRepositoryWrite
}
deletedCount, deletedSize := deleted.Approximate()
result.DeletedPackCount = deletedCount
result.DeletedTotalSize = deletedSize
result.DeletedPackCount = uint64(deletedCount)
result.DeletedTotalSize = maintenancestats.ToUint64(deletedSize)
contentlog.Log1(ctx, log, "Completed deleting unreferenced pack blobs", result)

View File

@@ -10,14 +10,14 @@
// AdvanceEpochStats are the stats for advancing write epoch.
type AdvanceEpochStats struct {
CurrentEpoch int `json:"currentEpoch"`
WasAdvanced bool `json:"wasAdvanced"`
CurrentEpoch uint64 `json:"currentEpoch"`
WasAdvanced bool `json:"wasAdvanced"`
}
// WriteValueTo writes the stats to JSONWriter.
func (as *AdvanceEpochStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(as.Kind())
jw.IntField("currentEpoch", as.CurrentEpoch)
jw.UInt64Field("currentEpoch", as.CurrentEpoch)
jw.BoolField("wasAdvanced", as.WasAdvanced)
jw.EndObject()
}

View File

@@ -11,23 +11,23 @@
// CleanupLogsStats are the stats for cleaning up logs.
type CleanupLogsStats struct {
ToDeleteBlobCount int `json:"toDeleteBlobCount"`
ToDeleteBlobSize int64 `json:"toDeleteBlobSize"`
DeletedBlobCount int `json:"deletedBlobCount"`
DeletedBlobSize int64 `json:"deletedBlobSize"`
RetainedBlobCount int `json:"retainedBlobCount"`
RetainedBlobSize int64 `json:"retainedBlobSize"`
ToDeleteBlobCount uint64 `json:"toDeleteBlobCount"`
ToDeleteBlobSize uint64 `json:"toDeleteBlobSize"`
DeletedBlobCount uint64 `json:"deletedBlobCount"`
DeletedBlobSize uint64 `json:"deletedBlobSize"`
RetainedBlobCount uint64 `json:"retainedBlobCount"`
RetainedBlobSize uint64 `json:"retainedBlobSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (cs *CleanupLogsStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(cs.Kind())
jw.IntField("toDeleteBlobCount", cs.ToDeleteBlobCount)
jw.Int64Field("toDeleteBlobSize", cs.ToDeleteBlobSize)
jw.IntField("deletedBlobCount", cs.DeletedBlobCount)
jw.Int64Field("deletedBlobSize", cs.DeletedBlobSize)
jw.IntField("retainedBlobCount", cs.RetainedBlobCount)
jw.Int64Field("retainedBlobSize", cs.RetainedBlobSize)
jw.UInt64Field("toDeleteBlobCount", cs.ToDeleteBlobCount)
jw.UInt64Field("toDeleteBlobSize", cs.ToDeleteBlobSize)
jw.UInt64Field("deletedBlobCount", cs.DeletedBlobCount)
jw.UInt64Field("deletedBlobSize", cs.DeletedBlobSize)
jw.UInt64Field("retainedBlobCount", cs.RetainedBlobCount)
jw.UInt64Field("retainedBlobSize", cs.RetainedBlobSize)
jw.EndObject()
}

View File

@@ -10,15 +10,15 @@
// CleanupMarkersStats are the stats for cleaning up markers.
type CleanupMarkersStats struct {
DeletedEpochMarkerBlobCount int `json:"deletedEpochMarkerBlobCount"`
DeletedWatermarkBlobCount int `json:"deletedWatermarkBlobCount"`
DeletedEpochMarkerBlobCount uint64 `json:"deletedEpochMarkerBlobCount"`
DeletedWatermarkBlobCount uint64 `json:"deletedWatermarkBlobCount"`
}
// WriteValueTo writes the stats to JSONWriter.
func (cs *CleanupMarkersStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(cs.Kind())
jw.IntField("deletedEpochMarkerBlobCount", cs.DeletedEpochMarkerBlobCount)
jw.IntField("deletedWatermarkBlobCount", cs.DeletedWatermarkBlobCount)
jw.UInt64Field("deletedEpochMarkerBlobCount", cs.DeletedEpochMarkerBlobCount)
jw.UInt64Field("deletedWatermarkBlobCount", cs.DeletedWatermarkBlobCount)
jw.EndObject()
}

View File

@@ -13,16 +13,16 @@
// CleanupSupersededIndexesStats are the stats for cleaning up superseded indexes.
type CleanupSupersededIndexesStats struct {
MaxReplacementTime time.Time `json:"maxReplacementTime"`
DeletedBlobCount int `json:"deletedBlobCount"`
DeletedTotalSize int64 `json:"deletedTotalSize"`
DeletedBlobCount uint64 `json:"deletedBlobCount"`
DeletedTotalSize uint64 `json:"deletedTotalSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (cs *CleanupSupersededIndexesStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(cs.Kind())
jw.TimeField("maxReplacementTime", cs.MaxReplacementTime)
jw.IntField("deletedBlobCount", cs.DeletedBlobCount)
jw.Int64Field("deletedTotalSize", cs.DeletedTotalSize)
jw.UInt64Field("deletedBlobCount", cs.DeletedBlobCount)
jw.UInt64Field("deletedTotalSize", cs.DeletedTotalSize)
jw.EndObject()
}

View File

@@ -11,17 +11,17 @@
// CompactSingleEpochStats are the stats for compacting an index epoch.
type CompactSingleEpochStats struct {
SupersededIndexBlobCount int `json:"supersededIndexBlobCount"`
SupersededIndexTotalSize int64 `json:"supersededIndexTotalSize"`
Epoch int `json:"epoch"`
SupersededIndexBlobCount uint64 `json:"supersededIndexBlobCount"`
SupersededIndexTotalSize uint64 `json:"supersededIndexTotalSize"`
Epoch uint64 `json:"epoch"`
}
// WriteValueTo writes the stats to JSONWriter.
func (cs *CompactSingleEpochStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(cs.Kind())
jw.IntField("supersededIndexBlobCount", cs.SupersededIndexBlobCount)
jw.Int64Field("supersededIndexTotalSize", cs.SupersededIndexTotalSize)
jw.IntField("epoch", cs.Epoch)
jw.UInt64Field("supersededIndexBlobCount", cs.SupersededIndexBlobCount)
jw.UInt64Field("supersededIndexTotalSize", cs.SupersededIndexTotalSize)
jw.UInt64Field("epoch", cs.Epoch)
jw.EndObject()
}

View File

@@ -11,23 +11,23 @@
// DeleteUnreferencedPacksStats are the stats for deleting unreferenced packs.
type DeleteUnreferencedPacksStats struct {
UnreferencedPackCount uint32 `json:"unreferencedPackCount"`
UnreferencedTotalSize int64 `json:"unreferencedTotalSize"`
DeletedPackCount uint32 `json:"deletedPackCount"`
DeletedTotalSize int64 `json:"deletedTotalSize"`
RetainedPackCount uint32 `json:"retainedPackCount"`
RetainedTotalSize int64 `json:"retainedTotalSize"`
UnreferencedPackCount uint64 `json:"unreferencedPackCount"`
UnreferencedTotalSize uint64 `json:"unreferencedTotalSize"`
DeletedPackCount uint64 `json:"deletedPackCount"`
DeletedTotalSize uint64 `json:"deletedTotalSize"`
RetainedPackCount uint64 `json:"retainedPackCount"`
RetainedTotalSize uint64 `json:"retainedTotalSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (ds *DeleteUnreferencedPacksStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(ds.Kind())
jw.UInt32Field("unreferencedPackCount", ds.UnreferencedPackCount)
jw.Int64Field("unreferencedTotalSize", ds.UnreferencedTotalSize)
jw.UInt32Field("deletedPackCount", ds.DeletedPackCount)
jw.Int64Field("deletedTotalSize", ds.DeletedTotalSize)
jw.UInt32Field("retainedPackCount", ds.RetainedPackCount)
jw.Int64Field("retainedTotalSize", ds.RetainedTotalSize)
jw.UInt64Field("unreferencedPackCount", ds.UnreferencedPackCount)
jw.UInt64Field("unreferencedTotalSize", ds.UnreferencedTotalSize)
jw.UInt64Field("deletedPackCount", ds.DeletedPackCount)
jw.UInt64Field("deletedTotalSize", ds.DeletedTotalSize)
jw.UInt64Field("retainedPackCount", ds.RetainedPackCount)
jw.UInt64Field("retainedTotalSize", ds.RetainedTotalSize)
jw.EndObject()
}

View File

@@ -10,16 +10,16 @@
// ExtendBlobRetentionStats are the stats for extending blob retention time.
type ExtendBlobRetentionStats struct {
ToExtendBlobCount uint32 `json:"toExtendBlobCount"`
ExtendedBlobCount uint32 `json:"extendedBlobCount"`
ToExtendBlobCount uint64 `json:"toExtendBlobCount"`
ExtendedBlobCount uint64 `json:"extendedBlobCount"`
RetentionPeriod string `json:"retentionPeriod"`
}
// WriteValueTo writes the stats to JSONWriter.
func (es *ExtendBlobRetentionStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(es.Kind())
jw.UInt32Field("toExtendBlobCount", es.ToExtendBlobCount)
jw.UInt32Field("extendedBlobCount", es.ExtendedBlobCount)
jw.UInt64Field("toExtendBlobCount", es.ToExtendBlobCount)
jw.UInt64Field("extendedBlobCount", es.ExtendedBlobCount)
jw.StringField("retentionPeriod", es.RetentionPeriod)
jw.EndObject()
}

View File

@@ -10,15 +10,15 @@
// GenerateRangeCheckpointStats are the stats for generating range checkpoints.
type GenerateRangeCheckpointStats struct {
RangeMinEpoch int `json:"rangeMinEpoch"`
RangeMaxEpoch int `json:"rangeMaxEpoch"`
RangeMinEpoch uint64 `json:"rangeMinEpoch"`
RangeMaxEpoch uint64 `json:"rangeMaxEpoch"`
}
// WriteValueTo writes the stats to JSONWriter.
func (gs *GenerateRangeCheckpointStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(gs.Kind())
jw.IntField("rangeMinEpoch", gs.RangeMinEpoch)
jw.IntField("rangeMaxEpoch", gs.RangeMaxEpoch)
jw.UInt64Field("rangeMinEpoch", gs.RangeMinEpoch)
jw.UInt64Field("rangeMaxEpoch", gs.RangeMaxEpoch)
jw.EndObject()
}

View File

@@ -11,23 +11,23 @@
// RewriteContentsStats are the stats for rewriting contents.
type RewriteContentsStats struct {
ToRewriteContentCount int `json:"toRewriteContentCount"`
ToRewriteContentSize int64 `json:"toRewriteContentSize"`
RewrittenContentCount int `json:"rewrittenContentCount"`
RewrittenContentSize int64 `json:"rewrittenContentSize"`
RetainedContentCount int `json:"retainedContentCount"`
RetainedContentSize int64 `json:"retainedContentSize"`
ToRewriteContentCount uint64 `json:"toRewriteContentCount"`
ToRewriteContentSize uint64 `json:"toRewriteContentSize"`
RewrittenContentCount uint64 `json:"rewrittenContentCount"`
RewrittenContentSize uint64 `json:"rewrittenContentSize"`
RetainedContentCount uint64 `json:"retainedContentCount"`
RetainedContentSize uint64 `json:"retainedContentSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (rs *RewriteContentsStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(rs.Kind())
jw.IntField("toRewriteContentCount", rs.ToRewriteContentCount)
jw.Int64Field("toRewriteContentSize", rs.ToRewriteContentSize)
jw.IntField("rewrittenContentCount", rs.RewrittenContentCount)
jw.Int64Field("rewrittenContentSize", rs.RewrittenContentSize)
jw.IntField("retainedContentCount", rs.RetainedContentCount)
jw.Int64Field("retainedContentSize", rs.RetainedContentSize)
jw.UInt64Field("toRewriteContentCount", rs.ToRewriteContentCount)
jw.UInt64Field("toRewriteContentSize", rs.ToRewriteContentSize)
jw.UInt64Field("rewrittenContentCount", rs.RewrittenContentCount)
jw.UInt64Field("rewrittenContentSize", rs.RewrittenContentSize)
jw.UInt64Field("retainedContentCount", rs.RetainedContentCount)
jw.UInt64Field("retainedContentSize", rs.RetainedContentSize)
jw.EndObject()
}

View File

@@ -10,35 +10,35 @@
// SnapshotGCStats delivers are the stats for snapshot GC.
type SnapshotGCStats struct {
UnreferencedContentCount uint32 `json:"unreferencedContentCount"`
UnreferencedContentSize int64 `json:"unreferencedContentSize"`
DeletedContentCount uint32 `json:"deletedContentCount"`
DeletedContentSize int64 `json:"deletedContentSize"`
UnreferencedRecentContentCount uint32 `json:"unreferencedRecentContentCount"`
UnreferencedRecentContentSize int64 `json:"unreferencedRecentContentSize"`
InUseContentCount uint32 `json:"inUseContentCount"`
InUseContentSize int64 `json:"inUseContentSize"`
InUseSystemContentCount uint32 `json:"inUseSystemContentCount"`
InUseSystemContentSize int64 `json:"inUseSystemContentSize"`
RecoveredContentCount uint32 `json:"recoveredContentCount"`
RecoveredContentSize int64 `json:"recoveredContentSize"`
UnreferencedContentCount uint64 `json:"unreferencedContentCount"`
UnreferencedContentSize uint64 `json:"unreferencedContentSize"`
DeletedContentCount uint64 `json:"deletedContentCount"`
DeletedContentSize uint64 `json:"deletedContentSize"`
UnreferencedRecentContentCount uint64 `json:"unreferencedRecentContentCount"`
UnreferencedRecentContentSize uint64 `json:"unreferencedRecentContentSize"`
InUseContentCount uint64 `json:"inUseContentCount"`
InUseContentSize uint64 `json:"inUseContentSize"`
InUseSystemContentCount uint64 `json:"inUseSystemContentCount"`
InUseSystemContentSize uint64 `json:"inUseSystemContentSize"`
RecoveredContentCount uint64 `json:"recoveredContentCount"`
RecoveredContentSize uint64 `json:"recoveredContentSize"`
}
// WriteValueTo writes the stats to JSONWriter.
func (ss *SnapshotGCStats) WriteValueTo(jw *contentlog.JSONWriter) {
jw.BeginObjectField(ss.Kind())
jw.UInt32Field("unreferencedContentCount", ss.UnreferencedContentCount)
jw.Int64Field("unreferencedContentSize", ss.UnreferencedContentSize)
jw.UInt32Field("deletedContentCount", ss.DeletedContentCount)
jw.Int64Field("deletedContentSize", ss.DeletedContentSize)
jw.UInt32Field("unreferencedRecentContentCount", ss.UnreferencedRecentContentCount)
jw.Int64Field("unreferencedRecentContentSize", ss.UnreferencedRecentContentSize)
jw.UInt32Field("inUseContentCount", ss.InUseContentCount)
jw.Int64Field("inUseContentSize", ss.InUseContentSize)
jw.UInt32Field("inUseSystemContentCount", ss.InUseSystemContentCount)
jw.Int64Field("inUseSystemContentSize", ss.InUseSystemContentSize)
jw.UInt32Field("recoveredContentCount", ss.RecoveredContentCount)
jw.Int64Field("recoveredContentSize", ss.RecoveredContentSize)
jw.UInt64Field("unreferencedContentCount", ss.UnreferencedContentCount)
jw.UInt64Field("unreferencedContentSize", ss.UnreferencedContentSize)
jw.UInt64Field("deletedContentCount", ss.DeletedContentCount)
jw.UInt64Field("deletedContentSize", ss.DeletedContentSize)
jw.UInt64Field("unreferencedRecentContentCount", ss.UnreferencedRecentContentCount)
jw.UInt64Field("unreferencedRecentContentSize", ss.UnreferencedRecentContentSize)
jw.UInt64Field("inUseContentCount", ss.InUseContentCount)
jw.UInt64Field("inUseContentSize", ss.InUseContentSize)
jw.UInt64Field("inUseSystemContentCount", ss.InUseSystemContentCount)
jw.UInt64Field("inUseSystemContentSize", ss.InUseSystemContentSize)
jw.UInt64Field("recoveredContentCount", ss.RecoveredContentCount)
jw.UInt64Field("recoveredContentSize", ss.RecoveredContentSize)
jw.EndObject()
}

View File

@@ -0,0 +1,41 @@
package maintenancestats
import (
"math"
"testing"
"github.com/stretchr/testify/require"
)
func TestToUint64(t *testing.T) {
cases := []struct {
in int
expected uint64
}{
{
in: math.MinInt,
expected: 0,
},
{
in: -1,
expected: 0,
},
{
in: 0,
expected: 0,
},
{
in: 1,
expected: 1,
},
{
in: math.MaxInt,
expected: math.MaxInt,
},
}
for _, c := range cases {
v := ToUint64(c.in)
require.Equal(t, c.expected, v)
}
}

View File

@@ -0,0 +1,23 @@
package maintenancestats
import (
"log"
"golang.org/x/time/rate"
)
var limit = rate.Sometimes{First: 10} //nolint:mnd
// ToUint64 converts v from a signed integer type T to uint64 while checking that
// the value is non-negative. It returns 0 for negative values.
func ToUint64[T int8 | int16 | int32 | int | int64](v T) uint64 {
	if v < 0 {
		// Rate-limited warning (first few occurrences only) so a flood of
		// negative inputs does not spam the log.
		limit.Do(func() {
			log.Println("warning, converting negative value to uint64:", v)
		})

		return 0
	}

	return uint64(v)
}

View File

@@ -214,28 +214,28 @@ func buildGCResult(unused, inUse, system, tooRecent, undeleted, deleted *stats.C
result := &maintenancestats.SnapshotGCStats{}
cnt, size := unused.Approximate()
result.UnreferencedContentCount = cnt
result.UnreferencedContentSize = size
result.UnreferencedContentCount = uint64(cnt)
result.UnreferencedContentSize = maintenancestats.ToUint64(size)
cnt, size = tooRecent.Approximate()
result.UnreferencedRecentContentCount = cnt
result.UnreferencedRecentContentSize = size
result.UnreferencedRecentContentCount = uint64(cnt)
result.UnreferencedRecentContentSize = maintenancestats.ToUint64(size)
cnt, size = inUse.Approximate()
result.InUseContentCount = cnt
result.InUseContentSize = size
result.InUseContentCount = uint64(cnt)
result.InUseContentSize = maintenancestats.ToUint64(size)
cnt, size = system.Approximate()
result.InUseSystemContentCount = cnt
result.InUseSystemContentSize = size
result.InUseSystemContentCount = uint64(cnt)
result.InUseSystemContentSize = maintenancestats.ToUint64(size)
cnt, size = undeleted.Approximate()
result.RecoveredContentCount = cnt
result.RecoveredContentSize = size
result.RecoveredContentCount = uint64(cnt)
result.RecoveredContentSize = maintenancestats.ToUint64(size)
cnt, size = deleted.Approximate()
result.DeletedContentCount = cnt
result.DeletedContentSize = size
result.DeletedContentCount = uint64(cnt)
result.DeletedContentSize = maintenancestats.ToUint64(size)
return result
}