From 0d1a627dcbb09b1d8a3a4c5df23dda4e1eecc453 Mon Sep 17 00:00:00 2001
From: Jarek Kowalski
Date: Thu, 12 Mar 2020 00:27:51 -0700
Subject: [PATCH] Improved upload performance and memory usage (#331)

Upload: reduced memory usage during uploads

Replaced custom work item management with a much simpler solution.

Upload: added pooling of upload buffers

This avoids many allocations and reduces GC pressure. In a test of
2 dirs x 100K files x 100 bytes each, allocated bytes went from
27 GB to 20 GB.

This improves #93
---
 snapshot/snapshotfs/upload.go      | 417 +++++++++++++++--------------
 snapshot/snapshotfs/upload_test.go |   4 +-
 snapshot/stats.go                  |   4 +-
 3 files changed, 213 insertions(+), 212 deletions(-)
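
The buffer pooling below follows the standard sync.Pool idiom: the pool
stores *[]byte rather than []byte, so Get and Put move a single pointer
and the large copy buffer is reused across files instead of being
reallocated on every call to copyWithProgress. A minimal, self-contained
sketch of the same pattern (the helper name and buffer size here are
illustrative, not kopia's):

	package main

	import (
		"bytes"
		"fmt"
		"io"
		"sync"
	)

	// illustrative size only; upload.go defines its own copyBufferSize
	const copyBufferSize = 1 << 20

	// Storing *[]byte avoids re-boxing the slice header into interface{}
	// on every Put, which would itself allocate.
	var bufPool = sync.Pool{
		New: func() interface{} {
			p := make([]byte, copyBufferSize)
			return &p
		},
	}

	// copyWithPooledBuffer is a hypothetical helper mirroring what
	// copyWithProgress now does: borrow a buffer, copy, give it back.
	func copyWithPooledBuffer(dst io.Writer, src io.Reader) (int64, error) {
		bufPtr := bufPool.Get().(*[]byte)
		defer bufPool.Put(bufPtr)

		return io.CopyBuffer(dst, src, *bufPtr)
	}

	func main() {
		var out bytes.Buffer
		n, err := copyWithPooledBuffer(&out, bytes.NewReader([]byte("hello")))
		fmt.Println(n, err) // 5 <nil>
	}
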
diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go
index c7e94b392..b8c7059d3 100644
--- a/snapshot/snapshotfs/upload.go
+++ b/snapshot/snapshotfs/upload.go
@@ -7,12 +7,15 @@
 	"hash/fnv"
 	"io"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
+	"sort"
 	"sync"
 	"sync/atomic"
 
 	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/kopia/kopia/fs"
 	"github.com/kopia/kopia/fs/ignorefs"
@@ -51,6 +54,8 @@ type Uploader struct {
 
 	stats    snapshot.Stats
 	canceled int32
+
+	uploadBufPool sync.Pool
 }
 
 // IsCancelled returns true if the upload is canceled.
@@ -71,13 +76,13 @@ func (u *Uploader) cancelReason() string {
 	return ""
 }
 
-func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy) entryResult {
+func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
 	u.Progress.HashingFile(relativePath)
 	defer u.Progress.FinishedHashingFile(relativePath, f.Size())
 
 	file, err := f.Open(ctx)
 	if err != nil {
-		return entryResult{err: errors.Wrap(err, "unable to open file")}
+		return nil, errors.Wrap(err, "unable to open file")
 	}
 	defer file.Close() //nolint:errcheck
 
@@ -89,36 +94,36 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string,
 
 	written, err := u.copyWithProgress(writer, file, 0, f.Size())
 	if err != nil {
-		return entryResult{err: err}
+		return nil, err
 	}
 
 	fi2, err := file.Entry()
 	if err != nil {
-		return entryResult{err: err}
+		return nil, err
 	}
 
 	r, err := writer.Result()
 	if err != nil {
-		return entryResult{err: err}
+		return nil, err
 	}
 
 	de, err := newDirEntry(fi2, r)
 	if err != nil {
-		return entryResult{err: errors.Wrap(err, "unable to create dir entry")}
+		return nil, errors.Wrap(err, "unable to create dir entry")
 	}
 
 	de.FileSize = written
 
-	return entryResult{de: de}
+	return de, nil
 }
 
-func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) entryResult {
+func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (*snapshot.DirEntry, error) {
 	u.Progress.HashingFile(relativePath)
 	defer u.Progress.FinishedHashingFile(relativePath, f.Size())
 
 	target, err := f.Readlink(ctx)
 	if err != nil {
-		return entryResult{err: errors.Wrap(err, "unable to read symlink")}
+		return nil, errors.Wrap(err, "unable to read symlink")
 	}
 
 	writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{
@@ -128,26 +133,29 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin
 
 	written, err := u.copyWithProgress(writer, bytes.NewBufferString(target), 0, f.Size())
 	if err != nil {
-		return entryResult{err: err}
+		return nil, err
 	}
 
 	r, err := writer.Result()
 	if err != nil {
-		return entryResult{err: err}
+		return nil, err
 	}
 
 	de, err := newDirEntry(f, r)
 	if err != nil {
-		return entryResult{err: errors.Wrap(err, "unable to create dir entry")}
+		return nil, errors.Wrap(err, "unable to create dir entry")
 	}
 
 	de.FileSize = written
 
-	return entryResult{de: de}
+	return de, nil
 }
 
 func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, length int64) (int64, error) {
-	uploadBuf := make([]byte, copyBufferSize)
+	uploadBufPtr := u.uploadBufPool.Get().(*[]byte)
+	defer u.uploadBufPool.Put(uploadBufPtr)
+
+	uploadBuf := *uploadBufPtr
 
 	var written int64
 
@@ -218,20 +226,20 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
 
 // uploadFile uploads the specified File to the repository.
 func (u *Uploader) uploadFile(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
-	res := u.uploadFileInternal(ctx, relativePath, file, pol)
-	if res.err != nil {
-		return nil, res.err
+	res, err := u.uploadFileInternal(ctx, relativePath, file, pol)
+	if err != nil {
+		return nil, err
 	}
 
-	de, err := newDirEntry(file, res.de.ObjectID)
+	de, err := newDirEntry(file, res.ObjectID)
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to create dir entry")
 	}
 
 	de.DirSummary = &fs.DirectorySummary{
 		TotalFileCount: 1,
-		TotalFileSize:  res.de.FileSize,
-		MaxModTime:     res.de.ModTime,
+		TotalFileSize:  res.FileSize,
+		MaxModTime:     res.ModTime,
 	}
 
 	return de, nil
@@ -256,29 +264,136 @@ func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, policyTr
 	return de, err
 }
 
-func (u *Uploader) foreachEntryUnlessCancelled(relativePath string, entries fs.Entries, cb func(entry fs.Entry, entryRelativePath string) error) error {
-	for _, entry := range entries {
-		if u.IsCancelled() {
-			return errCancelled
+func (u *Uploader) foreachEntryUnlessCancelled(ctx context.Context, parallel int, relativePath string, entries fs.Entries, cb func(ctx context.Context, entry fs.Entry, entryRelativePath string) error) error {
+	if parallel > len(entries) {
+		// don't launch more goroutines than needed
+		parallel = len(entries)
+	}
+
+	if parallel == 0 {
+		return nil
+	}
+
+	ch := make(chan fs.Entry)
+	eg, ctx := errgroup.WithContext(ctx)
+
+	// one goroutine pumps entries into the channel until ctx is canceled.
+	eg.Go(func() error {
+		defer close(ch)
+
+		for _, e := range entries {
+			select {
+			case ch <- e: // sent to channel
+			case <-ctx.Done(): // context canceled
+				return nil
+			}
+		}
+
+		return nil
+	})
+
+	// launch N workers in parallel
+	for i := 0; i < parallel; i++ {
+		eg.Go(func() error {
+			for entry := range ch {
+				if u.IsCancelled() {
+					return errCancelled
+				}
+
+				entryRelativePath := path.Join(relativePath, entry.Name())
+				if err := cb(ctx, entry, entryRelativePath); err != nil {
+					return err
+				}
+			}
+
+			return nil
+		})
+	}
+
+	return eg.Wait()
+}
+
+func (u *Uploader) populateChildEntries(parent *snapshot.DirManifest, children <-chan *snapshot.DirEntry) {
+	parentSummary := parent.Summary
+
+	for de := range children {
+		switch de.Type {
+		case snapshot.EntryTypeFile:
+			u.stats.TotalFileCount++
+			u.stats.TotalFileSize += de.FileSize
+			parentSummary.TotalFileCount++
+			parentSummary.TotalFileSize += de.FileSize
+
+			if de.ModTime.After(parentSummary.MaxModTime) {
+				parentSummary.MaxModTime = de.ModTime
+			}
+
+		case snapshot.EntryTypeDirectory:
+			if childSummary := de.DirSummary; childSummary != nil {
+				parentSummary.TotalFileCount += childSummary.TotalFileCount
+				parentSummary.TotalFileSize += childSummary.TotalFileSize
+				parentSummary.TotalDirCount += childSummary.TotalDirCount
+
+				if childSummary.MaxModTime.After(parentSummary.MaxModTime) {
+					parentSummary.MaxModTime = childSummary.MaxModTime
+				}
+			}
 		}
 
-		entryRelativePath := relativePath + "/" + entry.Name()
+		parent.Entries = append(parent.Entries, de)
 	}
 
-		if err := cb(entry, entryRelativePath); err != nil {
-			return err
+	// sort the result: directories first, then non-directories, ordered by name
+	sort.Slice(parent.Entries, func(i, j int) bool {
+		if leftDir, rightDir := isDir(parent.Entries[i]), isDir(parent.Entries[j]); leftDir != rightDir {
+			// directories get sorted before non-directories
+			return leftDir
 		}
+
+		return parent.Entries[i].Name < parent.Entries[j].Name
+	})
+}
+
+func isDir(e *snapshot.DirEntry) bool {
+	return e.Type == snapshot.EntryTypeDirectory
+}
+
+func (u *Uploader) processChildren(ctx context.Context, dirManifest *snapshot.DirManifest, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
+	var wg sync.WaitGroup
+
+	// channel where we will add directory and file entries, possibly in parallel
+	output := make(chan *snapshot.DirEntry)
+
+	// goroutine that will drain data from 'output' and update dirManifest
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		u.populateChildEntries(dirManifest, output)
+	}()
+
+	defer func() {
+		// before this function returns, close the output channel and wait for the goroutine above to complete.
+		close(output)
+		wg.Wait()
+	}()
+
+	if err := u.processSubdirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
+		return err
+	}
+
+	if err := u.processNonDirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
+		return err
 	}
 
 	return nil
 }
 
-type entryResult struct {
-	err error
-	de  *snapshot.DirEntry
-}
+func (u *Uploader) processSubdirectories(ctx context.Context, output chan *snapshot.DirEntry, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
+	// for now, don't process subdirectories in parallel; we need a mechanism to
+	// prevent an explosion of parallelism
+	const parallelism = 1
 
-func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error {
-	return u.foreachEntryUnlessCancelled(relativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
+	return u.foreachEntryUnlessCancelled(ctx, parallelism, relativePath, entries, func(ctx context.Context, entry fs.Entry, entryRelativePath string) error {
 		dir, ok := entry.(fs.Directory)
 		if !ok {
 			// skip non-directories
@@ -299,13 +414,6 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin
 			return err
 		}
 
-		summ.TotalFileCount += subdirsumm.TotalFileCount
-		summ.TotalFileSize += subdirsumm.TotalFileSize
-		summ.TotalDirCount += subdirsumm.TotalDirCount
-		if subdirsumm.MaxModTime.After(summ.MaxModTime) {
-			summ.MaxModTime = subdirsumm.MaxModTime
-		}
-
 		if err != nil {
 			// Note: This only catches errors in subdirectories of the snapshot root, not on the snapshot
 			// root itself. The intention is to always fail if the top level directory can't be read,
@@ -324,18 +432,11 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin
 		}
 		de.DirSummary = &subdirsumm
 
-		dirManifest.Entries = append(dirManifest.Entries, de)
+		output <- de
 
 		return nil
 	})
 }
 
-type uploadWorkItem struct {
-	entry             fs.Entry
-	entryRelativePath string
-	uploadFunc        func() entryResult
-	resultChan        chan entryResult
-}
-
 func metadataEquals(e1, e2 fs.Entry) bool {
 	if l, r := e1.ModTime(), e2.ModTime(); !l.Equal(r) {
 		return false
@@ -393,29 +494,22 @@ func (u *Uploader) maybeIgnoreCachedEntry(ctx context.Context, ent fs.Entry) fs.
 	return nil
 }
 
-func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) {
-	var result []*uploadWorkItem
+func (u *Uploader) processNonDirectories(ctx context.Context, output chan *snapshot.DirEntry, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error {
+	workerCount := u.ParallelUploads
+	if workerCount == 0 {
+		workerCount = runtime.NumCPU()
+	}
 
-	resultErr := u.foreachEntryUnlessCancelled(dirRelativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
+	return u.foreachEntryUnlessCancelled(ctx, workerCount, dirRelativePath, entries, func(ctx context.Context, entry fs.Entry, entryRelativePath string) error {
+		// note: this function runs in parallel and updates 'u.stats', which must be done using atomic operations.
 		if _, ok := entry.(fs.Directory); ok {
 			// skip directories
 			return nil
 		}
 
-		// regular file
-		if entry, ok := entry.(fs.File); ok {
-			u.stats.TotalFileCount++
-			u.stats.TotalFileSize += entry.Size()
-			summ.TotalFileCount++
-			summ.TotalFileSize += entry.Size()
-			if entry.ModTime().After(summ.MaxModTime) {
-				summ.MaxModTime = entry.ModTime()
-			}
-		}
-
 		// See if we had this name during either of previous passes.
 		if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entry, prevEntries)); cachedEntry != nil {
-			u.stats.CachedFiles++
+			atomic.AddInt32(&u.stats.CachedFiles, 1)
 			u.Progress.CachedFile(filepath.Join(dirRelativePath, entry.Name()), entry.Size())
 
 			// compute entryResult now, cachedEntry is short-lived
@@ -424,117 +518,34 @@ func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string,
 				return errors.Wrap(err, "unable to create dir entry")
 			}
 
-			// Avoid hashing by reusing previous object ID.
-			result = append(result, &uploadWorkItem{
-				entry:             entry,
-				entryRelativePath: entryRelativePath,
-				uploadFunc: func() entryResult {
-					return entryResult{de: cachedDirEntry}
-				},
-			})
-		} else {
-			switch entry := entry.(type) {
-			case fs.Symlink:
-				result = append(result, &uploadWorkItem{
-					entry:             entry,
-					entryRelativePath: entryRelativePath,
-					uploadFunc: func() entryResult {
-						return u.uploadSymlinkInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry)
-					},
-				})
-
-			case fs.File:
-				u.stats.NonCachedFiles++
-				result = append(result, &uploadWorkItem{
-					entry:             entry,
-					entryRelativePath: entryRelativePath,
-					uploadFunc: func() entryResult {
-						return u.uploadFileInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry, policyTree.Child(entry.Name()).EffectivePolicy())
-					},
-				})
-
-			default:
-				return errors.Errorf("file type not supported: %v", entry.Mode())
-			}
+			output <- cachedDirEntry
+			return nil
+		}
+
+		switch entry := entry.(type) {
+		case fs.Symlink:
+			de, err := u.uploadSymlinkInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry)
+			if err != nil {
+				return u.maybeIgnoreFileReadError(err, policyTree)
+			}
+
+			output <- de
+			return nil
+
+		case fs.File:
+			atomic.AddInt32(&u.stats.NonCachedFiles, 1)
+			de, err := u.uploadFileInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry, policyTree.Child(entry.Name()).EffectivePolicy())
+			if err != nil {
+				return u.maybeIgnoreFileReadError(err, policyTree)
+			}
+
+			output <- de
+			return nil
+
+		default:
+			return errors.Errorf("file type not supported: %v", entry.Mode())
 		}
-		return nil
 	})
-
-	return result, resultErr
 }
 
-func toChannel(items []*uploadWorkItem) <-chan *uploadWorkItem {
-	ch := make(chan *uploadWorkItem)
-
-	go func() {
-		defer close(ch)
-
-		for _, wi := range items {
-			ch <- wi
-		}
-	}()
-
-	return ch
-}
-
-func (u *Uploader) launchWorkItems(workItems []*uploadWorkItem, wg *sync.WaitGroup) {
-	// allocate result channel for each work item.
-	for _, it := range workItems {
-		it.resultChan = make(chan entryResult, 1)
-	}
-
-	workerCount := u.ParallelUploads
-	if workerCount == 0 {
-		workerCount = runtime.NumCPU()
-	}
-
-	ch := toChannel(workItems)
-
-	for i := 0; i < workerCount; i++ {
-		wg.Add(1)
-
-		go func() {
-			defer wg.Done()
-
-			for it := range ch {
-				it.resultChan <- it.uploadFunc()
-			}
-		}()
-	}
-}
-
-func (u *Uploader) processUploadWorkItems(ctx context.Context, workItems []*uploadWorkItem, dirManifest *snapshot.DirManifest, ignoreFileErrs bool) error {
-	var wg sync.WaitGroup
-
-	u.launchWorkItems(workItems, &wg)
-
-	// Read result channels in order.
-	for _, it := range workItems {
-		result := <-it.resultChan
-
-		if result.err == errCancelled {
-			return errCancelled
-		}
-
-		if result.err != nil {
-			if ignoreFileErrs {
-				u.stats.ReadErrors++
-
-				log(ctx).Warningf("unable to hash file %q: %s, ignoring", it.entryRelativePath, result.err)
-
-				continue
-			}
-
-			return errors.Errorf("unable to process %q: %s", it.entryRelativePath, result.err)
-		}
-
-		dirManifest.Entries = append(dirManifest.Entries, result.de)
-	}
-
-	// wait for workers, this is technically not needed, but let's make sure we don't leak goroutines
-	wg.Wait()
-
-	return nil
-}
-
 func maybeReadDirectoryEntries(ctx context.Context, dir fs.Directory) fs.Entries {
@@ -591,18 +602,20 @@ func uploadDirInternal(
 	u.Progress.StartedDirectory(dirRelativePath)
 	defer u.Progress.FinishedDirectory(dirRelativePath)
 
-	var summ fs.DirectorySummary
-	summ.TotalDirCount = 1
+	dirManifest := &snapshot.DirManifest{
+		StreamType: directoryStreamType,
+		Summary: &fs.DirectorySummary{
+			TotalDirCount: 1,
+		},
+	}
 
 	defer func() {
-		summ.IncompleteReason = u.cancelReason()
+		dirManifest.Summary.IncompleteReason = u.cancelReason()
 	}()
 
-	log(ctx).Debugf("reading directory %v", dirRelativePath)
-
+	t0 := u.repo.Time()
 	entries, direrr := directory.Readdir(ctx)
-
-	log(ctx).Debugf("finished reading directory %v", dirRelativePath)
+	log(ctx).Debugf("finished reading directory %v in %v", dirRelativePath, u.repo.Time().Sub(t0))
 
 	if direrr != nil {
 		return "", fs.DirectorySummary{}, dirReadError{direrr}
@@ -616,57 +629,38 @@ func uploadDirInternal(
 		}
 	}
 
-	if len(entries) == 0 {
-		summ.MaxModTime = directory.ModTime()
-	}
-
-	dirManifest := &snapshot.DirManifest{
-		StreamType: directoryStreamType,
-	}
-
-	if err := u.processSubdirectories(ctx, dirRelativePath, entries, policyTree, prevEntries, dirManifest, &summ); err != nil && err != errCancelled {
+	if err := u.processChildren(ctx, dirManifest, dirRelativePath, entries, policyTree, prevEntries); err != nil && err != errCancelled {
 		return "", fs.DirectorySummary{}, err
 	}
 
-	log(ctx).Debugf("preparing work items %v", dirRelativePath)
-	workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, policyTree, prevEntries, &summ)
-	log(ctx).Debugf("finished preparing work items %v", dirRelativePath)
-
-	if workItemErr != nil && workItemErr != errCancelled {
-		return "", fs.DirectorySummary{}, workItemErr
+	if len(dirManifest.Entries) == 0 {
+		dirManifest.Summary.MaxModTime = directory.ModTime()
 	}
 
-	ignoreFileErrs := u.shouldIgnoreFileReadErrors(policyTree)
-	if err := u.processUploadWorkItems(ctx, workItems, dirManifest, ignoreFileErrs); err != nil && err != errCancelled {
-		return "", fs.DirectorySummary{}, err
-	}
-
-	log(ctx).Debugf("finished processing uploads %v", dirRelativePath)
-
-	dirManifest.Summary = &summ
+	// at this point dirManifest is ready to go
 
 	writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{
 		Description: "DIR:" + dirRelativePath,
 		Prefix:      "k",
 	})
 
-	if err := json.NewEncoder(writer).Encode(&dirManifest); err != nil {
+	if err := json.NewEncoder(writer).Encode(dirManifest); err != nil {
 		return "", fs.DirectorySummary{}, errors.Wrap(err, "unable to encode directory JSON")
 	}
 
 	oid, err := writer.Result()
 
-	return oid, summ, err
+	return oid, *dirManifest.Summary, err
 }
 
-func (u *Uploader) shouldIgnoreFileReadErrors(policyTree *policy.Tree) bool {
+func (u *Uploader) maybeIgnoreFileReadError(err error, policyTree *policy.Tree) error {
 	errHandlingPolicy := policyTree.EffectivePolicy().ErrorHandlingPolicy
 
-	if u.IgnoreReadErrors {
-		return true
+	if u.IgnoreReadErrors || errHandlingPolicy.IgnoreFileErrorsOrDefault(false) {
+		return nil
 	}
 
-	return errHandlingPolicy.IgnoreFileErrorsOrDefault(false)
+	return err
 }
 
 func (u *Uploader) shouldIgnoreDirectoryReadErrors(policyTree *policy.Tree) bool {
@@ -686,6 +680,13 @@ func NewUploader(r *repo.Repository) *Uploader {
 		Progress:         &NullUploadProgress{},
 		IgnoreReadErrors: false,
 		ParallelUploads:  1,
+		uploadBufPool: sync.Pool{
+			New: func() interface{} {
+				p := make([]byte, copyBufferSize)
+
+				return &p
+			},
+		},
 	}
 }
diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go
index ca516f2c9..ffb0872be 100644
--- a/snapshot/snapshotfs/upload_test.go
+++ b/snapshot/snapshotfs/upload_test.go
@@ -126,7 +126,7 @@ func TestUpload(t *testing.T) {
 		t.Errorf("expected s1.RootObjectID==s2.RootObjectID, got %v and %v", s1.RootObjectID().String(), s2.RootObjectID().String())
 	}
 
-	if got, want := s1.Stats.CachedFiles, 0; got != want {
+	if got, want := s1.Stats.CachedFiles, int32(0); got != want {
 		t.Errorf("unexpected s1 cached files: %v, want %v", got, want)
 	}
 
@@ -135,7 +135,7 @@ func TestUpload(t *testing.T) {
 		t.Errorf("unexpected s2 cached files: %v, want %v", got, want)
 	}
 
-	if got, want := s2.Stats.NonCachedFiles, 0; got != want {
+	if got, want := s2.Stats.NonCachedFiles, int32(0); got != want {
 		t.Errorf("unexpected non-cached files: %v", got)
 	}
diff --git a/snapshot/stats.go b/snapshot/stats.go
index 895b5c09a..aa57143bf 100644
--- a/snapshot/stats.go
+++ b/snapshot/stats.go
@@ -14,8 +14,8 @@ type Stats struct {
 	ExcludedTotalFileSize int64 `json:"excludedTotalSize"`
 	ExcludedDirCount      int   `json:"excludedDirCount"`
 
-	CachedFiles    int `json:"cachedFiles"`
-	NonCachedFiles int `json:"nonCachedFiles"`
+	CachedFiles    int32 `json:"cachedFiles"`
+	NonCachedFiles int32 `json:"nonCachedFiles"`
 
 	ReadErrors int `json:"readErrors"`
 }
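
The parallel traversal added in foreachEntryUnlessCancelled is a classic
errgroup fan-out: one producer goroutine feeds an unbuffered channel and N
workers drain it, while errgroup.WithContext cancels the shared context on
the first worker error, which unblocks the producer's select so the whole
pipeline shuts down cleanly. A standalone sketch of that shape (function
and variable names are illustrative, not kopia's API):

	package main

	import (
		"context"
		"fmt"

		"golang.org/x/sync/errgroup"
	)

	// forEachParallel processes items with up to 'parallel' workers,
	// returning the first error encountered.
	func forEachParallel(ctx context.Context, parallel int, items []string, cb func(context.Context, string) error) error {
		if parallel > len(items) {
			parallel = len(items) // don't launch more goroutines than needed
		}

		if parallel == 0 {
			return nil
		}

		ch := make(chan string)
		eg, ctx := errgroup.WithContext(ctx)

		// producer: pump items until done or the context is canceled
		eg.Go(func() error {
			defer close(ch)

			for _, it := range items {
				select {
				case ch <- it:
				case <-ctx.Done(): // a worker failed; stop feeding
					return nil
				}
			}

			return nil
		})

		// workers: drain the channel until it is closed
		for i := 0; i < parallel; i++ {
			eg.Go(func() error {
				for it := range ch {
					if err := cb(ctx, it); err != nil {
						return err
					}
				}

				return nil
			})
		}

		return eg.Wait()
	}

	func main() {
		err := forEachParallel(context.Background(), 2, []string{"a", "b", "c"}, func(_ context.Context, s string) error {
			fmt.Println("processing", s)
			return nil
		})
		fmt.Println("done:", err)
	}

Because the channel is unbuffered, an entry is handed over only when a
worker is free, so the traversal keeps a bounded amount of work in flight
regardless of directory size.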