repo: allow reusing of object writer buffers (#1315)

This reduces memory consumption and speeds up backups.

1. Backing up kopia repository (3.5 GB files:133102 dirs:20074):

before: 25s, 490 MB
after: 21s, 445 MB

2. Large files (14.8 GB, 76 files)

before: 30s, 597 MB
after: 28s, 495 MB

All tests repeated 5 times for clean local filesystem repo.
This commit is contained in:
Jarek Kowalski
2021-09-25 14:54:31 -07:00
committed by GitHub
parent 0629d007fc
commit 792cc874dc
21 changed files with 228 additions and 100 deletions

View File

@@ -172,6 +172,7 @@ func (b Bytes) WriteTo(w io.Writer) (int64, error) {
// FromSlice creates Bytes from the specified slice.
func FromSlice(b []byte) Bytes {
var r Bytes
r.sliceBuf[0] = b
r.Slices = r.sliceBuf[:]

View File

@@ -33,6 +33,19 @@ func (b *WriteBuffer) Close() {
b.inner.invalidate()
}
// CloneContiguous initializes the write buffer with a contiguous (single-slice) copy of the provided
// slices.
func (b *WriteBuffer) CloneContiguous(byt Bytes) []byte {
	// Reserve one contiguous slice exactly large enough to hold every
	// input slice, then copy each part into place at increasing offsets.
	contig := b.MakeContiguous(byt.Length())

	pos := 0
	for _, part := range byt.Slices {
		pos += copy(contig[pos:], part)
	}

	return contig
}
// MakeContiguous ensures the write buffer consists of exactly one contiguous single slice of the provided length
// and returns the slice.
func (b *WriteBuffer) MakeContiguous(length int) []byte {

View File

@@ -81,3 +81,16 @@ func TestGatherWriteBufferContig(t *testing.T) {
require.Equal(t, theCap, len(b))
require.Equal(t, theCap, cap(b))
}
// TestGatherWriteBufferCloneContig verifies that CloneContiguous produces a
// single contiguous slice containing all of the source buffer's bytes in order.
func TestGatherWriteBufferCloneContig(t *testing.T) {
	var w WriteBuffer
	defer w.Close()

	w.Append([]byte{1, 2, 3})
	w.Append([]byte{4, 5, 6})

	var w2 WriteBuffer
	// w2 must be closed too, otherwise its internal buffers are never
	// released back to the pool (the original test leaked them).
	defer w2.Close()

	contig := w2.CloneContiguous(w.Bytes())
	require.Equal(t, []byte{1, 2, 3, 4, 5, 6}, contig)
}

View File

@@ -31,13 +31,13 @@ type reconnectableStorageOptions struct {
// newReconnectableStorage wraps the provided storage that may or may not be round-trippable
// in a wrapper that globally caches storage instance and ensures its connection info is
// round-trippable.
func newReconnectableStorage(t *testing.T, st blob.Storage) blob.Storage {
t.Helper()
func newReconnectableStorage(tb testing.TB, st blob.Storage) blob.Storage {
tb.Helper()
st2 := reconnectableStorage{st, &reconnectableStorageOptions{UUID: uuid.NewString()}}
reconnectableStorageByUUID.Store(st2.opt.UUID, st2)
t.Cleanup(func() {
tb.Cleanup(func() {
reconnectableStorageByUUID.Delete(st2.opt.UUID)
})

View File

@@ -38,11 +38,11 @@ type Options struct {
}
// setup sets up a test environment.
func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts ...Options) *Environment {
t.Helper()
func (e *Environment) setup(tb testing.TB, version content.FormatVersion, opts ...Options) *Environment {
tb.Helper()
ctx := testlogging.Context(t)
e.configDir = testutil.TempDirectory(t)
ctx := testlogging.Context(tb)
e.configDir = testutil.TempDirectory(tb)
openOpt := &repo.Options{}
opt := &repo.NewRepositoryOptions{
@@ -71,7 +71,7 @@ func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts ..
}
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, openOpt.TimeNowFunc)
st = newReconnectableStorage(t, st)
st = newReconnectableStorage(tb, st)
e.st = st
if e.Password == "" {
@@ -79,49 +79,49 @@ func (e *Environment) setup(t *testing.T, version content.FormatVersion, opts ..
}
if err := repo.Initialize(ctx, st, opt, e.Password); err != nil {
t.Fatalf("err: %v", err)
tb.Fatalf("err: %v", err)
}
if err := repo.Connect(ctx, e.ConfigFile(), st, e.Password, nil); err != nil {
t.Fatalf("can't connect: %v", err)
tb.Fatalf("can't connect: %v", err)
}
e.connected = true
rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, openOpt)
if err != nil {
t.Fatalf("can't open: %v", err)
tb.Fatalf("can't open: %v", err)
}
e.Repository = rep
_, e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
if err != nil {
t.Fatal(err)
tb.Fatal(err)
}
t.Cleanup(func() { rep.Close(ctx) })
tb.Cleanup(func() { rep.Close(ctx) })
return e
}
// Close closes testing environment.
func (e *Environment) Close(ctx context.Context, t *testing.T) {
t.Helper()
func (e *Environment) Close(ctx context.Context, tb testing.TB) {
tb.Helper()
if err := e.RepositoryWriter.Close(ctx); err != nil {
t.Fatalf("unable to close: %v", err)
tb.Fatalf("unable to close: %v", err)
}
if e.connected {
if err := repo.Disconnect(ctx, e.ConfigFile()); err != nil {
t.Errorf("error disconnecting: %v", err)
tb.Errorf("error disconnecting: %v", err)
}
}
if err := os.Remove(e.configDir); err != nil {
// should be empty, assuming Disconnect was successful
t.Errorf("error removing config directory: %v", err)
tb.Errorf("error removing config directory: %v", err)
}
}
@@ -131,47 +131,47 @@ func (e *Environment) ConfigFile() string {
}
// MustReopen closes and reopens the repository.
func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options)) {
t.Helper()
func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options)) {
tb.Helper()
ctx := testlogging.Context(t)
ctx := testlogging.Context(tb)
err := e.RepositoryWriter.Close(ctx)
if err != nil {
t.Fatalf("close error: %v", err)
tb.Fatalf("close error: %v", err)
}
rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts))
if err != nil {
t.Fatalf("err: %v", err)
tb.Fatalf("err: %v", err)
}
t.Cleanup(func() { rep.Close(ctx) })
tb.Cleanup(func() { rep.Close(ctx) })
_, e.RepositoryWriter, err = rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
if err != nil {
t.Fatalf("err: %v", err)
tb.Fatalf("err: %v", err)
}
}
// MustOpenAnother opens another repository backend by the same storage.
func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter {
t.Helper()
func (e *Environment) MustOpenAnother(tb testing.TB) repo.RepositoryWriter {
tb.Helper()
ctx := testlogging.Context(t)
ctx := testlogging.Context(tb)
rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, &repo.Options{})
if err != nil {
t.Fatalf("err: %v", err)
tb.Fatalf("err: %v", err)
}
t.Cleanup(func() {
tb.Cleanup(func() {
rep2.Close(ctx)
})
_, w, err := rep2.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
if err != nil {
t.Fatal(err)
tb.Fatal(err)
}
return w
@@ -179,43 +179,43 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter {
// MustConnectOpenAnother opens another repository backend by the same storage,
// with independent config and cache options.
func (e *Environment) MustConnectOpenAnother(t *testing.T, openOpts ...func(*repo.Options)) repo.Repository {
t.Helper()
func (e *Environment) MustConnectOpenAnother(tb testing.TB, openOpts ...func(*repo.Options)) repo.Repository {
tb.Helper()
ctx := testlogging.Context(t)
ctx := testlogging.Context(tb)
config := filepath.Join(testutil.TempDirectory(t), "kopia.config")
config := filepath.Join(testutil.TempDirectory(tb), "kopia.config")
connOpts := &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: testutil.TempDirectory(t),
CacheDirectory: testutil.TempDirectory(tb),
},
}
if err := repo.Connect(ctx, config, e.st, e.Password, connOpts); err != nil {
t.Fatal("can't connect:", err)
tb.Fatal("can't connect:", err)
}
rep, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts))
if err != nil {
t.Fatal("can't open:", err)
tb.Fatal("can't open:", err)
}
return rep
}
// VerifyBlobCount verifies that the underlying storage contains the specified number of blobs.
func (e *Environment) VerifyBlobCount(t *testing.T, want int) {
t.Helper()
func (e *Environment) VerifyBlobCount(tb testing.TB, want int) {
tb.Helper()
var got int
_ = e.RepositoryWriter.BlobReader().ListBlobs(testlogging.Context(t), "", func(_ blob.Metadata) error {
_ = e.RepositoryWriter.BlobReader().ListBlobs(testlogging.Context(tb), "", func(_ blob.Metadata) error {
got++
return nil
})
if got != want {
t.Errorf("got unexpected number of BLOBs: %v, wanted %v", got, want)
tb.Errorf("got unexpected number of BLOBs: %v, wanted %v", got, want)
}
}
@@ -235,17 +235,17 @@ func repoOptions(openOpts []func(*repo.Options)) *repo.Options {
const FormatNotImportant = content.FormatVersion2
// NewEnvironment creates a new repository testing environment and ensures its cleanup at the end of the test.
func NewEnvironment(t *testing.T, version content.FormatVersion, opts ...Options) (context.Context, *Environment) {
t.Helper()
func NewEnvironment(tb testing.TB, version content.FormatVersion, opts ...Options) (context.Context, *Environment) {
tb.Helper()
ctx := testlogging.Context(t)
ctx := testlogging.Context(tb)
var env Environment
env.setup(t, version, opts...)
env.setup(tb, version, opts...)
t.Cleanup(func() {
env.Close(ctx, t)
tb.Cleanup(func() {
env.Close(ctx, tb)
})
return ctx, &env

View File

@@ -5,6 +5,7 @@
"time"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
@@ -43,7 +44,7 @@ func TestTimeFuncWiring(t *testing.T) {
// verify wiring for the content layer
nt := ft.Advance(20 * time.Second)
cid, err := env.RepositoryWriter.ContentManager().WriteContent(ctx, []byte("foo"), "", content.NoCompression)
cid, err := env.RepositoryWriter.ContentManager().WriteContent(ctx, gather.FromSlice([]byte("foo")), "", content.NoCompression)
if err != nil {
t.Fatal("failed to write content:", err)
}

View File

@@ -9,6 +9,7 @@
"github.com/gorilla/mux"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
@@ -83,7 +84,7 @@ func (s *Server) handleContentPut(ctx context.Context, r *http.Request, data []b
}
}
actualCID, err := dr.ContentManager().WriteContent(ctx, data, prefix, comp)
actualCID, err := dr.ContentManager().WriteContent(ctx, gather.FromSlice(data), prefix, comp)
if err != nil {
return nil, internalServerError(err)
}

View File

@@ -17,6 +17,7 @@
"google.golang.org/grpc/status"
"github.com/kopia/kopia/internal/auth"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/grpcapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
@@ -231,7 +232,7 @@ func handleWriteContentRequest(ctx context.Context, dw repo.DirectRepositoryWrit
return accessDeniedResponse()
}
contentID, err := dw.ContentManager().WriteContent(ctx, req.GetData(), content.ID(req.GetPrefix()), compression.HeaderID(req.GetCompression()))
contentID, err := dw.ContentManager().WriteContent(ctx, gather.FromSlice(req.GetData()), content.ID(req.GetPrefix()), compression.HeaderID(req.GetCompression()))
if err != nil {
return errorResponse(err)
}

View File

@@ -194,14 +194,14 @@ func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.
return tmp.ToByteSlice(), nil
}
func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
func (r *apiServerRepository) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
if err := content.ValidatePrefix(prefix); err != nil {
return "", errors.Wrap(err, "invalid prefix")
}
var hashOutput [128]byte
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], gather.FromSlice(data))))
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data)))
// avoid uploading the content body if it already exists.
if _, err := r.ContentInfo(ctx, contentID); err == nil {
@@ -209,20 +209,20 @@ func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, pre
return contentID, nil
}
r.wso.OnUpload(int64(len(data)))
r.wso.OnUpload(int64(data.Length()))
maybeCompression := ""
if comp != content.NoCompression {
maybeCompression = fmt.Sprintf("?compression=%x", comp)
}
if err := r.cli.Put(ctx, "contents/"+string(contentID)+maybeCompression, data, nil); err != nil {
if err := r.cli.Put(ctx, "contents/"+string(contentID)+maybeCompression, data.ToByteSlice(), nil); err != nil {
return "", errors.Wrapf(err, "error writing content %v", contentID)
}
if prefix != "" {
// add all prefixed contents to the cache.
r.contentCache.Put(ctx, string(contentID), gather.FromSlice(data))
r.contentCache.Put(ctx, string(contentID), data)
}
return contentID, nil

View File

@@ -117,11 +117,11 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
defer bm.Close(ctx)
cases := [][]byte{
{},
{1, 2, 3},
make([]byte, 256),
bytes.Repeat([]byte{1, 2, 3, 5}, 1024),
cases := []gather.Bytes{
gather.FromSlice([]byte{}),
gather.FromSlice([]byte{1, 2, 3}),
gather.FromSlice(make([]byte, 256)),
gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 5}, 1024)),
}
for _, b := range cases {
@@ -138,7 +138,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
return
}
if got, want := b2, b; !bytes.Equal(got, want) {
if got, want := b2, b.ToByteSlice(); !bytes.Equal(got, want) {
t.Errorf("content %q data mismatch: got %x, wanted %x", contentID, got, want)
return
}
@@ -153,7 +153,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
return
}
if got, want := b3, b; !bytes.Equal(got, want) {
if got, want := b3, b.ToByteSlice(); !bytes.Equal(got, want) {
t.Errorf("content %q data mismatch: got %x, wanted %x", contentID, got, want)
return
}

View File

@@ -672,13 +672,13 @@ func (bm *WriteManager) SupportsContentCompression() bool {
// WriteContent saves a given content of data to a pack group with a provided name and returns a contentID
// that's based on the contents of data written.
func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID, comp compression.HeaderID) (ID, error) {
func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, prefix ID, comp compression.HeaderID) (ID, error) {
if err := bm.maybeRetryWritingFailedPacksUnlocked(ctx); err != nil {
return "", err
}
stats.Record(ctx, metricContentWriteContentCount.M(1))
stats.Record(ctx, metricContentWriteContentBytes.M(int64(len(data))))
stats.Record(ctx, metricContentWriteContentBytes.M(int64(data.Length())))
if err := ValidatePrefix(prefix); err != nil {
return "", err
@@ -686,7 +686,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID
var hashOutput [hashing.MaxHashSize]byte
contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], gather.FromSlice(data))))
contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], data)))
bm.mu.RLock()
_, bi, err := bm.getContentInfoReadLocked(ctx, contentID)
@@ -704,7 +704,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data []byte, prefix ID
bm.log.Debugf("write-content %v new", contentID)
}
return contentID, bm.addToPackUnlocked(ctx, contentID, gather.FromSlice(data), false, comp)
return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp)
}
// GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound.

