From 792cc874dcd313ff3944a2cba319ebd4947b3608 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 25 Sep 2021 14:54:31 -0700 Subject: [PATCH] repo: allow reusing of object writer buffers (#1315) This reduces memory consumption and speeds up backups. 1. Backing up kopia repository (3.5 GB files:133102 dirs:20074): before: 25s, 490 MB after: 21s, 445 MB 2. Large files (14.8 GB, 76 files) before: 30s, 597 MB after: 28s, 495 MB All tests repeated 5 times for clean local filesystem repo. --- internal/gather/gather_bytes.go | 1 + internal/gather/gather_write_buffer.go | 13 +++ internal/gather/gather_write_buffer_test.go | 13 +++ internal/repotesting/reconnectable_storage.go | 6 +- internal/repotesting/repotesting.go | 90 +++++++++---------- internal/repotesting/repotesting_test.go | 3 +- internal/server/api_content.go | 3 +- internal/server/grpc_session.go | 3 +- repo/api_server_repository.go | 10 +-- repo/content/content_formatter_test.go | 14 +-- repo/content/content_manager.go | 8 +- repo/content/content_manager_test.go | 23 ++--- repo/grpc_repository_client.go | 11 ++- repo/manifest/committed_manifest_manager.go | 5 +- repo/manifest/manifest_manager.go | 3 +- repo/object/object_manager.go | 41 ++++++--- repo/object/object_manager_test.go | 7 +- repo/object/object_writer.go | 6 +- repo/repo_benchmarks_test.go | 62 +++++++++++++ .../repository_stress_test.go | 3 +- tests/stress_test/stress_test.go | 3 +- 21 files changed, 228 insertions(+), 100 deletions(-) create mode 100644 repo/repo_benchmarks_test.go diff --git a/internal/gather/gather_bytes.go b/internal/gather/gather_bytes.go index 38ad27eb3..c323f41a4 100644 --- a/internal/gather/gather_bytes.go +++ b/internal/gather/gather_bytes.go @@ -172,6 +172,7 @@ func (b Bytes) WriteTo(w io.Writer) (int64, error) { // FromSlice creates Bytes from the specified slice. func FromSlice(b []byte) Bytes { var r Bytes + r.sliceBuf[0] = b r.Slices = r.sliceBuf[:] diff --git a/internal/gather/gather_write_buffer.go b/internal/gather/gather_write_buffer.go index 289ee829e..fa1ee5c33 100644 --- a/internal/gather/gather_write_buffer.go +++ b/internal/gather/gather_write_buffer.go @@ -33,6 +33,19 @@ func (b *WriteBuffer) Close() { b.inner.invalidate() } +// CloneContiguous initializes the write buffer with a contiguous (single-slice) copy of the provided +// slices. +func (b *WriteBuffer) CloneContiguous(byt Bytes) []byte { + contig := b.MakeContiguous(byt.Length()) + output := contig[:0] + + for _, s := range byt.Slices { + output = append(output, s...) + } + + return contig +} + // MakeContiguous ensures the write buffer consists of exactly one contiguous single slice of the provided length // and returns the slice. func (b *WriteBuffer) MakeContiguous(length int) []byte { diff --git a/internal/gather/gather_write_buffer_test.go b/internal/gather/gather_write_buffer_test.go index d8df26563..9ad771bc3 100644 --- a/internal/gather/gather_write_buffer_test.go +++ b/internal/gather/gather_write_buffer_test.go @@ -81,3 +81,16 @@ func TestGatherWriteBufferContig(t *testing.T) { require.Equal(t, theCap, len(b)) require.Equal(t, theCap, cap(b)) } + +func TestGatherWriteBufferCloneContig(t *testing.T) { + var w WriteBuffer + defer w.Close() + + w.Append([]byte{1, 2, 3}) + w.Append([]byte{4, 5, 6}) + + var w2 WriteBuffer + contig := w2.CloneContiguous(w.Bytes()) + + require.Equal(t, []byte{1, 2, 3, 4, 5, 6}, contig) +} diff --git a/internal/repotesting/reconnectable_storage.go b/internal/repotesting/reconnectable_storage.go index 73b61dfab..1258e2cc5 100644 --- a/internal/repotesting/reconnectable_storage.go +++ b/internal/repotesting/reconnectable_storage.go @@ -31,13 +31,13 @@ type reconnectableStorageOptions struct { // newReconnectableStorage wraps the provided storage that may or may not be round-trippable // in a wrapper that globally caches storage instance and ensures its connection info is // round-trippable. -func newReconnectableStorage(t *testing.T, st blob.Storage) blob.Storage { - t.Helper() +func newReconnectableStorage(tb testing.TB, st blob.Storage) blob.Storage { + tb.Helper() st2 := reconnectableStorage{st, &reconnectableStorageOptions{UUID: uuid.NewString()}} reconnectableStorageByUUID.Store(st2.opt.UUID, st2) - t.Cleanup(func() { + tb.Cleanup(func() { reconnectableStorageByUUID.Delete(st2.opt.UUID) }) diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index c00bb61c0..1de8344b2 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -38,11 +38,11 @@ type Options struct { } // setup sets up a test environment. -func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts ...Options) *Environment { - t.Helper() +func (e *Environment) setup(tb testing.TB, version content.FormatVersion, opts ...Options) *Environment { + tb.Helper() - ctx := testlogging.Context(t) - e.configDir = testutil.TempDirectory(t) + ctx := testlogging.Context(tb) + e.configDir = testutil.TempDirectory(tb) openOpt := &repo.Options{} opt := &repo.NewRepositoryOptions{ @@ -71,7 +71,7 @@ func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts .. } st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, openOpt.TimeNowFunc) - st = newReconnectableStorage(t, st) + st = newReconnectableStorage(tb, st) e.st = st if e.Password == "" { @@ -79,49 +79,49 @@ func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts .. } if err := repo.Initialize(ctx, st, opt, e.Password); err != nil { - t.Fatalf("err: %v", err) + tb.Fatalf("err: %v", err) } if err := repo.Connect(ctx, e.ConfigFile(), st, e.Password, nil); err != nil { - t.Fatalf("can't connect: %v", err) + tb.Fatalf("can't connect: %v", err) } e.connected = true rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, openOpt) if err != nil { - t.Fatalf("can't open: %v", err) + tb.Fatalf("can't open: %v", err) } e.Repository = rep _, e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { - t.Fatal(err) + tb.Fatal(err) } - t.Cleanup(func() { rep.Close(ctx) }) + tb.Cleanup(func() { rep.Close(ctx) }) return e } // Close closes testing environment. -func (e *Environment) Close(ctx context.Context, t *testing.T) { - t.Helper() +func (e *Environment) Close(ctx context.Context, tb testing.TB) { + tb.Helper() if err := e.RepositoryWriter.Close(ctx); err != nil { - t.Fatalf("unable to close: %v", err) + tb.Fatalf("unable to close: %v", err) } if e.connected { if err := repo.Disconnect(ctx, e.ConfigFile()); err != nil { - t.Errorf("error disconnecting: %v", err) + tb.Errorf("error disconnecting: %v", err) } } if err := os.Remove(e.configDir); err != nil { // should be empty, assuming Disconnect was successful - t.Errorf("error removing config directory: %v", err) + tb.Errorf("error removing config directory: %v", err) } } @@ -131,47 +131,47 @@ func (e *Environment) ConfigFile() string { } // MustReopen closes and reopens the repository. -func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options)) { - t.Helper() +func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options)) { + tb.Helper() - ctx := testlogging.Context(t) + ctx := testlogging.Context(tb) err := e.RepositoryWriter.Close(ctx) if err != nil { - t.Fatalf("close error: %v", err) + tb.Fatalf("close error: %v", err) } rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts)) if err != nil { - t.Fatalf("err: %v", err) + tb.Fatalf("err: %v", err) } - t.Cleanup(func() { rep.Close(ctx) }) + tb.Cleanup(func() { rep.Close(ctx) }) _, e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { - t.Fatalf("err: %v", err) + tb.Fatalf("err: %v", err) } } // MustOpenAnother opens another repository backend by the same storage. -func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter { - t.Helper() +func (e *Environment) MustOpenAnother(tb testing.TB) repo.RepositoryWriter { + tb.Helper() - ctx := testlogging.Context(t) + ctx := testlogging.Context(tb) rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, &repo.Options{}) if err != nil { - t.Fatalf("err: %v", err) + tb.Fatalf("err: %v", err) } - t.Cleanup(func() { + tb.Cleanup(func() { rep2.Close(ctx) }) _, w, err := rep2.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"}) if err != nil { - t.Fatal(err) + tb.Fatal(err) } return w @@ -179,43 +179,43 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter { // MustConnectOpenAnother opens another repository backend by the same storage, // with independent config and cache options. -func (e *Environment) MustConnectOpenAnother(t *testing.T, openOpts ...func(*repo.Options)) repo.Repository { - t.Helper() +func (e *Environment) MustConnectOpenAnother(tb testing.TB, openOpts ...func(*repo.Options)) repo.Repository { + tb.Helper() - ctx := testlogging.Context(t) + ctx := testlogging.Context(tb) - config := filepath.Join(testutil.TempDirectory(t), "kopia.config") + config := filepath.Join(testutil.TempDirectory(tb), "kopia.config") connOpts := &repo.ConnectOptions{ CachingOptions: content.CachingOptions{ - CacheDirectory: testutil.TempDirectory(t), + CacheDirectory: testutil.TempDirectory(tb), }, } if err := repo.Connect(ctx, config, e.st, e.Password, connOpts); err != nil { - t.Fatal("can't connect:", err) + tb.Fatal("can't connect:", err) } rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts)) if err != nil { - t.Fatal("can't open:", err) + tb.Fatal("can't open:", err) } return rep } // VerifyBlobCount verifies that the underlying storage contains the specified number of blobs. -func (e *Environment) VerifyBlobCount(t *testing.T, want int) { - t.Helper() +func (e *Environment) VerifyBlobCount(tb testing.TB, want int) { + tb.Helper() var got int - _ = e.RepositoryWriter.BlobReader().ListBlobs(testlogging.Context(t), "", func(_ blob.Metadata) error { + _ = e.RepositoryWriter.BlobReader().ListBlobs(testlogging.Context(tb), "", func(_ blob.Metadata) error { got++ return nil }) if got != want { - t.Errorf("got unexpected number of BLOBs: %v, wanted %v", got, want) + tb.Errorf("got unexpected number of BLOBs: %v, wanted %v", got, want) } } @@ -235,17 +235,17 @@ func repoOptions(openOpts []func(*repo.Options)) *repo.Options { const FormatNotImportant = content.FormatVersion2 // NewEnvironment creates a new repository testing environment and ensures its cleanup at the end of the test. -func NewEnvironment(t *testing.T, version content.FormatVersion, opts ...Options) (context.Context, *Environment) { - t.Helper() +func NewEnvironment(tb testing.TB, version content.FormatVersion, opts ...Options) (context.Context, *Environment) { + tb.Helper() - ctx := testlogging.Context(t) + ctx := testlogging.Context(tb) var env Environment - env.setup(t, version, opts...) + env.setup(tb, version, opts...) - t.Cleanup(func() { - env.Close(ctx, t) + tb.Cleanup(func() { + env.Close(ctx, tb) }) return ctx, &env diff --git a/internal/repotesting/repotesting_test.go b/internal/repotesting/repotesting_test.go index fc14be837..f2002cf1b 100644 --- a/internal/repotesting/repotesting_test.go +++ b/internal/repotesting/repotesting_test.go @@ -5,6 +5,7 @@ "time" "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/mockfs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/content" @@ -43,7 +44,7 @@ func TestTimeFuncWiring(t *testing.T) { // verify wiring for the content layer nt := ft.Advance(20 * time.Second) - cid, err := env.RepositoryWriter.ContentManager().WriteContent(ctx, []byte("foo"), "", content.NoCompression) + cid, err := env.RepositoryWriter.ContentManager().WriteContent(ctx, gather.FromSlice([]byte("foo")), "", content.NoCompression) if err != nil { t.Fatal("failed to write content:", err) } diff --git a/internal/server/api_content.go b/internal/server/api_content.go index f59476d15..50149bde6 100644 --- a/internal/server/api_content.go +++ b/internal/server/api_content.go @@ -9,6 +9,7 @@ "github.com/gorilla/mux" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/serverapi" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/compression" @@ -83,7 +84,7 @@ func (s *Server) handleContentPut(ctx context.Context, r *http.Request, data []b } } - actualCID, err := dr.ContentManager().WriteContent(ctx, data, prefix, comp) + actualCID, err := dr.ContentManager().WriteContent(ctx, gather.FromSlice(data), prefix, comp) if err != nil { return nil, internalServerError(err) } diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index 0e97f0545..bc86f6ad2 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -17,6 +17,7 @@ "google.golang.org/grpc/status" "github.com/kopia/kopia/internal/auth" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/grpcapi" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/compression" @@ -231,7 +232,7 @@ func handleWriteContentRequest(ctx context.Context, dw repo.DirectRepositoryWrit return accessDeniedResponse() } - contentID, err := dw.ContentManager().WriteContent(ctx, req.GetData(), content.ID(req.GetPrefix()), compression.HeaderID(req.GetCompression())) + contentID, err := dw.ContentManager().WriteContent(ctx, gather.FromSlice(req.GetData()), content.ID(req.GetPrefix()), compression.HeaderID(req.GetCompression())) if err != nil { return errorResponse(err) } diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index 0c9b89a0b..4f6fb6b23 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -194,14 +194,14 @@ func (r *apiServerRepository) GetContent(ctx context.Context, contentID content. return tmp.ToByteSlice(), nil } -func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) { +func (r *apiServerRepository) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) { if err := content.ValidatePrefix(prefix); err != nil { return "", errors.Wrap(err, "invalid prefix") } var hashOutput [128]byte - contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], gather.FromSlice(data)))) + contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data))) // avoid uploading the content body if it already exists. if _, err := r.ContentInfo(ctx, contentID); err == nil { @@ -209,20 +209,20 @@ func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, pre return contentID, nil } - r.wso.OnUpload(int64(len(data))) + r.wso.OnUpload(int64(data.Length())) maybeCompression := "" if comp != content.NoCompression { maybeCompression = fmt.Sprintf("?compression=%x", comp) } - if err := r.cli.Put(ctx, "contents/"+string(contentID)+maybeCompression, data, nil); err != nil { + if err := r.cli.Put(ctx, "contents/"+string(contentID)+maybeCompression, data.ToByteSlice(), nil); err != nil { return "", errors.Wrapf(err, "error writing content %v", contentID) } if prefix != "" { // add all prefixed contents to the cache. - r.contentCache.Put(ctx, string(contentID), gather.FromSlice(data)) + r.contentCache.Put(ctx, string(contentID), data) } return contentID, nil diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index 37b51459f..7bcbc7a62 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -117,11 +117,11 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp defer bm.Close(ctx) - cases := [][]byte{ - {}, - {1, 2, 3}, - make([]byte, 256), - bytes.Repeat([]byte{1, 2, 3, 5}, 1024), + cases := []gather.Bytes{ + gather.FromSlice([]byte{}), + gather.FromSlice([]byte{1, 2, 3}), + gather.FromSlice(make([]byte, 256)), + gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 5}, 1024)), } for _, b := range cases { @@ -138,7 +138,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp return } - if got, want := b2, b; !bytes.Equal(got, want) { + if got, want := b2, b.ToByteSlice(); !bytes.Equal(got, want) { t.Errorf("content %q data mismatch: got %x, wanted %x", contentID, got, want) return } @@ -153,7 +153,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp return } - if got, want := b3, b; !bytes.Equal(got, want) { + if got, want := b3, b.ToByteSlice(); !bytes.Equal(got, want) { t.Errorf("content %q data mismatch: got %x, wanted %x", contentID, got, want) return } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 4a55a8070..ded8cec05 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -672,13 +672,13 @@ func (bm *WriteManager) SupportsContentCompression() bool { // WriteContent saves a given content of data to a pack group with a provided name and returns a contentID // that's based on the contents of data written. -func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID, comp compression.HeaderID) (ID, error) { +func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, prefix ID, comp compression.HeaderID) (ID, error) { if err := bm.maybeRetryWritingFailedPacksUnlocked(ctx); err != nil { return "", err } stats.Record(ctx, metricContentWriteContentCount.M(1)) - stats.Record(ctx, metricContentWriteContentBytes.M(int64(len(data)))) + stats.Record(ctx, metricContentWriteContentBytes.M(int64(data.Length()))) if err := ValidatePrefix(prefix); err != nil { return "", err @@ -686,7 +686,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID var hashOutput [hashing.MaxHashSize]byte - contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], gather.FromSlice(data)))) + contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], data))) bm.mu.RLock() _, bi, err := bm.getContentInfoReadLocked(ctx, contentID) @@ -704,7 +704,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID bm.log.Debugf("write-content %v new", contentID) } - return contentID, bm.addToPackUnlocked(ctx, contentID, gather.FromSlice(data), false, comp) + return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp) } // GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound. diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 5ef5a5f07..8e726d030 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -23,6 +23,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/ownwrites" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" @@ -306,7 +307,7 @@ func (s *contentManagerSuite) TestContentManagerWriteMultiple(t *testing.T) { for i := 0; i < repeatCount; i++ { b := seededRandomData(i, i%113) - blkID, err := bm.WriteContent(ctx, b, "", NoCompression) + blkID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression) if err != nil { t.Errorf("err: %v", err) } @@ -376,12 +377,12 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T) }, } - _, err = bm.WriteContent(ctx, seededRandomData(1, 10), "", NoCompression) + _, err = bm.WriteContent(ctx, gather.FromSlice(seededRandomData(1, 10)), "", NoCompression) if !errors.Is(err, sessionPutErr) { t.Fatalf("can't create first content: %v", err) } - b1, err := bm.WriteContent(ctx, seededRandomData(1, 10), "", NoCompression) + b1, err := bm.WriteContent(ctx, gather.FromSlice(seededRandomData(1, 10)), "", NoCompression) if err != nil { t.Fatalf("can't create content: %v", err) } @@ -389,7 +390,7 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T) // advance time enough to cause auto-flush, which will fail (firstPutErr) ta.Advance(1 * time.Hour) - if _, err := bm.WriteContent(ctx, seededRandomData(2, 10), "", NoCompression); !errors.Is(err, firstPutErr) { + if _, err := bm.WriteContent(ctx, gather.FromSlice(seededRandomData(2, 10)), "", NoCompression); !errors.Is(err, firstPutErr) { t.Fatalf("can't create 2nd content: %v", err) } @@ -1857,7 +1858,7 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion For data := make([]byte, i) cryptorand.Read(data) - cid, err := mgr.WriteContent(ctx, data, "", NoCompression) + cid, err := mgr.WriteContent(ctx, gather.FromSlice(data), "", NoCompression) if err != nil { t.Fatalf("unable to write %v bytes: %v", len(data), err) } @@ -2003,7 +2004,7 @@ func (s *contentManagerSuite) TestCompression_Disabled(t *testing.T) { compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000) // with index v1 the compression is disabled - _, err := bm.WriteContent(ctx, compressibleData, "", compression.ByName["pgzip"].HeaderID()) + _, err := bm.WriteContent(ctx, gather.FromSlice(compressibleData), "", compression.ByName["pgzip"].HeaderID()) require.Error(t, err) } @@ -2020,7 +2021,7 @@ func (s *contentManagerSuite) TestCompression_CompressibleData(t *testing.T) { compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000) headerID := compression.ByName["gzip"].HeaderID() - cid, err := bm.WriteContent(ctx, compressibleData, "", headerID) + cid, err := bm.WriteContent(ctx, gather.FromSlice(compressibleData), "", headerID) require.NoError(t, err) ci, err := bm.ContentInfo(ctx, cid) @@ -2057,7 +2058,7 @@ func (s *contentManagerSuite) TestCompression_NonCompressibleData(t *testing.T) rand.Read(nonCompressibleData) - cid, err := bm.WriteContent(ctx, nonCompressibleData, "", headerID) + cid, err := bm.WriteContent(ctx, gather.FromSlice(nonCompressibleData), "", headerID) require.NoError(t, err) verifyContent(ctx, t, bm, cid, nonCompressibleData) @@ -2182,7 +2183,7 @@ func verifyContent(ctx context.Context, t *testing.T, bm *WriteManager, contentI func writeContentAndVerify(ctx context.Context, t *testing.T, bm *WriteManager, b []byte) ID { t.Helper() - contentID, err := bm.WriteContent(ctx, b, "", NoCompression) + contentID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression) if err != nil { t.Errorf("err: %v", err) } @@ -2220,13 +2221,13 @@ func writeContentWithRetriesAndVerify(ctx context.Context, t *testing.T, bm *Wri t.Logf("*** starting writeContentWithRetriesAndVerify") - contentID, err := bm.WriteContent(ctx, b, "", NoCompression) + contentID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression) for i := 0; err != nil && i < maxRetries; i++ { retryCount++ t.Logf("*** try %v", retryCount) - contentID, err = bm.WriteContent(ctx, b, "", NoCompression) + contentID, err = bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression) } if err != nil { diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 4953359f2..de17b0ed2 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -599,7 +599,7 @@ func (r *grpcRepositoryClient) doWrite(ctx context.Context, contentID content.ID return nil } -func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) { +func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) { if err := content.ValidatePrefix(prefix); err != nil { return "", errors.Wrap(err, "invalid prefix") } @@ -611,7 +611,7 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, pr var hashOutput [128]byte - contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], gather.FromSlice(data)))) + contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data))) if r.recent.exists(contentID) { return contentID, nil @@ -621,15 +621,18 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, pr r.asyncWritesSemaphore <- struct{}{} // clone so that caller can reuse the buffer - data = append([]byte(nil), data...) + var cloneBuf gather.WriteBuffer + clone := cloneBuf.CloneContiguous(data) r.asyncWritesWG.Go(func() error { defer func() { // release semaphore <-r.asyncWritesSemaphore + + defer cloneBuf.Close() }() - return r.doWrite(ctxutil.Detach(ctx), contentID, data, prefix, comp) + return r.doWrite(ctxutil.Detach(ctx), contentID, clone, prefix, comp) }) return contentID, nil diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index cd0fc2e73..97b10f2ad 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -13,6 +13,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/content" ) @@ -96,7 +97,9 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri man.Entries = append(man.Entries, e) } - var buf bytes.Buffer + var buf gather.WriteBuffer + defer buf.Close() + gz := gzip.NewWriter(&buf) mustSucceed(json.NewEncoder(gz).Encode(man)) mustSucceed(gz.Flush()) diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 705214186..5219f447e 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -13,6 +13,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" @@ -40,7 +41,7 @@ type contentManager interface { Revision() int64 GetContent(ctx context.Context, contentID content.ID) ([]byte, error) - WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) + WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) DeleteContent(ctx context.Context, contentID content.ID) error IterateContents(ctx context.Context, options content.IterateOptions, callback content.IterateCallback) error DisableIndexFlush(ctx context.Context) diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 8b648ed6b..694996d59 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -4,9 +4,11 @@ import ( "context" "io" + "sync" "github.com/pkg/errors" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/splitter" @@ -31,7 +33,7 @@ type contentReader interface { type contentManager interface { contentReader SupportsContentCompression() bool - WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) + WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) } // Format describes the format of objects in a repository. @@ -45,29 +47,42 @@ type Manager struct { contentMgr contentManager newSplitter splitter.Factory + writerPool sync.Pool } // NewWriter creates an ObjectWriter for writing to the repository. func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { - w := &objectWriter{ - ctx: ctx, - om: om, - splitter: om.newSplitter(), - description: opt.Description, - prefix: opt.Prefix, - compressor: compression.ByName[opt.Compressor], - } + w, _ := om.writerPool.Get().(*objectWriter) + w.ctx = ctx + w.om = om + w.splitter = om.newSplitter() + w.description = opt.Description + w.prefix = opt.Prefix + w.compressor = compression.ByName[opt.Compressor] + w.totalLength = 0 + w.currentPosition = 0 // point the slice at the embedded array, so that we avoid allocations most of the time w.indirectIndex = w.indirectIndexBuf[:0] if opt.AsyncWrites > 0 { - w.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites) + if len(w.asyncWritesSemaphore) != 0 || cap(w.asyncWritesSemaphore) != opt.AsyncWrites { + w.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites) + } + } else { + w.asyncWritesSemaphore = nil } + w.buffer.Reset() + w.contentWriteError = nil + return w } +func (om *Manager) closedWriter(ow *objectWriter) { + om.writerPool.Put(ow) +} + // Concatenate creates an object that's a result of concatenation of other objects. This is more efficient than reading // and rewriting the objects because Concatenate can efficiently merge index entries without reading the underlying // contents. @@ -174,6 +189,12 @@ func NewObjectManager(ctx context.Context, bm contentManager, f Format) (*Manage Format: f, } + om.writerPool = sync.Pool{ + New: func() interface{} { + return new(objectWriter) + }, + } + splitterID := f.Splitter if splitterID == "" { splitterID = "FIXED" diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 6ccda6d8b..4552a8046 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -21,6 +21,7 @@ "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo/blob" @@ -47,15 +48,15 @@ func (f *fakeContentManager) GetContent(ctx context.Context, contentID content.I return nil, content.ErrContentNotFound } -func (f *fakeContentManager) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) { +func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) { h := sha256.New() - h.Write(data) + data.WriteTo(h) contentID := prefix + content.ID(hex.EncodeToString(h.Sum(nil))) f.mu.Lock() defer f.mu.Unlock() - f.data[contentID] = append([]byte(nil), data...) + f.data[contentID] = data.ToByteSlice() if f.compresionIDs != nil { f.compresionIDs[contentID] = comp } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index b6b09c11b..bb39e441c 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -99,6 +99,10 @@ func (w *objectWriter) Close() error { w.splitter.Close() } + w.buffer.Close() + + w.om.closedWriter(w) + return nil } @@ -200,7 +204,7 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte return errors.Wrap(err, "unable to prepare content bytes") } - contentID, err := w.om.contentMgr.WriteContent(w.ctx, contentBytes.ToByteSlice(), w.prefix, comp) + contentID, err := w.om.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix, comp) if err != nil { return errors.Wrapf(err, "unable to write content chunk %v of %v: %v", chunkID, w.description, err) } diff --git a/repo/repo_benchmarks_test.go b/repo/repo_benchmarks_test.go new file mode 100644 index 000000000..ef6169ed9 --- /dev/null +++ b/repo/repo_benchmarks_test.go @@ -0,0 +1,62 @@ +package repo_test + +import ( + "crypto/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/object" +) + +func BenchmarkWriterDedup1M(b *testing.B) { + ctx, env := repotesting.NewEnvironment(b, content.FormatVersion2) + dataBuf := make([]byte, 4<<20) + + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer.Write(dataBuf) + _, err := writer.Result() + require.NoError(b, err) + writer.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // write exactly the same data + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer.Write(dataBuf) + writer.Result() + writer.Close() + } +} + +func BenchmarkWriterNoDedup1M(b *testing.B) { + ctx, env := repotesting.NewEnvironment(b, content.FormatVersion2) + dataBuf := make([]byte, 4<<20) + chunkSize := 32 + offset := 0 + + _, err := rand.Read(dataBuf) + require.NoError(b, err) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // write exactly the same data + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + + if i+chunkSize > len(dataBuf) { + chunkSize++ + + offset = 0 + } + + writer.Write(dataBuf[offset : offset+chunkSize]) + writer.Result() + writer.Close() + + offset++ + } +} diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index 17738e65d..6c94d0a46 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -17,6 +17,7 @@ "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo" @@ -365,7 +366,7 @@ func writeRandomContent(ctx context.Context, r repo.DirectRepositoryWriter, rs * data := make([]byte, 1000) cryptorand.Read(data) - contentID, err := r.ContentManager().WriteContent(ctx, data, "", content.NoCompression) + contentID, err := r.ContentManager().WriteContent(ctx, gather.FromSlice(data), "", content.NoCompression) if err != nil { return errors.Wrap(err, "WriteContent error") } diff --git a/tests/stress_test/stress_test.go b/tests/stress_test/stress_test.go index 78d32f058..4715ccfa7 100644 --- a/tests/stress_test/stress_test.go +++ b/tests/stress_test/stress_test.go @@ -11,6 +11,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" @@ -97,7 +98,7 @@ type writtenBlock struct { dataCopy := append([]byte{}, data...) - contentID, err := bm.WriteContent(ctx, data, "", content.NoCompression) + contentID, err := bm.WriteContent(ctx, gather.FromSlice(data), "", content.NoCompression) if err != nil { t.Errorf("err: %v", err) return