mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 22:54:55 -04:00
feat(snapshots): improved performance when uploading huge files (#2064)
* feat(snapshots): improved performance when uploading huge files This is controlled by an upload policy which specifies the size threshold above which indvidual files are uploaded in parts and concatenated. This allows multiple threads to run splitting, hashing, compression and encryption in parallel, which was previously only possible across multiple files, but not when a single file was being uploaded. The default is 2GiB for now, so this feature only kicks in for very larger files. In the future we may lower this. Benchmark involved uploading a single 42.1 GB file which was a VM disk snapshot of fresh Ubuntu installation (fresh EXT4 partition with lots of zero bytes) to a brand-new filesystem repository on local SSD of M1 Pro Macbook Pro 2021. * before: 59-63s (~700 MB/s) * after: 15-17s (~2.6 GB/s) * additional test to ensure files are really e2e readable
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/units"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
@@ -201,6 +202,41 @@ func applyOptionalInt(ctx context.Context, desc string, val **policy.OptionalInt
|
||||
return nil
|
||||
}
|
||||
|
||||
func applyOptionalInt64MiB(ctx context.Context, desc string, val **policy.OptionalInt64, str string, changeCount *int) error {
|
||||
if str == "" {
|
||||
// not changed
|
||||
return nil
|
||||
}
|
||||
|
||||
if str == inheritPolicyString || str == defaultPolicyString {
|
||||
*changeCount++
|
||||
|
||||
log(ctx).Infof(" - resetting %q to a default value inherited from parent.", desc)
|
||||
|
||||
*val = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nolint:gomnd
|
||||
v, err := strconv.ParseInt(str, 10, 32)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "can't parse the %v %q", desc, str)
|
||||
}
|
||||
|
||||
// convert MiB to bytes
|
||||
v *= 1 << 20 // nolint:gomnd
|
||||
|
||||
i := policy.OptionalInt64(v)
|
||||
*changeCount++
|
||||
|
||||
log(ctx).Infof(" - setting %q to %v.", desc, units.BytesStringBase2(v))
|
||||
|
||||
*val = &i
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func applyPolicyNumber64(ctx context.Context, desc string, val *int64, str string, changeCount *int) error {
|
||||
if str == "" {
|
||||
// not changed
|
||||
|
||||
@@ -9,13 +9,15 @@
|
||||
)
|
||||
|
||||
type policyUploadFlags struct {
|
||||
maxParallelUploads string
|
||||
maxParallelFileReads string
|
||||
maxParallelUploads string
|
||||
maxParallelFileReads string
|
||||
parallelizeUploadAboveSizeMiB string
|
||||
}
|
||||
|
||||
func (c *policyUploadFlags) setup(cmd *kingpin.CmdClause) {
|
||||
cmd.Flag("max-parallel-file-reads", "Maximum number of parallel file reads").StringVar(&c.maxParallelFileReads)
|
||||
cmd.Flag("max-parallel-snapshots", "Maximum number of parallel snapshots (server, KopiaUI only)").StringVar(&c.maxParallelUploads)
|
||||
cmd.Flag("parallel-upload-above-size-mib", "Use parallel uploads above size").StringVar(&c.parallelizeUploadAboveSizeMiB)
|
||||
}
|
||||
|
||||
func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *policy.UploadPolicy, changeCount *int) error {
|
||||
@@ -27,5 +29,9 @@ func (c *policyUploadFlags) setUploadPolicyFromFlags(ctx context.Context, up *po
|
||||
return err
|
||||
}
|
||||
|
||||
if err := applyOptionalInt64MiB(ctx, "parallel upload above size", &up.ParallelUploadAboveSize, c.parallelizeUploadAboveSizeMiB, changeCount); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ func TestSetUploadPolicy(t *testing.T) {
|
||||
lines = compressSpaces(lines)
|
||||
require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 (defined for this target)")
|
||||
require.Contains(t, lines, " Max parallel file reads: - (defined for this target)")
|
||||
require.Contains(t, lines, " Parallel upload above size: 2 GiB (defined for this target)")
|
||||
|
||||
// make some directory we'll be setting policy on
|
||||
td := testutil.TempDirectory(t)
|
||||
@@ -27,20 +28,23 @@ func TestSetUploadPolicy(t *testing.T) {
|
||||
lines = compressSpaces(lines)
|
||||
require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)")
|
||||
require.Contains(t, lines, " Max parallel file reads: - inherited from (global)")
|
||||
require.Contains(t, lines, " Parallel upload above size: 2 GiB inherited from (global)")
|
||||
|
||||
e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33")
|
||||
e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=7", "--max-parallel-file-reads=33", "--parallel-upload-above-size-mib=4096")
|
||||
|
||||
lines = e.RunAndExpectSuccess(t, "policy", "show", td)
|
||||
lines = compressSpaces(lines)
|
||||
|
||||
require.Contains(t, lines, " Max parallel snapshots (server/UI): 7 inherited from (global)")
|
||||
require.Contains(t, lines, " Max parallel file reads: 33 inherited from (global)")
|
||||
require.Contains(t, lines, " Parallel upload above size: 4 GiB inherited from (global)")
|
||||
|
||||
e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default")
|
||||
e.RunAndExpectSuccess(t, "policy", "set", "--global", "--max-parallel-snapshots=default", "--max-parallel-file-reads=default", "--parallel-upload-above-size-mib=default")
|
||||
|
||||
lines = e.RunAndExpectSuccess(t, "policy", "show", td)
|
||||
lines = compressSpaces(lines)
|
||||
|
||||
require.Contains(t, lines, " Max parallel snapshots (server/UI): 1 inherited from (global)")
|
||||
require.Contains(t, lines, " Max parallel file reads: - inherited from (global)")
|
||||
require.Contains(t, lines, " Parallel upload above size: 2 GiB inherited from (global)")
|
||||
}
|
||||
|
||||
@@ -270,6 +270,7 @@ func appendUploadPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy
|
||||
policyTableRow{"Uploads:", "", ""},
|
||||
policyTableRow{" Max parallel snapshots (server/UI):", valueOrNotSet(p.UploadPolicy.MaxParallelSnapshots), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelSnapshots)},
|
||||
policyTableRow{" Max parallel file reads:", valueOrNotSet(p.UploadPolicy.MaxParallelFileReads), definitionPointToString(p.Target(), def.UploadPolicy.MaxParallelFileReads)},
|
||||
policyTableRow{" Parallel upload above size:", valueOrNotSetOptionalInt64Bytes(p.UploadPolicy.ParallelUploadAboveSize), definitionPointToString(p.Target(), def.UploadPolicy.ParallelUploadAboveSize)},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -435,3 +436,11 @@ func valueOrNotSet(p *policy.OptionalInt) string {
|
||||
|
||||
return fmt.Sprintf("%v", *p)
|
||||
}
|
||||
|
||||
func valueOrNotSetOptionalInt64Bytes(p *policy.OptionalInt64) string {
|
||||
if p == nil {
|
||||
return "-"
|
||||
}
|
||||
|
||||
return units.BytesStringBase2(int64(*p))
|
||||
}
|
||||
|
||||
@@ -68,6 +68,12 @@ func (r *apiServerRepository) NewObjectWriter(ctx context.Context, opt object.Wr
|
||||
return r.omgr.NewWriter(ctx, opt)
|
||||
}
|
||||
|
||||
// ConcatenateObjects creates a concatenated objects from the provided object IDs.
|
||||
func (r *apiServerRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
|
||||
// nolint:wrapcheck
|
||||
return r.omgr.Concatenate(ctx, objectIDs)
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) VerifyObject(ctx context.Context, id object.ID) ([]content.ID, error) {
|
||||
// nolint:wrapcheck
|
||||
return object.VerifyObject(ctx, r, id)
|
||||
|
||||
@@ -473,6 +473,12 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
|
||||
return ctx, w, nil
|
||||
}
|
||||
|
||||
// ConcatenateObjects creates a concatenated objects from the provided object IDs.
|
||||
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
|
||||
// nolint:wrapcheck
|
||||
return r.omgr.Concatenate(ctx, objectIDs)
|
||||
}
|
||||
|
||||
type sessionAttemptFunc func(ctx context.Context, sess *grpcInnerSession) (interface{}, error)
|
||||
|
||||
// maybeRetry executes the provided callback with or without automatic retries depending on how
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
package object
|
||||
|
||||
// indirectObjectEntry represents an entry in indirect object stream.
|
||||
type indirectObjectEntry struct {
|
||||
// IndirectObjectEntry represents an entry in indirect object stream.
|
||||
type IndirectObjectEntry struct {
|
||||
Start int64 `json:"s,omitempty"`
|
||||
Length int64 `json:"l,omitempty"`
|
||||
Object ID `json:"o,omitempty"`
|
||||
}
|
||||
|
||||
func (i *indirectObjectEntry) endOffset() int64 {
|
||||
func (i *IndirectObjectEntry) endOffset() int64 {
|
||||
return i.Start + i.Length
|
||||
}
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
|
||||
}
|
||||
|
||||
var (
|
||||
concatenatedEntries []indirectObjectEntry
|
||||
concatenatedEntries []IndirectObjectEntry
|
||||
totalLength int64
|
||||
err error
|
||||
)
|
||||
@@ -138,9 +138,9 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
|
||||
return IndirectObjectID(concatID), nil
|
||||
}
|
||||
|
||||
func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []indirectObjectEntry, startingLength int64, objectID ID) (result []indirectObjectEntry, totalLength int64, _ error) {
|
||||
func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []IndirectObjectEntry, startingLength int64, objectID ID) (result []IndirectObjectEntry, totalLength int64, _ error) {
|
||||
if indexObjectID, ok := objectID.IndexObjectID(); ok {
|
||||
ndx, err := loadSeekTable(ctx, cr, indexObjectID)
|
||||
ndx, err := LoadIndexObject(ctx, cr, indexObjectID)
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrapf(err, "error reading index of %v", objectID)
|
||||
}
|
||||
@@ -158,7 +158,7 @@ func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEnt
|
||||
}
|
||||
defer r.Close() //nolint:errcheck
|
||||
|
||||
indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, indirectObjectEntry{
|
||||
indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, IndirectObjectEntry{
|
||||
Start: 0,
|
||||
Length: r.Length(),
|
||||
Object: objectID,
|
||||
@@ -167,11 +167,11 @@ func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEnt
|
||||
return indexEntries, totalLength, nil
|
||||
}
|
||||
|
||||
func appendIndexEntries(indexEntries []indirectObjectEntry, startingLength int64, incoming ...indirectObjectEntry) (result []indirectObjectEntry, totalLength int64) {
|
||||
func appendIndexEntries(indexEntries []IndirectObjectEntry, startingLength int64, incoming ...IndirectObjectEntry) (result []IndirectObjectEntry, totalLength int64) {
|
||||
totalLength = startingLength
|
||||
|
||||
for _, inc := range incoming {
|
||||
indexEntries = append(indexEntries, indirectObjectEntry{
|
||||
indexEntries = append(indexEntries, IndirectObjectEntry{
|
||||
Start: inc.Start + startingLength,
|
||||
Length: inc.Length,
|
||||
Object: inc.Object,
|
||||
|
||||
@@ -41,7 +41,7 @@ type objectReader struct {
|
||||
|
||||
cr contentReader
|
||||
|
||||
seekTable []indirectObjectEntry
|
||||
seekTable []IndirectObjectEntry
|
||||
|
||||
currentPosition int64 // Overall position in the objectReader
|
||||
totalLength int64 // Overall length
|
||||
@@ -202,7 +202,7 @@ func (r *objectReader) Length() int64 {
|
||||
func openAndAssertLength(ctx context.Context, cr contentReader, objectID ID, assertLength int64) (Reader, error) {
|
||||
if indexObjectID, ok := objectID.IndexObjectID(); ok {
|
||||
// recursively calls openAndAssertLength
|
||||
seekTable, err := loadSeekTable(ctx, cr, indexObjectID)
|
||||
seekTable, err := LoadIndexObject(ctx, cr, indexObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -225,7 +225,7 @@ func iterateIndirectObjectContents(ctx context.Context, cr contentReader, indexO
|
||||
return errors.Wrap(err, "unable to read index")
|
||||
}
|
||||
|
||||
seekTable, err := loadSeekTable(ctx, cr, indexObjectID)
|
||||
seekTable, err := LoadIndexObject(ctx, cr, indexObjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -260,10 +260,11 @@ func iterateBackingContents(ctx context.Context, r contentReader, oid ID, tracke
|
||||
|
||||
type indirectObject struct {
|
||||
StreamID string `json:"stream"`
|
||||
Entries []indirectObjectEntry `json:"entries"`
|
||||
Entries []IndirectObjectEntry `json:"entries"`
|
||||
}
|
||||
|
||||
func loadSeekTable(ctx context.Context, cr contentReader, indexObjectID ID) ([]indirectObjectEntry, error) {
|
||||
// LoadIndexObject returns entries comprising index object.
|
||||
func LoadIndexObject(ctx context.Context, cr contentReader, indexObjectID ID) ([]IndirectObjectEntry, error) {
|
||||
r, err := openAndAssertLength(ctx, cr, indexObjectID, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -77,8 +77,8 @@ type objectWriter struct {
|
||||
currentPosition int64
|
||||
|
||||
indirectIndexGrowMutex sync.Mutex
|
||||
indirectIndex []indirectObjectEntry
|
||||
indirectIndexBuf [4]indirectObjectEntry // small buffer so that we avoid allocations most of the time
|
||||
indirectIndex []IndirectObjectEntry
|
||||
indirectIndexBuf [4]IndirectObjectEntry // small buffer so that we avoid allocations most of the time
|
||||
|
||||
description string
|
||||
|
||||
@@ -143,7 +143,7 @@ func (w *objectWriter) flushBuffer() error {
|
||||
// hold a lock as we may grow the index
|
||||
w.indirectIndexGrowMutex.Lock()
|
||||
chunkID := len(w.indirectIndex)
|
||||
w.indirectIndex = append(w.indirectIndex, indirectObjectEntry{})
|
||||
w.indirectIndex = append(w.indirectIndex, IndirectObjectEntry{})
|
||||
w.indirectIndex[chunkID].Start = w.currentPosition
|
||||
w.indirectIndex[chunkID].Length = int64(length)
|
||||
w.currentPosition += int64(length)
|
||||
@@ -316,7 +316,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
|
||||
return IndirectObjectID(oid), nil
|
||||
}
|
||||
|
||||
func writeIndirectObject(w io.Writer, entries []indirectObjectEntry) error {
|
||||
func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error {
|
||||
ind := indirectObject{
|
||||
StreamID: "kopia:indirect",
|
||||
Entries: entries,
|
||||
|
||||
@@ -41,6 +41,7 @@ type RepositoryWriter interface {
|
||||
Repository
|
||||
|
||||
NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer
|
||||
ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error)
|
||||
PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
|
||||
DeleteManifest(ctx context.Context, id manifest.ID) error
|
||||
Flush(ctx context.Context) error
|
||||
@@ -153,6 +154,12 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write
|
||||
return r.omgr.NewWriter(ctx, opt)
|
||||
}
|
||||
|
||||
// ConcatenateObjects creates a concatenated objects from the provided object IDs.
|
||||
func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
|
||||
// nolint:wrapcheck
|
||||
return r.omgr.Concatenate(ctx, objectIDs)
|
||||
}
|
||||
|
||||
// DisableIndexRefresh disables index refresh for the duration of the write session.
|
||||
func (r *directRepository) DisableIndexRefresh() {
|
||||
r.cmgr.DisableIndexRefresh()
|
||||
|
||||
@@ -31,3 +31,19 @@ func (b *OptionalInt) OrDefault(def int) int {
|
||||
func newOptionalInt(b OptionalInt) *OptionalInt {
|
||||
return &b
|
||||
}
|
||||
|
||||
// OptionalInt64 provides convenience methods for manipulating optional integers.
|
||||
type OptionalInt64 int64
|
||||
|
||||
// OrDefault returns the value of the integer or provided default if it's nil.
|
||||
func (b *OptionalInt64) OrDefault(def int64) int64 {
|
||||
if b == nil {
|
||||
return def
|
||||
}
|
||||
|
||||
return int64(*b)
|
||||
}
|
||||
|
||||
func newOptionalInt64(b OptionalInt64) *OptionalInt64 {
|
||||
return &b
|
||||
}
|
||||
|
||||
@@ -67,6 +67,15 @@ func mergeOptionalInt(target **OptionalInt, src *OptionalInt, def *snapshot.Sour
|
||||
}
|
||||
}
|
||||
|
||||
func mergeOptionalInt64(target **OptionalInt64, src *OptionalInt64, def *snapshot.SourceInfo, si snapshot.SourceInfo) {
|
||||
if *target == nil && src != nil {
|
||||
v := *src
|
||||
|
||||
*target = &v
|
||||
*def = si
|
||||
}
|
||||
}
|
||||
|
||||
func mergeStringsReplace(target *[]string, src []string, def *snapshot.SourceInfo, si snapshot.SourceInfo) {
|
||||
if len(*target) == 0 && len(src) > 0 {
|
||||
*target = src
|
||||
|
||||
@@ -117,6 +117,14 @@ func testPolicyMergeSingleField(t *testing.T, fieldName string, typ reflect.Type
|
||||
v1 = reflect.ValueOf(&ob1)
|
||||
v2 = reflect.ValueOf(&ob2)
|
||||
|
||||
case "*policy.OptionalInt64":
|
||||
ob1 := policy.OptionalInt64(1)
|
||||
ob2 := policy.OptionalInt64(7)
|
||||
|
||||
v0 = reflect.ValueOf((*policy.OptionalInt64)(nil))
|
||||
v1 = reflect.ValueOf(&ob1)
|
||||
v2 = reflect.ValueOf(&ob2)
|
||||
|
||||
case "bool":
|
||||
v0 = reflect.ValueOf(false)
|
||||
v1 = reflect.ValueOf(false)
|
||||
|
||||
@@ -54,6 +54,9 @@
|
||||
defaultUploadPolicy = UploadPolicy{
|
||||
MaxParallelSnapshots: newOptionalInt(1),
|
||||
MaxParallelFileReads: nil, // defaults to runtime.NumCPUs()
|
||||
|
||||
// upload large files in chunks of 2 GiB
|
||||
ParallelUploadAboveSize: newOptionalInt64(2 << 30), // nolint:gomnd
|
||||
}
|
||||
|
||||
// DefaultPolicy is a default policy returned by policy tree in absence of other policies.
|
||||
|
||||
@@ -8,20 +8,23 @@
|
||||
|
||||
// UploadPolicy describes policy to apply when uploading snapshots.
|
||||
type UploadPolicy struct {
|
||||
MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"`
|
||||
MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"`
|
||||
MaxParallelSnapshots *OptionalInt `json:"maxParallelSnapshots,omitempty"`
|
||||
MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"`
|
||||
ParallelUploadAboveSize *OptionalInt64 `json:"parallelUploadAboveSize,omitempty"`
|
||||
}
|
||||
|
||||
// UploadPolicyDefinition specifies which policy definition provided the value of a particular field.
|
||||
type UploadPolicyDefinition struct {
|
||||
MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"`
|
||||
MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"`
|
||||
MaxParallelSnapshots snapshot.SourceInfo `json:"maxParallelSnapshots,omitempty"`
|
||||
MaxParallelFileReads snapshot.SourceInfo `json:"maxParallelFileReads,omitempty"`
|
||||
ParallelUploadAboveSize snapshot.SourceInfo `json:"parallelUploadAboveSize,omitempty"`
|
||||
}
|
||||
|
||||
// Merge applies default values from the provided policy.
|
||||
func (p *UploadPolicy) Merge(src UploadPolicy, def *UploadPolicyDefinition, si snapshot.SourceInfo) {
|
||||
mergeOptionalInt(&p.MaxParallelSnapshots, src.MaxParallelSnapshots, &def.MaxParallelSnapshots, si)
|
||||
mergeOptionalInt(&p.MaxParallelFileReads, src.MaxParallelFileReads, &def.MaxParallelFileReads, si)
|
||||
mergeOptionalInt64(&p.ParallelUploadAboveSize, src.ParallelUploadAboveSize, &def.ParallelUploadAboveSize, si)
|
||||
}
|
||||
|
||||
// ValidateUploadPolicy returns an error if manual field is set along with Upload fields.
|
||||
|
||||
@@ -13,10 +13,12 @@
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/multierr"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/ignorefs"
|
||||
@@ -158,7 +160,78 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
|
||||
|
||||
comp := pol.CompressionPolicy.CompressorForFile(f)
|
||||
|
||||
return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp)
|
||||
chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1)
|
||||
if chunkSize < 0 || f.Size() <= chunkSize {
|
||||
// all data fits in 1 full chunks, upload directly
|
||||
return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp)
|
||||
}
|
||||
|
||||
// we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length
|
||||
fullParts := f.Size() / chunkSize
|
||||
|
||||
// directory entries and errors for partial upload results
|
||||
parts := make([]*snapshot.DirEntry, fullParts+1)
|
||||
partErrors := make([]error, fullParts+1)
|
||||
|
||||
uploadLog(ctx).Debugf("performing chunked upload for %v (%v parts)", relativePath, len(parts))
|
||||
|
||||
var wg workshare.AsyncGroup
|
||||
defer wg.Close()
|
||||
|
||||
for i := 0; i < len(parts); i++ {
|
||||
i := i
|
||||
offset := int64(i) * chunkSize
|
||||
|
||||
length := chunkSize
|
||||
if i == len(parts)-1 {
|
||||
// last part has unknown length to accommodate the file that may be growing as we're snapshotting it
|
||||
length = -1
|
||||
}
|
||||
|
||||
if wg.CanShareWork(u.workerPool) {
|
||||
// another goroutine is available, delegate to them
|
||||
wg.RunAsync(u.workerPool, func(c *workshare.Pool, request interface{}) {
|
||||
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp)
|
||||
}, nil)
|
||||
} else {
|
||||
// just do the work in the current goroutine
|
||||
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// see if we got any errors
|
||||
if err := multierr.Combine(partErrors...); err != nil {
|
||||
return nil, errors.Wrap(err, "error uploading parts")
|
||||
}
|
||||
|
||||
return concatenateParts(ctx, u.repo, f.Name(), parts)
|
||||
}
|
||||
|
||||
func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) {
|
||||
var (
|
||||
objectIDs []object.ID
|
||||
totalSize int64
|
||||
)
|
||||
|
||||
// resulting size is the sum of all parts and resulting object ID is concatenation of individual object IDs.
|
||||
for _, part := range parts {
|
||||
totalSize += part.FileSize
|
||||
objectIDs = append(objectIDs, part.ObjectID)
|
||||
}
|
||||
|
||||
resultObject, err := rep.ConcatenateObjects(ctx, objectIDs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "concatenate")
|
||||
}
|
||||
|
||||
de := parts[0]
|
||||
de.Name = name
|
||||
de.FileSize = totalSize
|
||||
de.ObjectID = resultObject
|
||||
|
||||
return de, nil
|
||||
}
|
||||
|
||||
func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name) (*snapshot.DirEntry, error) {
|
||||
|
||||
@@ -4,11 +4,13 @@
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -929,6 +931,140 @@ func TestParallelUploadDedup(t *testing.T) {
|
||||
require.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(51000000))
|
||||
}
|
||||
|
||||
func TestParallelUploadOfLargeFiles(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
u.ParallelUploads = 10
|
||||
|
||||
pol := *policy.DefaultPolicy
|
||||
|
||||
// change policies so that all files above this size are uploaded in parallel
|
||||
// use an unusual number so that it's easy to spot.
|
||||
const chunkSize = 10203040
|
||||
|
||||
// future reader, the chunk size must be greater than 4 MiB to make sure splitters are
|
||||
// not used in degenerate form.
|
||||
require.Greater(t, chunkSize, 4<<20)
|
||||
|
||||
n := policy.OptionalInt64(chunkSize)
|
||||
pol.UploadPolicy.ParallelUploadAboveSize = &n
|
||||
|
||||
policyTree := policy.BuildTree(nil, &pol)
|
||||
|
||||
testutil.TestSkipOnCIUnlessLinuxAMD64(t)
|
||||
td := testutil.TempDirectory(t)
|
||||
|
||||
// Write 2 x 50MB files
|
||||
var files []*os.File
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
f, cerr := os.Create(filepath.Join(td, fmt.Sprintf("file-%v", i)))
|
||||
require.NoError(t, cerr)
|
||||
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
for j := 0; j < 1000; j++ {
|
||||
buf := make([]byte, 50000)
|
||||
|
||||
for _, f := range files {
|
||||
rand.Read(buf)
|
||||
|
||||
_, werr := f.Write(buf)
|
||||
require.NoError(t, werr)
|
||||
}
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
srcdir, err := localfs.Directory(td)
|
||||
require.NoError(t, err)
|
||||
|
||||
man, err := u.Upload(ctx, srcdir, policyTree, snapshot.SourceInfo{})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("man: %v", man.RootObjectID())
|
||||
|
||||
dir := EntryFromDirEntry(th.repo, man.RootEntry).(fs.Directory)
|
||||
|
||||
successCount := 0
|
||||
|
||||
dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
if f, ok := e.(fs.File); ok {
|
||||
oid, err := object.ParseID(strings.TrimPrefix(f.(object.HasObjectID).ObjectID().String(), "I"))
|
||||
require.NoError(t, err)
|
||||
|
||||
entries, err := object.LoadIndexObject(ctx, th.repo.(repo.DirectRepositoryWriter).ContentManager(), oid)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure that index object contains breakpoints at all multiples of 'chunkSize'.
|
||||
// Because we picked unusual chunkSize, this proves that uploads happened individually
|
||||
// and were concatenated
|
||||
for offset := int64(0); offset < f.Size(); offset += chunkSize {
|
||||
verifyContainsOffset(t, entries, chunkSize)
|
||||
successCount++
|
||||
}
|
||||
|
||||
verifyFileContent(t, f, filepath.Join(td, f.Name()))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// make sure we actually tested something
|
||||
require.Greater(t, successCount, 0)
|
||||
}
|
||||
|
||||
func verifyFileContent(t *testing.T, f1Entry fs.File, f2Name string) {
|
||||
t.Helper()
|
||||
|
||||
f1, err := f1Entry.Open(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
defer f1.Close()
|
||||
|
||||
f2, err := os.Open(f2Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer f2.Close()
|
||||
|
||||
buf1 := make([]byte, 1e6)
|
||||
buf2 := make([]byte, 1e6)
|
||||
|
||||
for {
|
||||
n1, err1 := f1.Read(buf1)
|
||||
n2, err2 := f2.Read(buf2)
|
||||
|
||||
if errors.Is(err1, io.EOF) {
|
||||
require.ErrorIs(t, err2, io.EOF)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err1)
|
||||
require.NoError(t, err2)
|
||||
|
||||
require.Equal(t, buf1[0:n1], buf2[0:n2])
|
||||
}
|
||||
}
|
||||
|
||||
func verifyContainsOffset(t *testing.T, entries []object.IndirectObjectEntry, want int64) {
|
||||
t.Helper()
|
||||
|
||||
for _, e := range entries {
|
||||
if e.Start == want {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t.Fatalf("entry set %v does not contain offset %v", entries, want)
|
||||
}
|
||||
|
||||
type loggedAction struct {
|
||||
msg string
|
||||
keysAndValues map[string]interface{}
|
||||
|
||||
Reference in New Issue
Block a user