View File

@@ -23,6 +23,7 @@
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/ownwrites"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
@@ -306,7 +307,7 @@ func (s *contentManagerSuite) TestContentManagerWriteMultiple(t *testing.T) {
for i := 0; i < repeatCount; i++ {
b := seededRandomData(i, i%113)
blkID, err := bm.WriteContent(ctx, b, "", NoCompression)
blkID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression)
if err != nil {
t.Errorf("err: %v", err)
}
@@ -376,12 +377,12 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T)
},
}
_, err = bm.WriteContent(ctx, seededRandomData(1, 10), "", NoCompression)
_, err = bm.WriteContent(ctx, gather.FromSlice(seededRandomData(1, 10)), "", NoCompression)
if !errors.Is(err, sessionPutErr) {
t.Fatalf("can't create first content: %v", err)
}
b1, err := bm.WriteContent(ctx, seededRandomData(1, 10), "", NoCompression)
b1, err := bm.WriteContent(ctx, gather.FromSlice(seededRandomData(1, 10)), "", NoCompression)
if err != nil {
t.Fatalf("can't create content: %v", err)
}
@@ -389,7 +390,7 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T)
// advance time enough to cause auto-flush, which will fail (firstPutErr)
ta.Advance(1 * time.Hour)
if _, err := bm.WriteContent(ctx, seededRandomData(2, 10), "", NoCompression); !errors.Is(err, firstPutErr) {
if _, err := bm.WriteContent(ctx, gather.FromSlice(seededRandomData(2, 10)), "", NoCompression); !errors.Is(err, firstPutErr) {
t.Fatalf("can't create 2nd content: %v", err)
}
@@ -1857,7 +1858,7 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion For
data := make([]byte, i)
cryptorand.Read(data)
cid, err := mgr.WriteContent(ctx, data, "", NoCompression)
cid, err := mgr.WriteContent(ctx, gather.FromSlice(data), "", NoCompression)
if err != nil {
t.Fatalf("unable to write %v bytes: %v", len(data), err)
}
@@ -2003,7 +2004,7 @@ func (s *contentManagerSuite) TestCompression_Disabled(t *testing.T) {
compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000)
// with index v1 the compression is disabled
_, err := bm.WriteContent(ctx, compressibleData, "", compression.ByName["pgzip"].HeaderID())
_, err := bm.WriteContent(ctx, gather.FromSlice(compressibleData), "", compression.ByName["pgzip"].HeaderID())
require.Error(t, err)
}
@@ -2020,7 +2021,7 @@ func (s *contentManagerSuite) TestCompression_CompressibleData(t *testing.T) {
compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000)
headerID := compression.ByName["gzip"].HeaderID()
cid, err := bm.WriteContent(ctx, compressibleData, "", headerID)
cid, err := bm.WriteContent(ctx, gather.FromSlice(compressibleData), "", headerID)
require.NoError(t, err)
ci, err := bm.ContentInfo(ctx, cid)
@@ -2057,7 +2058,7 @@ func (s *contentManagerSuite) TestCompression_NonCompressibleData(t *testing.T)
rand.Read(nonCompressibleData)
cid, err := bm.WriteContent(ctx, nonCompressibleData, "", headerID)
cid, err := bm.WriteContent(ctx, gather.FromSlice(nonCompressibleData), "", headerID)
require.NoError(t, err)
verifyContent(ctx, t, bm, cid, nonCompressibleData)
@@ -2182,7 +2183,7 @@ func verifyContent(ctx context.Context, t *testing.T, bm *WriteManager, contentI
func writeContentAndVerify(ctx context.Context, t *testing.T, bm *WriteManager, b []byte) ID {
t.Helper()
contentID, err := bm.WriteContent(ctx, b, "", NoCompression)
contentID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression)
if err != nil {
t.Errorf("err: %v", err)
}
@@ -2220,13 +2221,13 @@ func writeContentWithRetriesAndVerify(ctx context.Context, t *testing.T, bm *Wri
t.Logf("*** starting writeContentWithRetriesAndVerify")
contentID, err := bm.WriteContent(ctx, b, "", NoCompression)
contentID, err := bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression)
for i := 0; err != nil && i < maxRetries; i++ {
retryCount++
t.Logf("*** try %v", retryCount)
contentID, err = bm.WriteContent(ctx, b, "", NoCompression)
contentID, err = bm.WriteContent(ctx, gather.FromSlice(b), "", NoCompression)
}
if err != nil {

View File

@@ -599,7 +599,7 @@ func (r *grpcRepositoryClient) doWrite(ctx context.Context, contentID content.ID
return nil
}
func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
if err := content.ValidatePrefix(prefix); err != nil {
return "", errors.Wrap(err, "invalid prefix")
}
@@ -611,7 +611,7 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, pr
var hashOutput [128]byte
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], gather.FromSlice(data))))
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data)))
if r.recent.exists(contentID) {
return contentID, nil
@@ -621,15 +621,18 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, pr
r.asyncWritesSemaphore <- struct{}{}
// clone so that caller can reuse the buffer
data = append([]byte(nil), data...)
var cloneBuf gather.WriteBuffer
clone := cloneBuf.CloneContiguous(data)
r.asyncWritesWG.Go(func() error {
defer func() {
// release semaphore
<-r.asyncWritesSemaphore
defer cloneBuf.Close()
}()
return r.doWrite(ctxutil.Detach(ctx), contentID, data, prefix, comp)
return r.doWrite(ctxutil.Detach(ctx), contentID, clone, prefix, comp)
})
return contentID, nil

View File

@@ -13,6 +13,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/content"
)
@@ -96,7 +97,9 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
man.Entries = append(man.Entries, e)
}
var buf bytes.Buffer
var buf gather.WriteBuffer
defer buf.Close()
gz := gzip.NewWriter(&buf)
mustSucceed(json.NewEncoder(gz).Encode(man))
mustSucceed(gz.Flush())

