From 68b8afd43fc24a1d7fe93880c850e8ff2d3588d6 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 24 Jun 2022 00:38:07 -0700 Subject: [PATCH] feat(snapshots): improved performance when uploading huge files (#2064) * feat(snapshots): improved performance when uploading huge files This is controlled by an upload policy which specifies the size threshold above which indvidual files are uploaded in parts and concatenated. This allows multiple threads to run splitting, hashing, compression and encryption in parallel, which was previously only possible across multiple files, but not when a single file was being uploaded. The default is 2GiB for now, so this feature only kicks in for very larger files. In the future we may lower this. Benchmark involved uploading a single 42.1 GB file which was a VM disk snapshot of fresh Ubuntu installation (fresh EXT4 partition with lots of zero bytes) to a brand-new filesystem repository on local SSD of M1 Pro Macbook Pro 2021. * before: 59-63s (~700 MB/s) * after: 15-17s (~2.6 GB/s) * additional test to ensure files are really e2e readable --- cli/command_policy_set.go | 36 +++++++ cli/command_policy_set_upload.go | 10 +- cli/command_policy_set_upload_test.go | 8 +- cli/command_policy_show.go | 9 ++ repo/api_server_repository.go | 6 ++ repo/grpc_repository_client.go | 6 ++ repo/object/indirect.go | 6 +- repo/object/object_manager.go | 12 +-- repo/object/object_reader.go | 11 ++- repo/object/object_writer.go | 8 +- repo/repository.go | 7 ++ snapshot/policy/optional.go | 16 +++ snapshot/policy/policy_merge.go | 9 ++ snapshot/policy/policy_merge_test.go | 8 ++ snapshot/policy/policy_tree.go | 3 + snapshot/policy/upload_policy.go | 11 ++- snapshot/snapshotfs/upload.go | 75 +++++++++++++- snapshot/snapshotfs/upload_test.go | 136 ++++++++++++++++++++++++++ 18 files changed, 350 insertions(+), 27 deletions(-) diff --git a/cli/command_policy_set.go b/cli/command_policy_set.go index eb6159495..9671fd7cf 100644 --- a/cli/command_policy_set.go +++ b/cli/command_policy_set.go @@ -7,6 +7,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/snapshot/policy" @@ -201,6 +202,41 @@ func applyOptionalInt(ctx context.Context, desc string, val **policy.OptionalInt return nil } +func applyOptionalInt64MiB(ctx context.Context, desc string, val **policy.OptionalInt64, str string, changeCount *int) error { + if str == "" { + // not changed + return nil + } + + if str == inheritPolicyString || str == defaultPolicyString { + *changeCount++ + + log(ctx).Infof(" - resetting %q to a default value inherited from parent.", desc) + + *val = nil + + return nil + } + + // nolint:gomnd + v, err := strconv.ParseInt(str, 10, 32) + if err != nil { + return errors.Wrapf(err, "can't parse the %v %q", desc, str) + } + + // convert MiB to bytes + v *= 1 << 20 // nolint:gomnd + + i := policy.OptionalInt64(v) + *changeCount++ + + log(ctx).Infof(" - setting %q to %v.", desc, units.BytesStringBase2(v)) + + *val = &i + + return nil +} + func applyPolicyNumber64(ctx context.Context, desc string, val *int64, str string, changeCount *int) error { if str == "" { // not changed diff --git a/cli/command_policy_set_upload.go b/cli/command_policy_set_upload.go index fcc0afd79..9f9c699db 100644 --- a/cli/command_policy_set_upload.go +++ b/cli/command_policy_set_upload.go @@ -9,13 +9,15 @@ ) type policyUploadFlags struct { - maxParallelUploads string - maxParallelFileReads string + maxParallelUploads string + maxParallelFileReads string + parallelizeUploadAboveSizeMiB string } func (c *policyUploadFlags) setup(cmd *kingpin.CmdClause) { cmd.Flag("max-parallel-file-reads", "Maximum number of parallel file reads").StringVar(&c.maxParallelFileReads) cmd.Flag("max-parallel-snapshots", "Maximum number of parallel snapshots (server, KopiaUI only)").StringVar(&c.maxParallelUploads) + cmd.Flag("parallel-upload-above-size-mib", "Use parallel uploads above size").StringVar(&c.parallelizeUploadAboveSizeMiB) } func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *policy.UploadPolicy, changeCount *int) error { @@ -27,5 +29,9 @@ func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *po return err } + if err := applyOptionalInt64MiB(ctx, "parallel upload above size", &up.ParallelUploadAboveSize, c.parallelizeUploadAboveSizeMiB, changeCount); err != nil { + return err + } + return nil } diff --git a/cli/command_policy_set_upload_test.go b/cli/command_policy_set_upload_test.go index 5a4c2dce4..0469b10d8 100644 --- a/cli/command_policy_set_upload_test.go +++ b/cli/command_policy_set_upload_test.go @@ -19,6 +19,7 @@ func TestSetUploadPolicy(t *testing.T) { lines = compressSpaces(lines) require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 (defined for this target)") require.Contains(t, lines, " Max parallel file reads: - (defined for this target)") + require.Contains(t, lines, " Parallel upload above size: 2 GiB (defined for this target)") // make some directory we'll be setting policy on td := testutil.TempDirectory(t) @@ -27,20 +28,23 @@ func TestSetUploadPolicy(t *testing.T) { lines = compressSpaces(lines) require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)") require.Contains(t, lines, " Max parallel file reads: - inherited from (global)") + require.Contains(t, lines, " Parallel upload above size: 2 GiB inherited from (global)") - e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33") + e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33", "--parallel-upload-above-size-mib=4096") lines = e.RunAndExpectSuccess(t, "policy", "show", td) lines = compressSpaces(lines) require.Contains(t, lines, " Max parallel snapshots (server/UI): 7 inherited from (global)") require.Contains(t, lines, " Max parallel file reads: 33 inherited from (global)") + require.Contains(t, lines, " Parallel upload above size: 4 GiB inherited from (global)") - e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default") + e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default", "--parallel-upload-above-size-mib=default") lines = e.RunAndExpectSuccess(t, "policy", "show", td) lines = compressSpaces(lines) require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)") require.Contains(t, lines, " Max parallel file reads: - inherited from (global)") + require.Contains(t, lines, " Parallel upload above size: 2 GiB inherited from (global)") } diff --git a/cli/command_policy_show.go b/cli/command_policy_show.go index e7fca4583..53588b6e2 100644 --- a/cli/command_policy_show.go +++ b/cli/command_policy_show.go @@ -270,6 +270,7 @@ func appendUploadPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy policyTableRow{"Uploads:", "", ""}, policyTableRow{" Max parallel snapshots (server/UI):", valueOrNotSet(p.UploadPolicy.MaxParallelSnapshots), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelSnapshots)}, policyTableRow{" Max parallel file reads:", valueOrNotSet(p.UploadPolicy.MaxParallelFileReads), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelFileReads)}, + policyTableRow{" Parallel upload above size:", valueOrNotSetOptionalInt64Bytes(p.UploadPolicy.ParallelUploadAboveSize), definitionPointToString(p.Target(), def.UploadPolicy.ParallelUploadAboveSize)}, ) } @@ -435,3 +436,11 @@ func valueOrNotSet(p *policy.OptionalInt) string { return fmt.Sprintf("%v", *p) } + +func valueOrNotSetOptionalInt64Bytes(p *policy.OptionalInt64) string { + if p == nil { + return "-" + } + + return units.BytesStringBase2(int64(*p)) +} diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index 30224fe50..4439d1da6 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -68,6 +68,12 @@ func (r *apiServerRepository) NewObjectWriter(ctx context.Context, opt object.Wr return r.omgr.NewWriter(ctx, opt) } +// ConcatenateObjects creates a concatenated objects from the provided object IDs. +func (r *apiServerRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { + // nolint:wrapcheck + return r.omgr.Concatenate(ctx, objectIDs) +} + func (r *apiServerRepository) VerifyObject(ctx context.Context, id object.ID) ([]content.ID, error) { // nolint:wrapcheck return object.VerifyObject(ctx, r, id) diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 2b1d9b716..78441b55a 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -473,6 +473,12 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp return ctx, w, nil } +// ConcatenateObjects creates a concatenated objects from the provided object IDs. +func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { + // nolint:wrapcheck + return r.omgr.Concatenate(ctx, objectIDs) +} + type sessionAttemptFunc func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) // maybeRetry executes the provided callback with or without automatic retries depending on how diff --git a/repo/object/indirect.go b/repo/object/indirect.go index ce5d31dae..252e6c181 100644 --- a/repo/object/indirect.go +++ b/repo/object/indirect.go @@ -1,13 +1,13 @@ package object -// indirectObjectEntry represents an entry in indirect object stream. -type indirectObjectEntry struct { +// IndirectObjectEntry represents an entry in indirect object stream. +type IndirectObjectEntry struct { Start int64 `json:"s,omitempty"` Length int64 `json:"l,omitempty"` Object ID `json:"o,omitempty"` } -func (i *indirectObjectEntry) endOffset() int64 { +func (i *IndirectObjectEntry) endOffset() int64 { return i.Start + i.Length } diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index f16b93a4f..c8f6180a6 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -106,7 +106,7 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) } var ( - concatenatedEntries []indirectObjectEntry + concatenatedEntries []IndirectObjectEntry totalLength int64 err error ) @@ -138,9 +138,9 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) return IndirectObjectID(concatID), nil } -func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []indirectObjectEntry, startingLength int64, objectID ID) (result []indirectObjectEntry, totalLength int64, _ error) { +func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []IndirectObjectEntry, startingLength int64, objectID ID) (result []IndirectObjectEntry, totalLength int64, _ error) { if indexObjectID, ok := objectID.IndexObjectID(); ok { - ndx, err := loadSeekTable(ctx, cr, indexObjectID) + ndx, err := LoadIndexObject(ctx, cr, indexObjectID) if err != nil { return nil, 0, errors.Wrapf(err, "error reading index of %v", objectID) } @@ -158,7 +158,7 @@ func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEnt } defer r.Close() //nolint:errcheck - indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, indirectObjectEntry{ + indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, IndirectObjectEntry{ Start: 0, Length: r.Length(), Object: objectID, @@ -167,11 +167,11 @@ func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEnt return indexEntries, totalLength, nil } -func appendIndexEntries(indexEntries []indirectObjectEntry, startingLength int64, incoming ...indirectObjectEntry) (result []indirectObjectEntry, totalLength int64) { +func appendIndexEntries(indexEntries []IndirectObjectEntry, startingLength int64, incoming ...IndirectObjectEntry) (result []IndirectObjectEntry, totalLength int64) { totalLength = startingLength for _, inc := range incoming { - indexEntries = append(indexEntries, indirectObjectEntry{ + indexEntries = append(indexEntries, IndirectObjectEntry{ Start: inc.Start + startingLength, Length: inc.Length, Object: inc.Object, diff --git a/repo/object/object_reader.go b/repo/object/object_reader.go index 34d780d5a..c7e1b3d0e 100644 --- a/repo/object/object_reader.go +++ b/repo/object/object_reader.go @@ -41,7 +41,7 @@ type objectReader struct { cr contentReader - seekTable []indirectObjectEntry + seekTable []IndirectObjectEntry currentPosition int64 // Overall position in the objectReader totalLength int64 // Overall length @@ -202,7 +202,7 @@ func (r *objectReader) Length() int64 { func openAndAssertLength(ctx context.Context, cr contentReader, objectID ID, assertLength int64) (Reader, error) { if indexObjectID, ok := objectID.IndexObjectID(); ok { // recursively calls openAndAssertLength - seekTable, err := loadSeekTable(ctx, cr, indexObjectID) + seekTable, err := LoadIndexObject(ctx, cr, indexObjectID) if err != nil { return nil, err } @@ -225,7 +225,7 @@ func iterateIndirectObjectContents(ctx context.Context, cr contentReader, indexO return errors.Wrap(err, "unable to read index") } - seekTable, err := loadSeekTable(ctx, cr, indexObjectID) + seekTable, err := LoadIndexObject(ctx, cr, indexObjectID) if err != nil { return err } @@ -260,10 +260,11 @@ func iterateBackingContents(ctx context.Context, r contentReader, oid ID, tracke type indirectObject struct { StreamID string `json:"stream"` - Entries []indirectObjectEntry `json:"entries"` + Entries []IndirectObjectEntry `json:"entries"` } -func loadSeekTable(ctx context.Context, cr contentReader, indexObjectID ID) ([]indirectObjectEntry, error) { +// LoadIndexObject returns entries comprising index object. +func LoadIndexObject(ctx context.Context, cr contentReader, indexObjectID ID) ([]IndirectObjectEntry, error) { r, err := openAndAssertLength(ctx, cr, indexObjectID, -1) if err != nil { return nil, err diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 78ea671a7..a22e4475d 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -77,8 +77,8 @@ type objectWriter struct { currentPosition int64 indirectIndexGrowMutex sync.Mutex - indirectIndex []indirectObjectEntry - indirectIndexBuf [4]indirectObjectEntry // small buffer so that we avoid allocations most of the time + indirectIndex []IndirectObjectEntry + indirectIndexBuf [4]IndirectObjectEntry // small buffer so that we avoid allocations most of the time description string @@ -143,7 +143,7 @@ func (w *objectWriter) flushBuffer() error { // hold a lock as we may grow the index w.indirectIndexGrowMutex.Lock() chunkID := len(w.indirectIndex) - w.indirectIndex = append(w.indirectIndex, indirectObjectEntry{}) + w.indirectIndex = append(w.indirectIndex, IndirectObjectEntry{}) w.indirectIndex[chunkID].Start = w.currentPosition w.indirectIndex[chunkID].Length = int64(length) w.currentPosition += int64(length) @@ -316,7 +316,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) { return IndirectObjectID(oid), nil } -func writeIndirectObject(w io.Writer, entries []indirectObjectEntry) error { +func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error { ind := indirectObject{ StreamID: "kopia:indirect", Entries: entries, diff --git a/repo/repository.go b/repo/repository.go index 8d5b84409..1b0b3d189 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -41,6 +41,7 @@ type RepositoryWriter interface { Repository NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer + ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) DeleteManifest(ctx context.Context, id manifest.ID) error Flush(ctx context.Context) error @@ -153,6 +154,12 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write return r.omgr.NewWriter(ctx, opt) } +// ConcatenateObjects creates a concatenated objects from the provided object IDs. +func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { + // nolint:wrapcheck + return r.omgr.Concatenate(ctx, objectIDs) +} + // DisableIndexRefresh disables index refresh for the duration of the write session. func (r *directRepository) DisableIndexRefresh() { r.cmgr.DisableIndexRefresh() diff --git a/snapshot/policy/optional.go b/snapshot/policy/optional.go index d57d4682d..f9e6a2afd 100644 --- a/snapshot/policy/optional.go +++ b/snapshot/policy/optional.go @@ -31,3 +31,19 @@ func (b *OptionalInt) OrDefault(def int) int { func newOptionalInt(b OptionalInt) *OptionalInt { return &b } + +// OptionalInt64 provides convenience methods for manipulating optional integers. +type OptionalInt64 int64 + +// OrDefault returns the value of the integer or provided default if it's nil. +func (b *OptionalInt64) OrDefault(def int64) int64 { + if b == nil { + return def + } + + return int64(*b) +} + +func newOptionalInt64(b OptionalInt64) *OptionalInt64 { + return &b +} diff --git a/snapshot/policy/policy_merge.go b/snapshot/policy/policy_merge.go index e3bb060bf..b2e87f518 100644 --- a/snapshot/policy/policy_merge.go +++ b/snapshot/policy/policy_merge.go @@ -67,6 +67,15 @@ func mergeOptionalInt(target **OptionalInt, src *OptionalInt, def *snapshot.Sour } } +func mergeOptionalInt64(target **OptionalInt64, src *OptionalInt64, def *snapshot.SourceInfo, si snapshot.SourceInfo) { + if *target == nil && src != nil { + v := *src + + *target = &v + *def = si + } +} + func mergeStringsReplace(target *[]string, src []string, def *snapshot.SourceInfo, si snapshot.SourceInfo) { if len(*target) == 0 && len(src) > 0 { *target = src diff --git a/snapshot/policy/policy_merge_test.go b/snapshot/policy/policy_merge_test.go index c38c8004e..cb4bb9f46 100644 --- a/snapshot/policy/policy_merge_test.go +++ b/snapshot/policy/policy_merge_test.go @@ -117,6 +117,14 @@ func testPolicyMergeSingleField(t *testing.T, fieldName string, typ reflect.Type v1 = reflect.ValueOf(&ob1) v2 = reflect.ValueOf(&ob2) + case "*policy.OptionalInt64": + ob1 := policy.OptionalInt64(1) + ob2 := policy.OptionalInt64(7) + + v0 = reflect.ValueOf((*policy.OptionalInt64)(nil)) + v1 = reflect.ValueOf(&ob1) + v2 = reflect.ValueOf(&ob2) + case "bool": v0 = reflect.ValueOf(false) v1 = reflect.ValueOf(false) diff --git a/snapshot/policy/policy_tree.go b/snapshot/policy/policy_tree.go index efc1314ae..109c6baf3 100644 --- a/snapshot/policy/policy_tree.go +++ b/snapshot/policy/policy_tree.go @@ -54,6 +54,9 @@ defaultUploadPolicy = UploadPolicy{ MaxParallelSnapshots: newOptionalInt(1), MaxParallelFileReads: nil, // defaults to runtime.NumCPUs() + + // upload large files in chunks of 2 GiB + ParallelUploadAboveSize: newOptionalInt64(2 << 30), // nolint:gomnd } // DefaultPolicy is a default policy returned by policy tree in absence of other policies. diff --git a/snapshot/policy/upload_policy.go b/snapshot/policy/upload_policy.go index 5b322e045..3019334d5 100644 --- a/snapshot/policy/upload_policy.go +++ b/snapshot/policy/upload_policy.go @@ -8,20 +8,23 @@ // UploadPolicy describes policy to apply when uploading snapshots. type UploadPolicy struct { - MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"` - MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"` + MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"` + MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"` + ParallelUploadAboveSize *OptionalInt64 `json:"parallelUploadAboveSize,omitempty"` } // UploadPolicyDefinition specifies which policy definition provided the value of a particular field. type UploadPolicyDefinition struct { - MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"` - MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"` + MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"` + MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"` + ParallelUploadAboveSize snapshot.SourceInfo `json:"parallelUploadAboveSize,omitempty"` } // Merge applies default values from the provided policy. func (p *UploadPolicy) Merge(src UploadPolicy, def *UploadPolicyDefinition, si snapshot.SourceInfo) { mergeOptionalInt(&p.MaxParallelSnapshots, src.MaxParallelSnapshots, &def.MaxParallelSnapshots, si) mergeOptionalInt(&p.MaxParallelFileReads, src.MaxParallelFileReads, &def.MaxParallelFileReads, si) + mergeOptionalInt64(&p.ParallelUploadAboveSize, src.ParallelUploadAboveSize, &def.ParallelUploadAboveSize, si) } // ValidateUploadPolicy returns an error if manual field is set along with Upload fields. diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 5da983f26..d1c1e7899 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -13,10 +13,12 @@ "sync/atomic" "time" + "github.com/google/uuid" "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/ignorefs" @@ -158,7 +160,78 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis comp := pol.CompressionPolicy.CompressorForFile(f) - return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp) + chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1) + if chunkSize < 0 || f.Size() <= chunkSize { + // all data fits in 1 full chunks, upload directly + return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp) + } + + // we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length + fullParts := f.Size() / chunkSize + + // directory entries and errors for partial upload results + parts := make([]*snapshot.DirEntry, fullParts+1) + partErrors := make([]error, fullParts+1) + + uploadLog(ctx).Debugf("performing chunked upload for %v (%v parts)", relativePath, len(parts)) + + var wg workshare.AsyncGroup + defer wg.Close() + + for i := 0; i < len(parts); i++ { + i := i + offset := int64(i) * chunkSize + + length := chunkSize + if i == len(parts)-1 { + // last part has unknown length to accommodate the file that may be growing as we're snapshotting it + length = -1 + } + + if wg.CanShareWork(u.workerPool) { + // another goroutine is available, delegate to them + wg.RunAsync(u.workerPool, func(c *workshare.Pool, request interface{}) { + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp) + }, nil) + } else { + // just do the work in the current goroutine + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp) + } + } + + wg.Wait() + + // see if we got any errors + if err := multierr.Combine(partErrors...); err != nil { + return nil, errors.Wrap(err, "error uploading parts") + } + + return concatenateParts(ctx, u.repo, f.Name(), parts) +} + +func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) { + var ( + objectIDs []object.ID + totalSize int64 + ) + + // resulting size is the sum of all parts and resulting object ID is concatenation of individual object IDs. + for _, part := range parts { + totalSize += part.FileSize + objectIDs = append(objectIDs, part.ObjectID) + } + + resultObject, err := rep.ConcatenateObjects(ctx, objectIDs) + if err != nil { + return nil, errors.Wrap(err, "concatenate") + } + + de := parts[0] + de.Name = name + de.FileSize = totalSize + de.ObjectID = resultObject + + return de, nil } func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name) (*snapshot.DirEntry, error) { diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 3c764b5d2..f79160d64 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -4,11 +4,13 @@ "context" "crypto/rand" "fmt" + "io" "os" "path/filepath" "reflect" "runtime/debug" "sort" + "strings" "sync/atomic" "testing" "time" @@ -929,6 +931,140 @@ func TestParallelUploadDedup(t *testing.T) { require.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(51000000)) } +func TestParallelUploadOfLargeFiles(t *testing.T) { + ctx := testlogging.Context(t) + th := newUploadTestHarness(ctx, t) + + defer th.cleanup() + + u := NewUploader(th.repo) + u.ParallelUploads = 10 + + pol := *policy.DefaultPolicy + + // change policies so that all files above this size are uploaded in parallel + // use an unusual number so that it's easy to spot. + const chunkSize = 10203040 + + // future reader, the chunk size must be greater than 4 MiB to make sure splitters are + // not used in degenerate form. + require.Greater(t, chunkSize, 4<<20) + + n := policy.OptionalInt64(chunkSize) + pol.UploadPolicy.ParallelUploadAboveSize = &n + + policyTree := policy.BuildTree(nil, &pol) + + testutil.TestSkipOnCIUnlessLinuxAMD64(t) + td := testutil.TempDirectory(t) + + // Write 2 x 50MB files + var files []*os.File + + for i := 0; i < 2; i++ { + f, cerr := os.Create(filepath.Join(td, fmt.Sprintf("file-%v", i))) + require.NoError(t, cerr) + + files = append(files, f) + } + + for j := 0; j < 1000; j++ { + buf := make([]byte, 50000) + + for _, f := range files { + rand.Read(buf) + + _, werr := f.Write(buf) + require.NoError(t, werr) + } + } + + for _, f := range files { + f.Close() + } + + srcdir, err := localfs.Directory(td) + require.NoError(t, err) + + man, err := u.Upload(ctx, srcdir, policyTree, snapshot.SourceInfo{}) + require.NoError(t, err) + + t.Logf("man: %v", man.RootObjectID()) + + dir := EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory) + + successCount := 0 + + dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + if f, ok := e.(fs.File); ok { + oid, err := object.ParseID(strings.TrimPrefix(f.(object.HasObjectID).ObjectID().String(), "I")) + require.NoError(t, err) + + entries, err := object.LoadIndexObject(ctx, th.repo.(repo.DirectRepositoryWriter).ContentManager(), oid) + require.NoError(t, err) + + // ensure that index object contains breakpoints at all multiples of 'chunkSize'. + // Because we picked unusual chunkSize, this proves that uploads happened individually + // and were concatenated + for offset := int64(0); offset < f.Size(); offset += chunkSize { + verifyContainsOffset(t, entries, chunkSize) + successCount++ + } + + verifyFileContent(t, f, filepath.Join(td, f.Name())) + } + + return nil + }) + + // make sure we actually tested something + require.Greater(t, successCount, 0) +} + +func verifyFileContent(t *testing.T, f1Entry fs.File, f2Name string) { + t.Helper() + + f1, err := f1Entry.Open(testlogging.Context(t)) + require.NoError(t, err) + + defer f1.Close() + + f2, err := os.Open(f2Name) + require.NoError(t, err) + + defer f2.Close() + + buf1 := make([]byte, 1e6) + buf2 := make([]byte, 1e6) + + for { + n1, err1 := f1.Read(buf1) + n2, err2 := f2.Read(buf2) + + if errors.Is(err1, io.EOF) { + require.ErrorIs(t, err2, io.EOF) + return + } + + require.NoError(t, err1) + require.NoError(t, err2) + + require.Equal(t, buf1[0:n1], buf2[0:n2]) + } +} + +func verifyContainsOffset(t *testing.T, entries []object.IndirectObjectEntry, want int64) { + t.Helper() + + for _, e := range entries { + if e.Start == want { + return + } + } + + t.Fatalf("entry set %v does not contain offset %v", entries, want) +} + type loggedAction struct { msg string keysAndValues map[string]interface{}