From e22d22dba2c965cebddfdf85d76ca8aad392205d Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 11 Sep 2020 20:12:01 -0700 Subject: [PATCH] object: implemented fast concatenation of objects by merging their index entries (#607) --- repo/object/object_manager.go | 99 +++++++++++++++++++++ repo/object/object_manager_test.go | 138 +++++++++++++++++++++++++++++ repo/object/object_writer.go | 22 +++-- 3 files changed, 252 insertions(+), 7 deletions(-) diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 3fcac7d32..01088d8b4 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -80,6 +80,105 @@ func (om *Manager) Open(ctx context.Context, objectID ID) (Reader, error) { return om.openAndAssertLength(ctx, objectID, -1) } +// Concatenate creates an object that's a result of concatenation of other objects. This is more efficient than reading +// and rewriting the objects because Concatenate can efficiently merge index entries without reading the underlying +// contents. +// +// This function exists primarily to facilitate efficient parallel uploads of very large files (>1GB). Due to the bottleneck of +// splitting which is inherently sequential, we can only use one CPU core for each Writer, which limits throughput. +// +// For example when uploading a 100 GB file it is beneficial to independently upload sections of [0..25GB), +// [25..50GB), [50GB..75GB) and [75GB..100GB) and concatenate them together as this allows us to run four splitters +// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific, +// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB), +// so this method should only be used for very large files where this overhead is relatively small. 
+func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) { + if len(objectIDs) == 0 { + return "", errors.Errorf("empty list of objects") + } + + if len(objectIDs) == 1 { + return objectIDs[0], nil + } + + var ( + concatenatedEntries []indirectObjectEntry + totalLength int64 + err error + ) + + for _, objectID := range objectIDs { + concatenatedEntries, totalLength, err = om.appendIndexEntriesForObject(ctx, concatenatedEntries, totalLength, objectID) + if err != nil { + return "", errors.Wrapf(err, "error appending %v", objectID) + } + } + + log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength) + + w := om.NewWriter(ctx, WriterOptions{ + Prefix: indirectContentPrefix, + Description: "CONCATENATED INDEX", + }) + defer w.Close() // nolint:errcheck + + if werr := writeIndirectObject(w, concatenatedEntries); werr != nil { + return "", werr + } + + concatID, err := w.Result() + if err != nil { + return "", errors.Wrap(err, "error writing concatenated index") + } + + return IndirectObjectID(concatID), nil +} + +func (om *Manager) appendIndexEntriesForObject(ctx context.Context, indexEntries []indirectObjectEntry, startingLength int64, objectID ID) (result []indirectObjectEntry, totalLength int64, _ error) { + if indexObjectID, ok := objectID.IndexObjectID(); ok { + ndx, err := om.loadSeekTable(ctx, indexObjectID) + if err != nil { + return nil, 0, errors.Wrapf(err, "error reading index of %v", objectID) + } + + indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, ndx...) + + return indexEntries, totalLength, nil + } + + // non-index object - the precise length of the object cannot be determined from content due to compression and padding, + // so we must open the object to read its length. 
+ r, err := om.Open(ctx, objectID) + if err != nil { + return nil, 0, errors.Wrapf(err, "error opening %v", objectID) + } + defer r.Close() //nolint:errcheck + + indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, indirectObjectEntry{ + Start: 0, + Length: r.Length(), + Object: objectID, + }) + + return indexEntries, totalLength, nil +} + +func appendIndexEntries(indexEntries []indirectObjectEntry, startingLength int64, incoming ...indirectObjectEntry) (result []indirectObjectEntry, totalLength int64) { + totalLength = startingLength + + for _, inc := range incoming { + indexEntries = append(indexEntries, indirectObjectEntry{ + Start: inc.Start + startingLength, + Length: inc.Length, + Object: inc.Object, + }) + + totalLength += inc.Length + } + + return indexEntries, totalLength +} + func (om *Manager) openAndAssertLength(ctx context.Context, objectID ID, assertLength int64) (Reader, error) { if indexObjectID, ok := objectID.IndexObjectID(); ok { // recursively calls openAndAssertLength diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 5b7582bbb..467841871 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -16,6 +16,8 @@ "sync" "testing" + "github.com/pkg/errors" + "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/compression" @@ -252,6 +254,142 @@ func TestHMAC(t *testing.T) { } } +// nolint:gocyclo +func TestConcatenate(t *testing.T) { + ctx := testlogging.Context(t) + _, om := setupTest(t) + + phrase := []byte("hello world\n") + phraseLength := len(phrase) + shortRepeatCount := 17 + longRepeatCount := 999999 + + emptyObject := mustWriteObject(t, om, nil, "") + + // short uncompressed object - + shortUncompressedOID := mustWriteObject(t, om, bytes.Repeat(phrase, shortRepeatCount), "") + + // long uncompressed object - Ix + longUncompressedOID := mustWriteObject(t, om, bytes.Repeat(phrase, 
longRepeatCount), "") + + // short compressed object - Z + shortCompressedOID := mustWriteObject(t, om, bytes.Repeat(phrase, shortRepeatCount), "pgzip") + + // long compressed object - Ix + longCompressedOID := mustWriteObject(t, om, bytes.Repeat(phrase, longRepeatCount), "pgzip") + + if _, compressed, ok := shortUncompressedOID.ContentID(); !ok || compressed { + t.Errorf("invalid test assumption - shortUncompressedOID %v", shortUncompressedOID) + } + + if _, isIndex := longUncompressedOID.IndexObjectID(); !isIndex { + t.Errorf("invalid test assumption - longUncompressedOID %v", longUncompressedOID) + } + + if _, compressed, ok := shortCompressedOID.ContentID(); !ok || !compressed { + t.Errorf("invalid test assumption - shortCompressedOID %v", shortCompressedOID) + } + + if _, isIndex := longCompressedOID.IndexObjectID(); !isIndex { + t.Errorf("invalid test assumption - longCompressedOID %v", longCompressedOID) + } + + shortLength := phraseLength * shortRepeatCount + longLength := phraseLength * longRepeatCount + + cases := []struct { + inputs []ID + wantLength int + }{ + {[]ID{emptyObject}, 0}, + {[]ID{shortUncompressedOID}, shortLength}, + {[]ID{longUncompressedOID}, longLength}, + {[]ID{shortCompressedOID}, shortLength}, + {[]ID{longCompressedOID}, longLength}, + + {[]ID{shortUncompressedOID, shortUncompressedOID}, 2 * shortLength}, + {[]ID{shortUncompressedOID, shortCompressedOID}, 2 * shortLength}, + {[]ID{emptyObject, longCompressedOID, shortCompressedOID, emptyObject, longCompressedOID, shortUncompressedOID, shortUncompressedOID, emptyObject, emptyObject}, 2*longLength + 3*shortLength}, + } + + for _, tc := range cases { + concatenatedOID, err := om.Concatenate(ctx, tc.inputs) + if err != nil { + t.Fatal(err) + } + + r, err := om.Open(ctx, concatenatedOID) + if err != nil { + t.Fatal(err) + } + + gotLength := int(r.Length()) + r.Close() + + if gotLength != tc.wantLength { + t.Errorf("invalid length for %v: %v, want %v", tc.inputs, gotLength, tc.wantLength) + 
} + + b := make([]byte, len(phrase)) + + // read the concatenated object in buffers the size of a single phrase, each buffer should be identical. + for n, readerr := r.Read(b); ; n, readerr = r.Read(b) { + if errors.Is(readerr, io.EOF) { + break + } + + if n != len(b) { + t.Errorf("invalid length: %v", n) + } + + if !bytes.Equal(b, phrase) { + t.Errorf("invalid buffer: %v", n) + } + } + + if _, err = om.VerifyObject(ctx, concatenatedOID); err != nil { + t.Fatalf("verify error: %v", err) + } + + // make sure results of concatenation can be further concatenated. + concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}) + if err != nil { + t.Fatal(err) + } + + r, err = om.Open(ctx, concatenated3OID) + if err != nil { + t.Fatal(err) + } + + gotLength = int(r.Length()) + r.Close() + + if gotLength != tc.wantLength*3 { + t.Errorf("invalid twice-concatenated object length: %v, want %v", gotLength, tc.wantLength*3) + } + } +} + +func mustWriteObject(t *testing.T, om *Manager, data []byte, compressor compression.Name) ID { + t.Helper() + + w := om.NewWriter(testlogging.Context(t), WriterOptions{Compressor: compressor}) + defer w.Close() + + _, err := w.Write(data) + if err != nil { + t.Fatal(err) + } + + oid, err := w.Result() + if err != nil { + t.Fatal(err) + } + + return oid +} + func TestReader(t *testing.T) { ctx := testlogging.Context(t) data, om := setupTest(t) diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index b0e8e5f0d..9b04c7a4f 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -267,13 +267,8 @@ func (w *objectWriter) Result() (ID, error) { defer iw.Close() //nolint:errcheck - ind := indirectObject{ - StreamID: "kopia:indirect", - Entries: w.indirectIndex, - } - - if err := json.NewEncoder(iw).Encode(ind); err != nil { - return "", errors.Wrap(err, "unable to write indirect object index") + if err := writeIndirectObject(iw, w.indirectIndex); err != nil { + 
return "", err } oid, err := iw.Result() @@ -284,6 +279,19 @@ func (w *objectWriter) Result() (ID, error) { return IndirectObjectID(oid), nil } +func writeIndirectObject(w io.Writer, entries []indirectObjectEntry) error { + ind := indirectObject{ + StreamID: "kopia:indirect", + Entries: entries, + } + + if err := json.NewEncoder(w).Encode(ind); err != nil { + return errors.Wrap(err, "unable to write indirect object index") + } + + return nil +} + // WriterOptions can be passed to Repository.NewWriter(). type WriterOptions struct { Description string