View File

@@ -13,6 +13,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
@@ -40,7 +41,7 @@
type contentManager interface {
Revision() int64
GetContent(ctx context.Context, contentID content.ID) ([]byte, error)
WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error)
WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error)
DeleteContent(ctx context.Context, contentID content.ID) error
IterateContents(ctx context.Context, options content.IterateOptions, callback content.IterateCallback) error
DisableIndexFlush(ctx context.Context)

View File

@@ -4,9 +4,11 @@
import (
"context"
"io"
"sync"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/splitter"
@@ -31,7 +33,7 @@ type contentReader interface {
type contentManager interface {
contentReader
SupportsContentCompression() bool
WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error)
WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error)
}
// Format describes the format of objects in a repository.
@@ -45,29 +47,42 @@ type Manager struct {
contentMgr contentManager
newSplitter splitter.Factory
writerPool sync.Pool
}
// NewWriter creates an ObjectWriter for writing to the repository.
func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w := &objectWriter{
ctx: ctx,
om: om,
splitter: om.newSplitter(),
description: opt.Description,
prefix: opt.Prefix,
compressor: compression.ByName[opt.Compressor],
}
w, _ := om.writerPool.Get().(*objectWriter)
w.ctx = ctx
w.om = om
w.splitter = om.newSplitter()
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.totalLength = 0
w.currentPosition = 0
// point the slice at the embedded array, so that we avoid allocations most of the time
w.indirectIndex = w.indirectIndexBuf[:0]
if opt.AsyncWrites > 0 {
w.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites)
if len(w.asyncWritesSemaphore) != 0 || cap(w.asyncWritesSemaphore) != opt.AsyncWrites {
w.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites)
}
} else {
w.asyncWritesSemaphore = nil
}
w.buffer.Reset()
w.contentWriteError = nil
return w
}
func (om *Manager) closedWriter(ow *objectWriter) {
om.writerPool.Put(ow)
}
// Concatenate creates an object that's a result of concatenation of other objects. This is more efficient than reading
// and rewriting the objects because Concatenate can efficiently merge index entries without reading the underlying
// contents.
@@ -174,6 +189,12 @@ func NewObjectManager(ctx context.Context, bm contentManager, f Format) (*Manage
Format: f,
}
om.writerPool = sync.Pool{
New: func() interface{} {
return new(objectWriter)
},
}
splitterID := f.Splitter
if splitterID == "" {
splitterID = "FIXED"

View File

@@ -21,6 +21,7 @@
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob"
@@ -47,15 +48,15 @@ func (f *fakeContentManager) GetContent(ctx context.Context, contentID content.I
return nil, content.ErrContentNotFound
}
func (f *fakeContentManager) WriteContent(ctx context.Context, data []byte, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes, prefix content.ID, comp compression.HeaderID) (content.ID, error) {
h := sha256.New()
h.Write(data)
data.WriteTo(h)
contentID := prefix + content.ID(hex.EncodeToString(h.Sum(nil)))
f.mu.Lock()
defer f.mu.Unlock()
f.data[contentID] = append([]byte(nil), data...)
f.data[contentID] = data.ToByteSlice()
if f.compresionIDs != nil {
f.compresionIDs[contentID] = comp
}

View File

@@ -99,6 +99,10 @@ func (w *objectWriter) Close() error {
w.splitter.Close()
}
w.buffer.Close()
w.om.closedWriter(w)
return nil
}
@@ -200,7 +204,7 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte
return errors.Wrap(err, "unable to prepare content bytes")
}
contentID, err := w.om.contentMgr.WriteContent(w.ctx, contentBytes.ToByteSlice(), w.prefix, comp)
contentID, err := w.om.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix, comp)
if err != nil {
return errors.Wrapf(err, "unable to write content chunk %v of %v: %v", chunkID, w.description, err)
}

