diff --git a/cli/command_snapshot_fix.go b/cli/command_snapshot_fix.go index 51ad751fd..336809b06 100644 --- a/cli/command_snapshot_fix.go +++ b/cli/command_snapshot_fix.go @@ -64,12 +64,16 @@ func failedEntryCallback(rep repo.RepositoryWriter, enumVal string) snapshotfs.R } func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, rep repo.RepositoryWriter, rewrite snapshotfs.RewriteDirEntryCallback) error { - rw := snapshotfs.NewDirRewriter(rep, snapshotfs.DirRewriterOptions{ + rw, err := snapshotfs.NewDirRewriter(ctx, rep, snapshotfs.DirRewriterOptions{ Parallel: c.parallel, RewriteEntry: rewrite, OnDirectoryReadFailure: failedEntryCallback(rep, c.invalidDirHandling), }) - defer rw.Close() + if err != nil { + return errors.Wrap(err, "unable to create dir rewriter") + } + + defer rw.Close(ctx) var updatedSnapshots int diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 6fe3d5b99..60961ed4d 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -75,7 +75,7 @@ func ShouldReduceTestComplexity() bool { return true } - return strings.Contains(runtime.GOARCH, "arm") + return strings.Contains(runtime.GOARCH, "arm") && runtime.GOOS != "darwin" } // ShouldSkipUnicodeFilenames returns true if: diff --git a/repo/content/content_manager_iterate.go b/repo/content/content_manager_iterate.go index fee5fbd7d..af563f823 100644 --- a/repo/content/content_manager_iterate.go +++ b/repo/content/content_manager_iterate.go @@ -9,6 +9,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/bigmap" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" ) @@ -228,7 +229,12 @@ func(ci Info) error { // IterateUnreferencedBlobs returns the list of unreferenced storage blobs. 
func (bm *WriteManager) IterateUnreferencedBlobs(ctx context.Context, blobPrefixes []blob.ID, parallellism int, callback func(blob.Metadata) error) error { - var usedPacks sync.Map + usedPacks, err := bigmap.NewSet(ctx) + if err != nil { + return errors.Wrap(err, "new set") + } + + defer usedPacks.Close(ctx) bm.log.Debugf("determining blobs in use") // find packs in use @@ -240,7 +246,7 @@ func (bm *WriteManager) IterateUnreferencedBlobs(ctx context.Context, blobPrefix }, func(pi PackInfo) error { if pi.ContentCount > 0 { - usedPacks.Store(pi.PackID, struct{}{}) + usedPacks.Put(ctx, []byte(pi.PackID)) } return nil }); err != nil { @@ -270,7 +276,7 @@ func(pi PackInfo) error { if err := blob.IterateAllPrefixesInParallel(ctx, parallellism, bm.st, prefixes, func(bm blob.Metadata) error { - if _, ok := usedPacks.Load(bm.BlobID); ok { + if usedPacks.Contains([]byte(bm.BlobID)) { return nil } diff --git a/repo/content/index/id.go b/repo/content/index/id.go index 430f49a0f..e57e046ec 100644 --- a/repo/content/index/id.go +++ b/repo/content/index/id.go @@ -108,6 +108,19 @@ func (i ID) AppendToLogBuffer(sb *logging.Buffer) { sb.AppendBytes(buf[0 : i.idLen*2]) } +// Append appends content ID to the slice. +func (i ID) Append(out []byte) []byte { + var buf [128]byte + + if i.prefix != 0 { + out = append(out, i.prefix) + } + + hex.Encode(buf[0:i.idLen*2], i.data[0:i.idLen]) + + return append(out, buf[0:i.idLen*2]...) +} + // String returns a string representation of ID. 
func (i ID) String() string { return string(i.Prefix()) + hex.EncodeToString(i.data[:i.idLen]) diff --git a/repo/content/index/id_test.go b/repo/content/index/id_test.go index ee55ba795..ebcdb66cd 100644 --- a/repo/content/index/id_test.go +++ b/repo/content/index/id_test.go @@ -2,6 +2,7 @@ import ( "encoding/json" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -122,6 +123,8 @@ func TestIDHash(t *testing.T) { require.NoError(t, err) require.Equal(t, h, cid.Hash()) + require.Equal(t, fmt.Sprintf("%v%x", prefix, h), string(cid.Append(nil))) + require.Equal(t, prefix != "", cid.HasPrefix()) require.Equal(t, prefix, cid.Prefix()) } diff --git a/repo/object/objectid.go b/repo/object/objectid.go index 663ed750b..d078cfd93 100644 --- a/repo/object/objectid.go +++ b/repo/object/objectid.go @@ -80,6 +80,19 @@ func (i ID) String() string { return indirectPrefix + compressionPrefix + i.cid.String() } +// Append appends string representation of ObjectID that is suitable for displaying in the UI. +func (i ID) Append(out []byte) []byte { + for j := 0; j < int(i.indirection); j++ { + out = append(out, 'I') + } + + if i.compression { + out = append(out, 'Z') + } + + return i.cid.Append(out) +} + // IndexObjectID returns the object ID of the underlying index object. 
func (i ID) IndexObjectID() (ID, bool) { if i.indirection > 0 { diff --git a/repo/object/objectid_test.go b/repo/object/objectid_test.go index 29dd916cd..61560b62a 100644 --- a/repo/object/objectid_test.go +++ b/repo/object/objectid_test.go @@ -67,6 +67,21 @@ func TestToStrings(t *testing.T) { require.Equal(t, []string{"f0f0", "f1f1"}, strs) } +func TestString(t *testing.T) { + cases := map[ID]string{ + EmptyID: "", + mustParseID(t, "Dabcd"): "abcd", + mustParseID(t, "abcd"): "abcd", + mustParseID(t, "IIabcd"): "IIabcd", + mustParseID(t, "Zabcd"): "Zabcd", + } + + for id, str := range cases { + require.Equal(t, str, id.String()) + require.Equal(t, str, string(id.Append(nil))) + } +} + func mustParseID(t *testing.T, s string) ID { t.Helper() diff --git a/snapshot/snapshotfs/dir_rewriter.go b/snapshot/snapshotfs/dir_rewriter.go index a98176fc8..5dbb6debb 100644 --- a/snapshot/snapshotfs/dir_rewriter.go +++ b/snapshot/snapshotfs/dir_rewriter.go @@ -7,10 +7,10 @@ "encoding/json" "path" "runtime" - "sync" "github.com/pkg/errors" + "github.com/kopia/kopia/internal/bigmap" "github.com/kopia/kopia/internal/impossible" "github.com/kopia/kopia/internal/workshare" "github.com/kopia/kopia/repo" @@ -53,7 +53,7 @@ type DirRewriter struct { ws *workshare.Pool opts DirRewriterOptions - cache sync.Map + cache *bigmap.Map rep repo.RepositoryWriter } @@ -91,9 +91,16 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri key := rw.getCacheKey(input) // see if we already processed this exact directory entry - if v, ok := rw.cache.Load(key); ok { - //nolint:forcetypeassert - return v.(*snapshot.DirEntry).Clone(), nil + cached, ok, err := rw.cache.Get(ctx, nil, key[:]) + if err != nil { + return nil, errors.Wrap(err, "cache get") + } + + if ok { + de := &snapshot.DirEntry{} + jerr := json.Unmarshal(cached, de) + + return de, errors.Wrap(jerr, "json unmarshal") } // entry not cached yet, run rewriter @@ -114,10 +121,14 @@ func (rw *DirRewriter) 
getCachedReplacement(ctx context.Context, parentPath stri result = rep2 } - actual, _ := rw.cache.LoadOrStore(key, result.Clone()) + v, err := json.Marshal(result) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal JSON") + } - //nolint:forcetypeassert - return actual.(*snapshot.DirEntry), nil + rw.cache.PutIfAbsent(ctx, key[:], v) + + return result, nil } func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry) (*snapshot.DirEntry, error) { @@ -232,8 +243,10 @@ func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapsho } // Close closes the rewriter. -func (rw *DirRewriter) Close() { +func (rw *DirRewriter) Close(ctx context.Context) { rw.ws.Close() + + rw.cache.Close(ctx) } // RewriteKeep is a callback that keeps the unreadable entry. @@ -294,7 +307,7 @@ func RewriteRemove(ctx context.Context, parentPath string, entry *snapshot.DirEn } // NewDirRewriter creates a new directory rewriter. -func NewDirRewriter(rep repo.RepositoryWriter, opts DirRewriterOptions) *DirRewriter { +func NewDirRewriter(ctx context.Context, rep repo.RepositoryWriter, opts DirRewriterOptions) (*DirRewriter, error) { if opts.Parallel == 0 { opts.Parallel = runtime.NumCPU() } @@ -303,9 +316,15 @@ func NewDirRewriter(rep repo.RepositoryWriter, opts DirRewriterOptions) *DirRewr opts.OnDirectoryReadFailure = RewriteFail } - return &DirRewriter{ - ws: workshare.NewPool(opts.Parallel - 1), - opts: opts, - rep: rep, + cache, err := bigmap.NewMap(ctx) + if err != nil { + return nil, errors.Wrap(err, "new map") } + + return &DirRewriter{ + ws: workshare.NewPool(opts.Parallel - 1), + opts: opts, + rep: rep, + cache: cache, + }, nil } diff --git a/snapshot/snapshotfs/snapshot_storage_stats.go b/snapshot/snapshotfs/snapshot_storage_stats.go index b5f06c3a2..d9ac7304e 100644 --- a/snapshot/snapshotfs/snapshot_storage_stats.go +++ b/snapshot/snapshotfs/snapshot_storage_stats.go @@ -2,13 +2,13 @@ import ( "context" - 
"sync" "sync/atomic" "time" "github.com/pkg/errors" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/bigmap" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" @@ -24,9 +24,14 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [ unique := new(snapshot.StorageUsageDetails) runningTotal := new(snapshot.StorageUsageDetails) - var uniqueContents sync.Map + uniqueContents, err := bigmap.NewSet(ctx) + if err != nil { + return errors.Wrap(err, "NewSet") + } - tw := NewTreeWalker(TreeWalkerOptions{ + defer uniqueContents.Close(ctx) + + tw, twerr := NewTreeWalker(ctx, TreeWalkerOptions{ EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error { if !entry.IsDir() { atomic.AddInt32(&unique.FileObjectCount, 1) @@ -44,8 +49,10 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [ return errors.Wrapf(err, "error verifying object %v", oid) } + var cidbuf [128]byte + for _, cid := range contentIDs { - if _, loaded := uniqueContents.LoadOrStore(cid, struct{}{}); !loaded { + if uniqueContents.Put(ctx, cid.Append(cidbuf[:0])) { atomic.AddInt32(&unique.ContentCount, 1) atomic.AddInt32(&runningTotal.ContentCount, 1) @@ -71,7 +78,10 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [ return nil }, }) - defer tw.Close() + if twerr != nil { + return errors.Wrap(twerr, "tree walker") + } + defer tw.Close(ctx) src := manifests[0].Source diff --git a/snapshot/snapshotfs/snapshot_tree_walker.go b/snapshot/snapshotfs/snapshot_tree_walker.go index 9757bd03b..94a80d4e1 100644 --- a/snapshot/snapshotfs/snapshot_tree_walker.go +++ b/snapshot/snapshotfs/snapshot_tree_walker.go @@ -9,6 +9,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/bigmap" "github.com/kopia/kopia/internal/workshare" "github.com/kopia/kopia/repo/object" ) @@ -23,7 +24,7 @@ type TreeWalker 
struct { options TreeWalkerOptions - enqueued sync.Map + enqueued *bigmap.Set wp *workshare.Pool mu sync.Mutex @@ -85,9 +86,10 @@ func (w *TreeWalker) TooManyErrors() bool { return w.numErrors >= w.options.MaxErrors } -func (w *TreeWalker) alreadyProcessed(e fs.Entry) bool { - _, existing := w.enqueued.LoadOrStore(oidOf(e), struct{}{}) - return existing +func (w *TreeWalker) alreadyProcessed(ctx context.Context, e fs.Entry) bool { + var idbuf [128]byte + + return !w.enqueued.Put(ctx, oidOf(e).Append(idbuf[:0])) } func (w *TreeWalker) processEntry(ctx context.Context, e fs.Entry, entryPath string) { @@ -117,7 +119,7 @@ type errStop struct { return errStop{errors.New("")} } - if w.alreadyProcessed(ent) { + if w.alreadyProcessed(ctx, ent) { return nil } @@ -146,7 +148,7 @@ func (w *TreeWalker) Process(ctx context.Context, e fs.Entry, entryPath string) return errors.Errorf("entry does not have ObjectID") } - if w.alreadyProcessed(e) { + if w.alreadyProcessed(ctx, e) { return nil } @@ -156,8 +158,9 @@ func (w *TreeWalker) Process(ctx context.Context, e fs.Entry, entryPath string) } // Close closes the tree walker. -func (w *TreeWalker) Close() { +func (w *TreeWalker) Close(ctx context.Context) { w.wp.Close() + w.enqueued.Close(ctx) } // TreeWalkerOptions provides optional fields for TreeWalker. @@ -169,7 +172,7 @@ type TreeWalkerOptions struct { } // NewTreeWalker creates new tree walker. 
-func NewTreeWalker(options TreeWalkerOptions) *TreeWalker { +func NewTreeWalker(ctx context.Context, options TreeWalkerOptions) (*TreeWalker, error) { if options.Parallelism <= 0 { options.Parallelism = runtime.NumCPU() * walkersPerCPU } @@ -178,8 +181,14 @@ func NewTreeWalker(options TreeWalkerOptions) *TreeWalker { options.MaxErrors = 1 } - return &TreeWalker{ - options: options, - wp: workshare.NewPool(options.Parallelism - 1), + s, err := bigmap.NewSet(ctx) + if err != nil { + return nil, errors.Wrap(err, "NewSet") } + + return &TreeWalker{ + options: options, + wp: workshare.NewPool(options.Parallelism - 1), + enqueued: s, + }, nil } diff --git a/snapshot/snapshotfs/snapshot_tree_walker_test.go b/snapshot/snapshotfs/snapshot_tree_walker_test.go index 3d40b6892..4f9a85297 100644 --- a/snapshot/snapshotfs/snapshot_tree_walker_test.go +++ b/snapshot/snapshotfs/snapshot_tree_walker_test.go @@ -19,16 +19,19 @@ func TestSnapshotTreeWalker(t *testing.T) { callbackCounter := new(int32) - w := snapshotfs.NewTreeWalker( + ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + + w, err := snapshotfs.NewTreeWalker( + ctx, snapshotfs.TreeWalkerOptions{ EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error { atomic.AddInt32(callbackCounter, 1) return nil }, }) - defer w.Close() + require.NoError(t, err) - ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + defer w.Close(ctx) sourceRoot := mockfs.NewDirectory() require.Error(t, w.Process(ctx, sourceRoot, ".")) @@ -79,7 +82,10 @@ func TestSnapshotTreeWalker(t *testing.T) { func TestSnapshotTreeWalker_Errors(t *testing.T) { someErr1 := errors.Errorf("some error") - w := snapshotfs.NewTreeWalker( + ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + + w, err := snapshotfs.NewTreeWalker( + ctx, snapshotfs.TreeWalkerOptions{ Parallelism: 1, EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, 
entryPath string) error { @@ -90,9 +96,9 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) { return nil }, }) - defer w.Close() + require.NoError(t, err) - ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + defer w.Close(ctx) sourceRoot := mockfs.NewDirectory() require.Error(t, w.Process(ctx, sourceRoot, "root-dir")) @@ -118,7 +124,10 @@ func TestSnapshotTreeWalker_Errors(t *testing.T) { func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) { someErr1 := errors.Errorf("some error") - w := snapshotfs.NewTreeWalker( + ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + + w, err := snapshotfs.NewTreeWalker( + ctx, snapshotfs.TreeWalkerOptions{ Parallelism: 1, MaxErrors: -1, @@ -134,9 +143,9 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) { return nil }, }) - defer w.Close() + require.NoError(t, err) - ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + defer w.Close(ctx) sourceRoot := mockfs.NewDirectory() require.Error(t, w.Process(ctx, sourceRoot, "root-dir")) @@ -165,7 +174,10 @@ func TestSnapshotTreeWalker_MultipleErrors(t *testing.T) { func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) { someErr1 := errors.Errorf("some error") - w := snapshotfs.NewTreeWalker( + ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + + w, err := snapshotfs.NewTreeWalker( + ctx, snapshotfs.TreeWalkerOptions{ Parallelism: 1, MaxErrors: -1, @@ -181,9 +193,9 @@ func TestSnapshotTreeWalker_MultipleErrorsSameOID(t *testing.T) { return nil }, }) - defer w.Close() + require.NoError(t, err) - ctx, env := repotesting.NewEnvironment(t, repotesting.FormatNotImportant) + defer w.Close(ctx) sourceRoot := mockfs.NewDirectory() require.Error(t, w.Process(ctx, sourceRoot, "root-dir")) diff --git a/snapshot/snapshotfs/snapshot_verifier.go b/snapshot/snapshotfs/snapshot_verifier.go index 290745e18..5b376a11a 100644 --- a/snapshot/snapshotfs/snapshot_verifier.go +++ 
b/snapshot/snapshotfs/snapshot_verifier.go @@ -129,12 +129,15 @@ type VerifierOptions struct { // InParallel starts parallel verification and invokes the provided function which can call // call Process() on in the provided TreeWalker. func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker) error) error { - tw := NewTreeWalker(TreeWalkerOptions{ + tw, twerr := NewTreeWalker(ctx, TreeWalkerOptions{ Parallelism: v.opts.Parallelism, EntryCallback: v.verifyObject, MaxErrors: v.opts.MaxErrors, }) - defer tw.Close() + if twerr != nil { + return errors.Wrap(twerr, "tree walker") + } + defer tw.Close(ctx) v.fileWorkQueue = make(chan verifyFileWorkItem, v.opts.FileQueueLength) diff --git a/snapshot/snapshotgc/gc.go b/snapshot/snapshotgc/gc.go index 318fed96f..ef2c400ea 100644 --- a/snapshot/snapshotgc/gc.go +++ b/snapshot/snapshotgc/gc.go @@ -3,12 +3,12 @@ import ( "context" - "sync" "time" "github.com/pkg/errors" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/bigmap" "github.com/kopia/kopia/internal/stats" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" @@ -23,7 +23,7 @@ var log = logging.Module("snapshotgc") -func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Map) error { +func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *bigmap.Set) error { ids, err := snapshot.ListSnapshotManifests(ctx, rep, nil, nil) if err != nil { return errors.Wrap(err, "unable to list snapshot manifest IDs") @@ -34,21 +34,27 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma return errors.Wrap(err, "unable to load manifest IDs") } - w := snapshotfs.NewTreeWalker(snapshotfs.TreeWalkerOptions{ + w, twerr := snapshotfs.NewTreeWalker(ctx, snapshotfs.TreeWalkerOptions{ EntryCallback: func(ctx context.Context, entry fs.Entry, oid object.ID, entryPath string) error { - contentIDs, err := rep.VerifyObject(ctx, oid) - if err != nil { - return errors.Wrapf(err, 
"error verifying %v", oid) + contentIDs, verr := rep.VerifyObject(ctx, oid) + if verr != nil { + return errors.Wrapf(verr, "error verifying %v", oid) } + var cidbuf [128]byte + for _, cid := range contentIDs { - used.Store(cid, nil) + used.Put(ctx, cid.Append(cidbuf[:0])) } return nil }, }) - defer w.Close() + if twerr != nil { + return errors.Wrap(twerr, "unable to create tree walker") + } + + defer w.Close(ctx) log(ctx).Infof("Looking for active contents...") @@ -93,13 +99,15 @@ func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, sa } func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, maintenanceStartTime time.Time, st *Stats) error { - var ( - used sync.Map + var unused, inUse, system, tooRecent, undeleted stats.CountSum - unused, inUse, system, tooRecent, undeleted stats.CountSum - ) + used, serr := bigmap.NewSet(ctx) + if serr != nil { + return errors.Wrap(serr, "unable to create new set") + } + defer used.Close(ctx) - if err := findInUseContentIDs(ctx, rep, &used); err != nil { + if err := findInUseContentIDs(ctx, rep, used); err != nil { return errors.Wrap(err, "unable to find in-use content ID") } @@ -113,7 +121,9 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete return nil } - if _, ok := used.Load(ci.GetContentID()); ok { + var cidbuf [128]byte + + if used.Contains(ci.GetContentID().Append(cidbuf[:0])) { if ci.GetDeleted() { if err := rep.ContentManager().UndeleteContent(ctx, ci.GetContentID()); err != nil { return errors.Wrapf(err, "Could not undelete referenced content: %v", ci)