feat(general): reduce memory usage in maintenance, snapshot fix and verify (#2365)

This commit is contained in:
Jarek Kowalski
2022-09-10 09:36:17 -07:00
committed by GitHub
parent 28ce29eab4
commit 645e680a8f
13 changed files with 181 additions and 64 deletions

View File

@@ -64,12 +64,16 @@ func failedEntryCallback(rep repo.RepositoryWriter, enumVal string) snapshotfs.R
}
func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, rep repo.RepositoryWriter, rewrite snapshotfs.RewriteDirEntryCallback) error {
rw := snapshotfs.NewDirRewriter(rep, snapshotfs.DirRewriterOptions{
rw, err := snapshotfs.NewDirRewriter(ctx, rep, snapshotfs.DirRewriterOptions{
Parallel: c.parallel,
RewriteEntry: rewrite,
OnDirectoryReadFailure: failedEntryCallback(rep, c.invalidDirHandling),
})
defer rw.Close()
if err != nil {
return errors.Wrap(err, "unable to create dir rewriter")
}
defer rw.Close(ctx)
var updatedSnapshots int

View File

@@ -75,7 +75,7 @@ func ShouldReduceTestComplexity() bool {
return true
}
return strings.Contains(runtime.GOARCH, "arm")
return strings.Contains(runtime.GOARCH, "arm") && runtime.GOOS != "darwin"
}
// ShouldSkipUnicodeFilenames returns true if:

View File

@@ -9,6 +9,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
)
@@ -228,7 +229,12 @@ func(ci Info) error {
// IterateUnreferencedBlobs invokes the provided callback for each unreferenced storage blob.
func (bm *WriteManager) IterateUnreferencedBlobs(ctx context.Context, blobPrefixes []blob.ID, parallellism int, callback func(blob.Metadata) error) error {
var usedPacks sync.Map
usedPacks, err := bigmap.NewSet(ctx)
if err != nil {
return errors.Wrap(err, "new set")
}
defer usedPacks.Close(ctx)
bm.log.Debugf("determining blobs in use")
// find packs in use
@@ -240,7 +246,7 @@ func (bm *WriteManager) IterateUnreferencedBlobs(ctx context.Context, blobPrefix
},
func(pi PackInfo) error {
if pi.ContentCount > 0 {
usedPacks.Store(pi.PackID, struct{}{})
usedPacks.Put(ctx, []byte(pi.PackID))
}
return nil
}); err != nil {
@@ -270,7 +276,7 @@ func(pi PackInfo) error {
if err := blob.IterateAllPrefixesInParallel(ctx, parallellism, bm.st, prefixes,
func(bm blob.Metadata) error {
if _, ok := usedPacks.Load(bm.BlobID); ok {
if usedPacks.Contains([]byte(bm.BlobID)) {
return nil
}

View File

@@ -108,6 +108,19 @@ func (i ID) AppendToLogBuffer(sb *logging.Buffer) {
sb.AppendBytes(buf[0 : i.idLen*2])
}
// Append appends content ID to the slice.
func (i ID) Append(out []byte) []byte {
var buf [128]byte
if i.prefix != 0 {
out = append(out, i.prefix)
}
hex.Encode(buf[0:i.idLen*2], i.data[0:i.idLen])
return append(out, buf[0:i.idLen*2]...)
}
// String returns a string representation of ID.
func (i ID) String() string {
return string(i.Prefix()) + hex.EncodeToString(i.data[:i.idLen])

View File

@@ -2,6 +2,7 @@
import (
"encoding/json"
"fmt"
"testing"
"github.com/stretchr/testify/require"
@@ -122,6 +123,8 @@ func TestIDHash(t *testing.T) {
require.NoError(t, err)
require.Equal(t, h, cid.Hash())
require.Equal(t, fmt.Sprintf("%v%x", prefix, h), string(cid.Append(nil)))
require.Equal(t, prefix != "", cid.HasPrefix())
require.Equal(t, prefix, cid.Prefix())
}

View File

@@ -80,6 +80,19 @@ func (i ID) String() string {
return indirectPrefix + compressionPrefix + i.cid.String()
}
// Append appends the string representation of the ObjectID to the provided byte slice and returns the result.
func (i ID) Append(out []byte) []byte {
for j := 0; j < int(i.indirection); j++ {
out = append(out, 'I')
}
if i.compression {
out = append(out, 'Z')
}
return i.cid.Append(out)
}
// IndexObjectID returns the object ID of the underlying index object.
func (i ID) IndexObjectID() (ID, bool) {
if i.indirection > 0 {

View File

@@ -67,6 +67,21 @@ func TestToStrings(t *testing.T) {
require.Equal(t, []string{"f0f0", "f1f1"}, strs)
}
func TestString(t *testing.T) {
cases := map[ID]string{
EmptyID: "",
mustParseID(t, "Dabcd"): "abcd",
mustParseID(t, "abcd"): "abcd",
mustParseID(t, "IIabcd"): "IIabcd",
mustParseID(t, "Zabcd"): "Zabcd",
}
for id, str := range cases {
require.Equal(t, str, id.String())
require.Equal(t, str, string(id.Append(nil)))
}
}
func mustParseID(t *testing.T, s string) ID {
t.Helper()

View File

@@ -7,10 +7,10 @@
"encoding/json"
"path"
"runtime"
"sync"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/impossible"
"github.com/kopia/kopia/internal/workshare"
"github.com/kopia/kopia/repo"
@@ -53,7 +53,7 @@ type DirRewriter struct {
ws *workshare.Pool
opts DirRewriterOptions
cache sync.Map
cache *bigmap.Map
rep repo.RepositoryWriter
}
@@ -91,9 +91,16 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri
key := rw.getCacheKey(input)
// see if we already processed this exact directory entry
if v, ok := rw.cache.Load(key); ok {
//nolint:forcetypeassert
return v.(*snapshot.DirEntry).Clone(), nil
cached, ok, err := rw.cache.Get(ctx, nil, key[:])
if err != nil {
return nil, errors.Wrap(err, "cache get")
}
if ok {
de := &snapshot.DirEntry{}
jerr := json.Unmarshal(cached, de)
return de, errors.Wrap(jerr, "json unmarshal")
}
// entry not cached yet, run rewriter
@@ -114,10 +121,14 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri
result = rep2
}
actual, _ := rw.cache.LoadOrStore(key, result.Clone())
v, err := json.Marshal(result)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal JSON")
}
//nolint:forcetypeassert
return actual.(*snapshot.DirEntry), nil
rw.cache.PutIfAbsent(ctx, key[:], v)
return result, nil
}
func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry) (*snapshot.DirEntry, error) {
@@ -232,8 +243,10 @@ func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapsho
}
// Close closes the rewriter.
func (rw *DirRewriter) Close() {
func (rw *DirRewriter) Close(ctx context.Context) {
rw.ws.Close()
rw.cache.Close(ctx)
}
// RewriteKeep is a callback that keeps the unreadable entry.
@@ -294,7 +307,7 @@ func RewriteRemove(ctx context.Context, parentPath string, entry *snapshot.DirEn
}
// NewDirRewriter creates a new directory rewriter.
func NewDirRewriter(rep repo.RepositoryWriter, opts DirRewriterOptions) *DirRewriter {
func NewDirRewriter(ctx context.Context, rep repo.RepositoryWriter, opts DirRewriterOptions) (*DirRewriter, error) {
if opts.Parallel == 0 {
opts.Parallel = runtime.NumCPU()
}
@@ -303,9 +316,15 @@ func NewDirRewriter(rep repo.RepositoryWriter, opts DirRewriterOptions) *DirRewr
opts.OnDirectoryReadFailure = RewriteFail
}
return &DirRewriter{
ws: workshare.NewPool(opts.Parallel - 1),
opts: opts,
rep: rep,
cache, err := bigmap.NewMap(ctx)
if err != nil {
return nil, errors.Wrap(err, "new map")
}
return &DirRewriter{
ws: workshare.NewPool(opts.Parallel - 1),
opts: opts,
rep: rep,
cache: cache,
}, nil
}

View File

@@ -2,13 +2,13 @@
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
@@ -24,9 +24,14 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [
unique := new(snapshot.StorageUsageDetails)
runningTotal := new(snapshot.StorageUsageDetails)
var uniqueContents sync.Map
uniqueContents, err := bigmap.NewSet(ctx)
if err != nil {
return errors.Wrap(err, "NewSet")
}
tw := NewTreeWalker(TreeWalkerOptions{
defer uniqueContents.Close(ctx)
tw, twerr := NewTreeWalker(ctx, TreeWalkerOptions{
EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error {
if !entry.IsDir() {
atomic.AddInt32(&unique.FileObjectCount, 1)
@@ -44,8 +49,10 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [
return errors.Wrapf(err, "error verifying object %v", oid)
}
var cidbuf [128]byte
for _, cid := range contentIDs {
if _, loaded := uniqueContents.LoadOrStore(cid, struct{}{}); !loaded {
if uniqueContents.Put(ctx, cid.Append(cidbuf[:0])) {
atomic.AddInt32(&unique.ContentCount, 1)
atomic.AddInt32(&runningTotal.ContentCount, 1)
@@ -71,7 +78,10 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [
return nil
},
})
defer tw.Close()
if twerr != nil {
return errors.Wrap(twerr, "tree walker")
}
defer tw.Close(ctx)
src := manifests[0].Source

View File

@@ -9,6 +9,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/workshare"
"github.com/kopia/kopia/repo/object"
)
@@ -23,7 +24,7 @@
type TreeWalker struct {
options TreeWalkerOptions
enqueued sync.Map
enqueued *bigmap.Set
wp *workshare.Pool
mu sync.Mutex
@@ -85,9 +86,10 @@ func (w *TreeWalker) TooManyErrors() bool {
return w.numErrors >= w.options.MaxErrors
}
func (w *TreeWalker) alreadyProcessed(e fs.Entry) bool {
_, existing := w.enqueued.LoadOrStore(oidOf(e), struct{}{})
return existing
func (w *TreeWalker) alreadyProcessed(ctx context.Context, e fs.Entry) bool {
var idbuf [128]byte
return !w.enqueued.Put(ctx, oidOf(e).Append(idbuf[:0]))
}
func (w *TreeWalker) processEntry(ctx context.Context, e fs.Entry, entryPath string) {
@@ -117,7 +119,7 @@ type errStop struct {
return errStop{errors.New("")}
}
if w.alreadyProcessed(ent) {
if w.alreadyProcessed(ctx, ent) {
return nil
}
@@ -146,7 +148,7 @@ func (w *TreeWalker) Process(ctx context.Context, e fs.Entry, entryPath string)
return errors.Errorf("entry does not have ObjectID")
}
if w.alreadyProcessed(e) {
if w.alreadyProcessed(ctx, e) {
return nil
}
@@ -156,8 +158,9 @@ func (w *TreeWalker) Process(ctx context.Context, e fs.Entry, entryPath string)
}
// Close closes the tree walker.
func (w *TreeWalker) Close() {
func (w *TreeWalker) Close(ctx context.Context) {
w.wp.Close()
w.enqueued.Close(ctx)
}
// TreeWalkerOptions provides optional fields for TreeWalker.
@@ -169,7 +172,7 @@ type TreeWalkerOptions struct {
}
// NewTreeWalker creates new tree walker.
func NewTreeWalker(options TreeWalkerOptions) *TreeWalker {
func NewTreeWalker(ctx context.Context, options TreeWalkerOptions) (*TreeWalker, error) {
if options.Parallelism <= 0 {
options.Parallelism = runtime.NumCPU() * walkersPerCPU
}
@@ -178,8 +181,14 @@ func NewTreeWalker(options TreeWalkerOptions) *TreeWalker {
options.MaxErrors = 1
}
return &TreeWalker{
options: options,
wp: workshare.NewPool(options.Parallelism - 1),
s, err := bigmap.NewSet(ctx)
if err != nil {
return nil, errors.Wrap(err, "NewSet")
}
return &TreeWalker{
options: options,
wp: workshare.NewPool(options.Parallelism - 1),
enqueued: s,
}, nil
}

View File

@@ -19,16 +19,19 @@
func TestSnapshotTreeWalker(t *testing.T) {
callbackCounter := new(int32)
w := snapshotfs.NewTreeWalker(
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
w, err := snapshotfs.NewTreeWalker(
ctx,
snapshotfs.TreeWalkerOptions{
EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error {
atomic.AddInt32(callbackCounter, 1)
return nil
},
})
defer w.Close()
require.NoError(t, err)
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
defer w.Close(ctx)
sourceRoot := mockfs.NewDirectory()
require.Error(t, w.Process(ctx, sourceRoot, "."))
@@ -79,7 +82,10 @@ func TestSnapshotTreeWalker(t *testing.T) {
func TestSnapshotTreeWalker_Errors(t *testing.T) {
someErr1 := errors.Errorf("some error")
w := snapshotfs.NewTreeWalker(
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
w, err := snapshotfs.NewTreeWalker(
ctx,
snapshotfs.TreeWalkerOptions{
Parallelism: 1,
EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error {
@@ -90,9 +96,9 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) {
return nil
},
})
defer w.Close()
require.NoError(t, err)
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
defer w.Close(ctx)
sourceRoot := mockfs.NewDirectory()
require.Error(t, w.Process(ctx, sourceRoot, "root-dir"))
@@ -118,7 +124,10 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) {
func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) {
someErr1 := errors.Errorf("some error")
w := snapshotfs.NewTreeWalker(
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
w, err := snapshotfs.NewTreeWalker(
ctx,
snapshotfs.TreeWalkerOptions{
Parallelism: 1,
MaxErrors: -1,
@@ -134,9 +143,9 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) {
return nil
},
})
defer w.Close()
require.NoError(t, err)
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
defer w.Close(ctx)
sourceRoot := mockfs.NewDirectory()
require.Error(t, w.Process(ctx, sourceRoot, "root-dir"))
@@ -165,7 +174,10 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) {
func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) {
someErr1 := errors.Errorf("some error")
w := snapshotfs.NewTreeWalker(
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
w, err := snapshotfs.NewTreeWalker(
ctx,
snapshotfs.TreeWalkerOptions{
Parallelism: 1,
MaxErrors: -1,
@@ -181,9 +193,9 @@ func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) {
return nil
},
})
defer w.Close()
require.NoError(t, err)
ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant)
defer w.Close(ctx)
sourceRoot := mockfs.NewDirectory()
require.Error(t, w.Process(ctx, sourceRoot, "root-dir"))

View File

@@ -129,12 +129,15 @@ type VerifierOptions struct {
// InParallel starts parallel verification and invokes the provided function,
// which can call Process() on the provided TreeWalker.
func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker) error) error {
tw := NewTreeWalker(TreeWalkerOptions{
tw, twerr := NewTreeWalker(ctx, TreeWalkerOptions{
Parallelism: v.opts.Parallelism,
EntryCallback: v.verifyObject,
MaxErrors: v.opts.MaxErrors,
})
defer tw.Close()
if twerr != nil {
return errors.Wrap(twerr, "tree walker")
}
defer tw.Close(ctx)
v.fileWorkQueue = make(chan verifyFileWorkItem, v.opts.FileQueueLength)

View File

@@ -3,12 +3,12 @@
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/stats"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
@@ -23,7 +23,7 @@
var log = logging.Module("snapshotgc")
func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Map) error {
func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *bigmap.Set) error {
ids, err := snapshot.ListSnapshotManifests(ctx, rep, nil, nil)
if err != nil {
return errors.Wrap(err, "unable to list snapshot manifest IDs")
@@ -34,21 +34,27 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma
return errors.Wrap(err, "unable to load manifest IDs")
}
w := snapshotfs.NewTreeWalker(snapshotfs.TreeWalkerOptions{
w, twerr := snapshotfs.NewTreeWalker(ctx, snapshotfs.TreeWalkerOptions{
EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error {
contentIDs, err := rep.VerifyObject(ctx, oid)
if err != nil {
return errors.Wrapf(err, "error verifying %v", oid)
contentIDs, verr := rep.VerifyObject(ctx, oid)
if verr != nil {
return errors.Wrapf(verr, "error verifying %v", oid)
}
var cidbuf [128]byte
for _, cid := range contentIDs {
used.Store(cid, nil)
used.Put(ctx, cid.Append(cidbuf[:0]))
}
return nil
},
})
defer w.Close()
if twerr != nil {
return errors.Wrap(twerr, "unable to create tree walker")
}
defer w.Close(ctx)
log(ctx).Infof("Looking for active contents...")
@@ -93,13 +99,15 @@ func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, sa
}
func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, maintenanceStartTime time.Time, st *Stats) error {
var (
used sync.Map
var unused, inUse, system, tooRecent, undeleted stats.CountSum
unused, inUse, system, tooRecent, undeleted stats.CountSum
)
used, serr := bigmap.NewSet(ctx)
if serr != nil {
return errors.Wrap(serr, "unable to create new set")
}
defer used.Close(ctx)
if err := findInUseContentIDs(ctx, rep, &used); err != nil {
if err := findInUseContentIDs(ctx, rep, used); err != nil {
return errors.Wrap(err, "unable to find in-use content ID")
}
@@ -113,7 +121,9 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete
return nil
}
if _, ok := used.Load(ci.GetContentID()); ok {
var cidbuf [128]byte
if used.Contains(ci.GetContentID().Append(cidbuf[:0])) {
if ci.GetDeleted() {
if err := rep.ContentManager().UndeleteContent(ctx, ci.GetContentID()); err != nil {
return errors.Wrapf(err, "Could not undelete referenced content: %v", ci)