View File

@@ -0,0 +1,62 @@
package repo_test
import (
"crypto/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/object"
)
// BenchmarkWriterDedup1M measures repeated writes of an identical 4 MiB
// object, so every iteration after the warm-up write is fully deduplicated.
func BenchmarkWriterDedup1M(b *testing.B) {
	ctx, env := repotesting.NewEnvironment(b, content.FormatVersion2)

	payload := make([]byte, 4<<20)

	// Warm up: write the object once so the timed iterations all hit dedup.
	warmup := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
	warmup.Write(payload)
	_, err := warmup.Result()
	require.NoError(b, err)
	warmup.Close()

	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		// write exactly the same data every time
		ow := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
		ow.Write(payload)
		ow.Result()
		ow.Close()
	}
}
// BenchmarkWriterNoDedup1M measures writes of small, always-different chunks
// of random data, sliding a window through dataBuf so that no deduplication
// can take place.
func BenchmarkWriterNoDedup1M(b *testing.B) {
	ctx, env := repotesting.NewEnvironment(b, content.FormatVersion2)

	dataBuf := make([]byte, 4<<20)

	chunkSize := 32
	offset := 0

	_, err := rand.Read(dataBuf)
	require.NoError(b, err)

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// write a different slice of random data on each iteration so that
		// nothing is deduplicated
		writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})

		// Fix: the wrap-around check must use the sliding window offset, not
		// the loop counter i. With i the condition stays true forever after
		// the first wrap (i never resets), so chunkSize grew every iteration
		// and only dataBuf[0:chunkSize] was ever written.
		if offset+chunkSize > len(dataBuf) {
			// bump chunkSize on each wrap so repeated passes over dataBuf
			// still produce distinct chunks
			chunkSize++
			offset = 0
		}

		writer.Write(dataBuf[offset : offset+chunkSize])
		writer.Result()
		writer.Close()

		offset++
	}
}

View File

@@ -17,6 +17,7 @@
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo"
@@ -365,7 +366,7 @@ func writeRandomContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *
data := make([]byte, 1000)
cryptorand.Read(data)
contentID, err := r.ContentManager().WriteContent(ctx, data, "", content.NoCompression)
contentID, err := r.ContentManager().WriteContent(ctx, gather.FromSlice(data), "", content.NoCompression)
if err != nil {
return errors.Wrap(err, "WriteContent error")
}

View File

@@ -11,6 +11,7 @@
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
@@ -97,7 +98,7 @@ type writtenBlock struct {
dataCopy := append([]byte{}, data...)
contentID, err := bm.WriteContent(ctx, data, "", content.NoCompression)
contentID, err := bm.WriteContent(ctx, gather.FromSlice(data), "", content.NoCompression)
if err != nil {
t.Errorf("err: %v", err)
return