Improved upload performance and memory usage (#331)

Upload: reduced memory usage during uploads
Replaced custom work item management with much simpler solution.

Upload: added pooling of upload buffers
This reduces a lot of allocations and GC pressure.

In a test of 2 dirs x 100K files x 100 bytes each, allocated bytes
went from 27 GB to 20 GB.

This improves #93
This commit is contained in:
Jarek Kowalski
2020-03-12 00:27:51 -07:00
committed by GitHub
parent 89c0c6bac4
commit 0d1a627dcb
3 changed files with 213 additions and 212 deletions

View File

@@ -7,12 +7,15 @@
"hash/fnv"
"io"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"sync"
"sync/atomic"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/ignorefs"
@@ -51,6 +54,8 @@ type Uploader struct {
stats snapshot.Stats
canceled int32
uploadBufPool sync.Pool
}
// IsCancelled returns true if the upload is canceled.
@@ -71,13 +76,13 @@ func (u *Uploader) cancelReason() string {
return ""
}
func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy) entryResult {
func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
u.Progress.HashingFile(relativePath)
defer u.Progress.FinishedHashingFile(relativePath, f.Size())
file, err := f.Open(ctx)
if err != nil {
return entryResult{err: errors.Wrap(err, "unable to open file")}
return nil, errors.Wrap(err, "unable to open file")
}
defer file.Close() //nolint:errcheck
@@ -89,36 +94,36 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string,
written, err := u.copyWithProgress(writer, file, 0, f.Size())
if err != nil {
return entryResult{err: err}
return nil, err
}
fi2, err := file.Entry()
if err != nil {
return entryResult{err: err}
return nil, err
}
r, err := writer.Result()
if err != nil {
return entryResult{err: err}
return nil, err
}
de, err := newDirEntry(fi2, r)
if err != nil {
return entryResult{err: errors.Wrap(err, "unable to create dir entry")}
return nil, errors.Wrap(err, "unable to create dir entry")
}
de.FileSize = written
return entryResult{de: de}
return de, nil
}
func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) entryResult {
func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (*snapshot.DirEntry, error) {
u.Progress.HashingFile(relativePath)
defer u.Progress.FinishedHashingFile(relativePath, f.Size())
target, err := f.Readlink(ctx)
if err != nil {
return entryResult{err: errors.Wrap(err, "unable to read symlink")}
return nil, errors.Wrap(err, "unable to read symlink")
}
writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{
@@ -128,26 +133,29 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin
written, err := u.copyWithProgress(writer, bytes.NewBufferString(target), 0, f.Size())
if err != nil {
return entryResult{err: err}
return nil, err
}
r, err := writer.Result()
if err != nil {
return entryResult{err: err}
return nil, err
}
de, err := newDirEntry(f, r)
if err != nil {
return entryResult{err: errors.Wrap(err, "unable to create dir entry")}
return nil, errors.Wrap(err, "unable to create dir entry")
}
de.FileSize = written
return entryResult{de: de}
return de, nil
}
func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, length int64) (int64, error) {
uploadBuf := make([]byte, copyBufferSize)
uploadBufPtr := u.uploadBufPool.Get().(*[]byte)
defer u.uploadBufPool.Put(uploadBufPtr)
uploadBuf := *uploadBufPtr
var written int64
@@ -218,20 +226,20 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
// uploadFile uploads the specified File to the repository.
func (u *Uploader) uploadFile(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
res := u.uploadFileInternal(ctx, relativePath, file, pol)
if res.err != nil {
return nil, res.err
res, err := u.uploadFileInternal(ctx, relativePath, file, pol)
if err != nil {
return nil, err
}
de, err := newDirEntry(file, res.de.ObjectID)
de, err := newDirEntry(file, res.ObjectID)
if err != nil {
return nil, errors.Wrap(err, "unable to create dir entry")
}
de.DirSummary = &fs.DirectorySummary{
TotalFileCount: 1,
TotalFileSize: res.de.FileSize,
MaxModTime: res.de.ModTime,
TotalFileSize: res.FileSize,
MaxModTime: res.ModTime,
}
return de, nil
@@ -256,29 +264,136 @@ func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, policyTr
return de, err
}
func (u *Uploader) foreachEntryUnlessCancelled(relativePath string, entries fs.Entries, cb func(entry fs.Entry, entryRelativePath string) error) error {
for _, entry := range entries {
if u.IsCancelled() {
return errCancelled
func (u *Uploader) foreachEntryUnlessCancelled(ctx context.Context, parallel int, relativePath string, entries fs.Entries, cb func(ctx context.Context, entry fs.Entry, entryRelativePath string) error) error {
if parallel > len(entries) {
// don't launch more goroutines than needed
parallel = len(entries)
}
if parallel == 0 {
return nil
}
ch := make(chan fs.Entry)
eg, ctx := errgroup.WithContext(ctx)
// one goroutine to pump entries into channel until ctx is closed.
eg.Go(func() error {
defer close(ch)
for _, e := range entries {
select {
case ch <- e: // sent to channel
case <-ctx.Done(): // context closed
return nil
}
}
return nil
})
// launch N workers in parallel
for i := 0; i < parallel; i++ {
eg.Go(func() error {
for entry := range ch {
if u.IsCancelled() {
return errCancelled
}
entryRelativePath := path.Join(relativePath, entry.Name())
if err := cb(ctx, entry, entryRelativePath); err != nil {
return err
}
}
return nil
})
}
return eg.Wait()
}
func (u *Uploader) populateChildEntries(parent *snapshot.DirManifest, children <-chan *snapshot.DirEntry) {
parentSummary := parent.Summary
for de := range children {
switch de.Type {
case snapshot.EntryTypeFile:
u.stats.TotalFileCount++
u.stats.TotalFileSize += de.FileSize
parentSummary.TotalFileCount++
parentSummary.TotalFileSize += de.FileSize
if de.ModTime.After(parentSummary.MaxModTime) {
parentSummary.MaxModTime = de.ModTime
}
case snapshot.EntryTypeDirectory:
if childSummary := de.DirSummary; childSummary != nil {
parentSummary.TotalFileCount += childSummary.TotalFileCount
parentSummary.TotalFileSize += childSummary.TotalFileSize
parentSummary.TotalDirCount += childSummary.TotalDirCount
if childSummary.MaxModTime.After(parentSummary.MaxModTime) {
parentSummary.MaxModTime = childSummary.MaxModTime
}
}
}
entryRelativePath := relativePath + "/" + entry.Name()
parent.Entries = append(parent.Entries, de)
}
if err := cb(entry, entryRelativePath); err != nil {
return err
// sort the result, directories first, then non-directories, ordered by name
sort.Slice(parent.Entries, func(i, j int) bool {
if leftDir, rightDir := isDir(parent.Entries[i]), isDir(parent.Entries[j]); leftDir != rightDir {
// directories get sorted before non-directories
return leftDir
}
return parent.Entries[i].Name < parent.Entries[j].Name
})
}
func isDir(e *snapshot.DirEntry) bool {
return e.Type == snapshot.EntryTypeDirectory
}
func (u *Uploader) processChildren(ctx context.Context, dirManifest *snapshot.DirManifest, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
var wg sync.WaitGroup
// channel where we will add directory and file entries, possibly in parallel
output := make(chan *snapshot.DirEntry)
// goroutine that will drain data from 'output' and update dirManifest
wg.Add(1)
go func() {
defer wg.Done()
u.populateChildEntries(dirManifest, output)
}()
defer func() {
// before this function returns, close the output channel and wait for the goroutine above to complete.
close(output)
wg.Wait()
}()
if err := u.processSubdirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
return err
}
if err := u.processNonDirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
return err
}
return nil
}
type entryResult struct {
err error
de *snapshot.DirEntry
}
func (u *Uploader) processSubdirectories(ctx context.Context, output chan *snapshot.DirEntry, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
// for now don't process subdirectories in parallel, we need a mechanism to
// prevent explosion of parallelism
const parallelism = 1
func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error {
return u.foreachEntryUnlessCancelled(relativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
return u.foreachEntryUnlessCancelled(ctx, parallelism, relativePath, entries, func(ctx context.Context, entry fs.Entry, entryRelativePath string) error {
dir, ok := entry.(fs.Directory)
if !ok {
// skip non-directories
@@ -299,13 +414,6 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin
return err
}
summ.TotalFileCount += subdirsumm.TotalFileCount
summ.TotalFileSize += subdirsumm.TotalFileSize
summ.TotalDirCount += subdirsumm.TotalDirCount
if subdirsumm.MaxModTime.After(summ.MaxModTime) {
summ.MaxModTime = subdirsumm.MaxModTime
}
if err != nil {
// Note: This only catches errors in subdirectories of the snapshot root, not on the snapshot
// root itself. The intention is to always fail if the top level directory can't be read,
@@ -324,18 +432,11 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin
}
de.DirSummary = &subdirsumm
dirManifest.Entries = append(dirManifest.Entries, de)
output <- de
return nil
})
}
type uploadWorkItem struct {
entry fs.Entry
entryRelativePath string
uploadFunc func() entryResult
resultChan chan entryResult
}
func metadataEquals(e1, e2 fs.Entry) bool {
if l, r := e1.ModTime(), e2.ModTime(); !l.Equal(r) {
return false
@@ -393,29 +494,22 @@ func (u *Uploader) maybeIgnoreCachedEntry(ctx context.Context, ent fs.Entry) fs.
return nil
}
func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) {
var result []*uploadWorkItem
func (u *Uploader) processNonDirectories(ctx context.Context, output chan *snapshot.DirEntry, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error {
workerCount := u.ParallelUploads
if workerCount == 0 {
workerCount = runtime.NumCPU()
}
resultErr := u.foreachEntryUnlessCancelled(dirRelativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
return u.foreachEntryUnlessCancelled(ctx, workerCount, dirRelativePath, entries, func(ctx context.Context, entry fs.Entry, entryRelativePath string) error {
// note this function runs in parallel and updates 'u.stats', which must be done using atomic operations.
if _, ok := entry.(fs.Directory); ok {
// skip directories
return nil
}
// regular file
if entry, ok := entry.(fs.File); ok {
u.stats.TotalFileCount++
u.stats.TotalFileSize += entry.Size()
summ.TotalFileCount++
summ.TotalFileSize += entry.Size()
if entry.ModTime().After(summ.MaxModTime) {
summ.MaxModTime = entry.ModTime()
}
}
// See if we had this name during either of previous passes.
if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entry, prevEntries)); cachedEntry != nil {
u.stats.CachedFiles++
atomic.AddInt32(&u.stats.CachedFiles, 1)
u.Progress.CachedFile(filepath.Join(dirRelativePath, entry.Name()), entry.Size())
// compute entryResult now, cachedEntry is short-lived
@@ -424,117 +518,34 @@ func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string,
return errors.Wrap(err, "unable to create dir entry")
}
// Avoid hashing by reusing previous object ID.
result = append(result, &uploadWorkItem{
entry: entry,
entryRelativePath: entryRelativePath,
uploadFunc: func() entryResult {
return entryResult{de: cachedDirEntry}
},
})
} else {
switch entry := entry.(type) {
case fs.Symlink:
result = append(result, &uploadWorkItem{
entry: entry,
entryRelativePath: entryRelativePath,
uploadFunc: func() entryResult {
return u.uploadSymlinkInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry)
},
})
case fs.File:
u.stats.NonCachedFiles++
result = append(result, &uploadWorkItem{
entry: entry,
entryRelativePath: entryRelativePath,
uploadFunc: func() entryResult {
return u.uploadFileInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry, policyTree.Child(entry.Name()).EffectivePolicy())
},
})
default:
return errors.Errorf("file type not supported: %v", entry.Mode())
}
output <- cachedDirEntry
return nil
}
switch entry := entry.(type) {
case fs.Symlink:
de, err := u.uploadSymlinkInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry)
if err != nil {
return u.maybeIgnoreFileReadError(err, policyTree)
}
output <- de
return nil
case fs.File:
atomic.AddInt32(&u.stats.NonCachedFiles, 1)
de, err := u.uploadFileInternal(ctx, filepath.Join(dirRelativePath, entry.Name()), entry, policyTree.Child(entry.Name()).EffectivePolicy())
if err != nil {
return u.maybeIgnoreFileReadError(err, policyTree)
}
output <- de
return nil
default:
return errors.Errorf("file type not supported: %v", entry.Mode())
}
return nil
})
return result, resultErr
}
func toChannel(items []*uploadWorkItem) <-chan *uploadWorkItem {
ch := make(chan *uploadWorkItem)
go func() {
defer close(ch)
for _, wi := range items {
ch <- wi
}
}()
return ch
}
func (u *Uploader) launchWorkItems(workItems []*uploadWorkItem, wg *sync.WaitGroup) {
// allocate result channel for each work item.
for _, it := range workItems {
it.resultChan = make(chan entryResult, 1)
}
workerCount := u.ParallelUploads
if workerCount == 0 {
workerCount = runtime.NumCPU()
}
ch := toChannel(workItems)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for it := range ch {
it.resultChan <- it.uploadFunc()
}
}()
}
}
func (u *Uploader) processUploadWorkItems(ctx context.Context, workItems []*uploadWorkItem, dirManifest *snapshot.DirManifest, ignoreFileErrs bool) error {
var wg sync.WaitGroup
u.launchWorkItems(workItems, &wg)
// Read result channels in order.
for _, it := range workItems {
result := <-it.resultChan
if result.err == errCancelled {
return errCancelled
}
if result.err != nil {
if ignoreFileErrs {
u.stats.ReadErrors++
log(ctx).Warningf("unable to hash file %q: %s, ignoring", it.entryRelativePath, result.err)
continue
}
return errors.Errorf("unable to process %q: %s", it.entryRelativePath, result.err)
}
dirManifest.Entries = append(dirManifest.Entries, result.de)
}
// wait for workers, this is technically not needed, but let's make sure we don't leak goroutines
wg.Wait()
return nil
}
func maybeReadDirectoryEntries(ctx context.Context, dir fs.Directory) fs.Entries {
@@ -591,18 +602,20 @@ func uploadDirInternal(
u.Progress.StartedDirectory(dirRelativePath)
defer u.Progress.FinishedDirectory(dirRelativePath)
var summ fs.DirectorySummary
summ.TotalDirCount = 1
dirManifest := &snapshot.DirManifest{
StreamType: directoryStreamType,
Summary: &fs.DirectorySummary{
TotalDirCount: 1,
},
}
defer func() {
summ.IncompleteReason = u.cancelReason()
dirManifest.Summary.IncompleteReason = u.cancelReason()
}()
log(ctx).Debugf("reading directory %v", dirRelativePath)
t0 := u.repo.Time()
entries, direrr := directory.Readdir(ctx)
log(ctx).Debugf("finished reading directory %v", dirRelativePath)
log(ctx).Debugf("finished reading directory %v in %v", dirRelativePath, u.repo.Time().Sub(t0))
if direrr != nil {
return "", fs.DirectorySummary{}, dirReadError{direrr}
@@ -616,57 +629,38 @@ func uploadDirInternal(
}
}
if len(entries) == 0 {
summ.MaxModTime = directory.ModTime()
}
dirManifest := &snapshot.DirManifest{
StreamType: directoryStreamType,
}
if err := u.processSubdirectories(ctx, dirRelativePath, entries, policyTree, prevEntries, dirManifest, &summ); err != nil && err != errCancelled {
if err := u.processChildren(ctx, dirManifest, dirRelativePath, entries, policyTree, prevEntries); err != nil && err != errCancelled {
return "", fs.DirectorySummary{}, err
}
log(ctx).Debugf("preparing work items %v", dirRelativePath)
workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, policyTree, prevEntries, &summ)
log(ctx).Debugf("finished preparing work items %v", dirRelativePath)
if workItemErr != nil && workItemErr != errCancelled {
return "", fs.DirectorySummary{}, workItemErr
if len(dirManifest.Entries) == 0 {
dirManifest.Summary.MaxModTime = directory.ModTime()
}
ignoreFileErrs := u.shouldIgnoreFileReadErrors(policyTree)
if err := u.processUploadWorkItems(ctx, workItems, dirManifest, ignoreFileErrs); err != nil && err != errCancelled {
return "", fs.DirectorySummary{}, err
}
log(ctx).Debugf("finished processing uploads %v", dirRelativePath)
dirManifest.Summary = &summ
// at this point dirManifest is ready to go
writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{
Description: "DIR:" + dirRelativePath,
Prefix: "k",
})
if err := json.NewEncoder(writer).Encode(&dirManifest); err != nil {
if err := json.NewEncoder(writer).Encode(dirManifest); err != nil {
return "", fs.DirectorySummary{}, errors.Wrap(err, "unable to encode directory JSON")
}
oid, err := writer.Result()
return oid, summ, err
return oid, *dirManifest.Summary, err
}
func (u *Uploader) shouldIgnoreFileReadErrors(policyTree *policy.Tree) bool {
func (u *Uploader) maybeIgnoreFileReadError(err error, policyTree *policy.Tree) error {
errHandlingPolicy := policyTree.EffectivePolicy().ErrorHandlingPolicy
if u.IgnoreReadErrors {
return true
if u.IgnoreReadErrors || errHandlingPolicy.IgnoreFileErrorsOrDefault(false) {
return nil
}
return errHandlingPolicy.IgnoreFileErrorsOrDefault(false)
return err
}
func (u *Uploader) shouldIgnoreDirectoryReadErrors(policyTree *policy.Tree) bool {
@@ -686,6 +680,13 @@ func NewUploader(r *repo.Repository) *Uploader {
Progress: &NullUploadProgress{},
IgnoreReadErrors: false,
ParallelUploads: 1,
uploadBufPool: sync.Pool{
New: func() interface{} {
p := make([]byte, copyBufferSize)
return &p
},
},
}
}

View File

@@ -126,7 +126,7 @@ func TestUpload(t *testing.T) {
t.Errorf("expected s1.RootObjectID==s2.RootObjectID, got %v and %v", s1.RootObjectID().String(), s2.RootObjectID().String())
}
if got, want := s1.Stats.CachedFiles, 0; got != want {
if got, want := s1.Stats.CachedFiles, int32(0); got != want {
t.Errorf("unexpected s1 cached files: %v, want %v", got, want)
}
@@ -135,7 +135,7 @@ func TestUpload(t *testing.T) {
t.Errorf("unexpected s2 cached files: %v, want %v", got, want)
}
if got, want := s2.Stats.NonCachedFiles, 0; got != want {
if got, want := s2.Stats.NonCachedFiles, int32(0); got != want {
t.Errorf("unexpected non-cached files: %v", got)
}

View File

@@ -14,8 +14,8 @@ type Stats struct {
ExcludedTotalFileSize int64 `json:"excludedTotalSize"`
ExcludedDirCount int `json:"excludedDirCount"`
CachedFiles int `json:"cachedFiles"`
NonCachedFiles int `json:"nonCachedFiles"`
CachedFiles int32 `json:"cachedFiles"`
NonCachedFiles int32 `json:"nonCachedFiles"`
ReadErrors int `json:"readErrors"`
}