Fixed checkpointing to not restart the entire upload process (#594)

* object: added Checkpoint() method to object writer

* upload: refactored code structure to allow better checkpointing

* upload: removed Checkpoint() method from UploadProgress

* Update fs/entry.go

Co-authored-by: Julio López <julio+gh@kasten.io>
This commit is contained in:
Jarek Kowalski
2020-09-12 22:36:22 -07:00
committed by GitHub
parent 6a14ac8a2a
commit f0b97b960b
10 changed files with 556 additions and 221 deletions

View File

@@ -89,24 +89,6 @@ func (p *cliProgress) CachedFile(fname string, numBytes int64) {
p.maybeOutput()
}
func (p *cliProgress) Checkpoint() {
p.output(noticeColor, "Saving a checkpoint...\n")
if p.shared {
// do not reset counters
return
}
*p = cliProgress{
uploading: 1,
uploadStartTime: clock.Now(),
previousFileCount: p.previousFileCount,
previousTotalSize: p.previousTotalSize,
uploadedBytes: p.uploadedBytes,
uploadedFiles: p.uploadedFiles,
}
}
func (p *cliProgress) maybeOutput() {
if atomic.LoadInt32(&p.uploading) == 0 {
return

View File

@@ -89,6 +89,15 @@ type DirectorySummary struct {
FailedEntries []*EntryWithError `json:"errors,omitempty"`
}
// Clone clones given directory summary.
// The FailedEntries slice is copied into a fresh backing array so that
// mutations of the clone's slice do not affect the original; the
// *EntryWithError elements themselves are still shared.
func (s *DirectorySummary) Clone() DirectorySummary {
res := *s
// append to a nil slice forces a new allocation (deep-copies the slice header)
res.FailedEntries = append([]*EntryWithError(nil), s.FailedEntries...)
return res
}
// Symlink represents a symbolic link entry.
type Symlink interface {
Entry

View File

@@ -148,6 +148,78 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
}
}
// TestCheckpointing verifies objectWriter.Checkpoint() semantics: it returns
// an empty object ID while nothing has been flushed to storage yet, and once
// full content chunks have been flushed it returns an ID covering exactly the
// flushed prefix of the data written so far.
func TestCheckpointing(t *testing.T) {
ctx := testlogging.Context(t)
_, om := setupTest(t)
writer := om.NewWriter(ctx, WriterOptions{})
// write all zeroes
allZeroes := make([]byte, 1<<20)
// empty file, nothing flushed
checkpoint1, err := writer.Checkpoint()
verifyNoError(t, err)
// write some bytes, but not enough to flush.
// NOTE(review): Write errors are deliberately ignored in this test; write
// failures would surface through the subsequent Checkpoint()/Result() calls.
writer.Write(allZeroes[0:50])
checkpoint2, err := writer.Checkpoint()
verifyNoError(t, err)
// write enough to flush first content.
writer.Write(allZeroes)
checkpoint3, err := writer.Checkpoint()
verifyNoError(t, err)
// write enough to flush second content.
writer.Write(allZeroes)
checkpoint4, err := writer.Checkpoint()
verifyNoError(t, err)
result, err := writer.Result()
verifyNoError(t, err)
// before any chunk was flushed, checkpoints must be the empty object ID
if !objectIDsEqual(checkpoint1, "") {
t.Errorf("unexpected checkpoint1: %v err: %v", checkpoint1, err)
}
if !objectIDsEqual(checkpoint2, "") {
t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err)
}
// checkpoint3 covers the first flushed 1 MiB; checkpoint4 covers 2 MiB;
// the final result includes the trailing 50 bytes as well.
verifyFull(ctx, t, om, checkpoint3, allZeroes)
verifyFull(ctx, t, om, checkpoint4, make([]byte, 2<<20))
verifyFull(ctx, t, om, result, make([]byte, 2<<20+50))
}
// verifyFull opens the object identified by oid, reads it in full and fails
// the test unless its contents are byte-for-byte equal to want.
func verifyFull(ctx context.Context, t *testing.T, om *Manager, oid ID, want []byte) {
t.Helper()
r, err := om.Open(ctx, oid)
if err != nil {
t.Fatalf("unable to open %v: %v", oid, err)
}
defer r.Close()
data, err := ioutil.ReadAll(r)
if err != nil {
t.Fatalf("unable to read all: %v", err)
}
if !bytes.Equal(data, want) {
t.Fatalf("unexpected data read for %v", oid)
}
}
// verifyNoError fails the test immediately if err is non-nil.
func verifyNoError(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
}
}
func verifyIndirectBlock(ctx context.Context, t *testing.T, r *Manager, oid ID) {
for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() {
if c, _, ok := indexContentID.ContentID(); ok {

View File

@@ -25,6 +25,12 @@
type Writer interface {
io.WriteCloser
// Checkpoint returns ID of an object consisting of all contents written to storage so far.
// This may not include some data buffered in the writer.
// In case nothing has been written yet, returns empty object ID.
Checkpoint() (ID, error)
// Result returns object ID representing all bytes written to the writer.
Result() (ID, error)
}
@@ -77,6 +83,8 @@ type objectWriter struct {
splitter splitter.Splitter
writeMutex sync.Mutex
asyncWritesSemaphore chan struct{} // async writes semaphore or nil
asyncWritesWG sync.WaitGroup
@@ -103,6 +111,9 @@ func (w *objectWriter) Close() error {
}
func (w *objectWriter) Write(data []byte) (n int, err error) {
w.writeMutex.Lock()
defer w.writeMutex.Unlock()
dataLen := len(data)
w.totalLength += int64(dataLen)
@@ -229,6 +240,17 @@ func maybeCompressedContentBytes(comp compression.Compressor, output *bytes.Buff
return input, false, nil
}
// drainWrites acquires the write mutex (blocking any concurrent Write calls)
// and waits until all in-flight asynchronous content writes have completed.
// The mutex remains held when this returns — callers must pair every
// drainWrites with a deferred undrainWrites.
func (w *objectWriter) drainWrites() {
w.writeMutex.Lock()
// wait for any in-flight asynchronous writes to finish
w.asyncWritesWG.Wait()
}
// undrainWrites releases the write mutex acquired by drainWrites, allowing
// Write calls to proceed again.
func (w *objectWriter) undrainWrites() {
w.writeMutex.Unlock()
}
func (w *objectWriter) Result() (ID, error) {
// no need to hold a lock on w.indirectIndexGrowMutex, since growing index only happens synchronously
// and never in parallel with calling Result()
@@ -238,13 +260,23 @@ func (w *objectWriter) Result() (ID, error) {
}
}
// wait for any asynchronous writes to complete.
w.asyncWritesWG.Wait()
return w.Checkpoint()
}
// Checkpoint returns object ID which represents portion of the object that has already been written.
// The result may be an empty object ID if nothing has been flushed yet.
func (w *objectWriter) Checkpoint() (ID, error) {
w.drainWrites()
defer w.undrainWrites()
if w.contentWriteError != nil {
return "", w.contentWriteError
}
if len(w.indirectIndex) == 0 {
return "", nil
}
if len(w.indirectIndex) == 1 {
return w.indirectIndex[0].Object, nil
}

View File

@@ -0,0 +1,62 @@
package snapshotfs
import (
"sync"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/snapshot"
)
// checkpointFunc is invoked when checkpoint occurs. The callback must checkpoint current state of
// file or directory and return directory entry.
type checkpointFunc func() (*snapshot.DirEntry, error)
// checkpointRegistry tracks checkpoint callbacks for in-progress entries,
// keyed by entry name. The zero value is ready to use; all methods lock mu
// and are safe for concurrent use.
type checkpointRegistry struct {
mu sync.Mutex
// checkpoints maps entry name to its callback; lazily allocated on first add.
checkpoints map[string]checkpointFunc
}
// addCheckpointCallback registers a callback to be invoked when a checkpoint
// occurs, keyed by the entry's name. Registering again for the same entry
// name replaces the previous callback.
func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) {
r.mu.Lock()
defer r.mu.Unlock()
// lazily initialize the map so the zero value of checkpointRegistry is usable
if r.checkpoints == nil {
r.checkpoints = map[string]checkpointFunc{}
}
r.checkpoints[e.Name()] = f
}
// removeCheckpointCallback unregisters the callback for the given entry.
// Removing an entry that was never registered (or was already removed)
// is a no-op, since delete on a missing map key does nothing.
func (r *checkpointRegistry) removeCheckpointCallback(e fs.Entry) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.checkpoints, e.Name())
}
// runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while
// randomizing file names for non-directory entries. this is to prevent the use of checkpointed objects
// as authoritative on subsequent runs.
// Returns the first callback error encountered, wrapped with the entry name.
func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *dirManifestBuilder) error {
r.mu.Lock()
defer r.mu.Unlock()
// map iteration order is random; deterministic ordering is restored later
// when the builder sorts its entries.
for n, cp := range r.checkpoints {
de, err := cp()
if err != nil {
return errors.Wrapf(err, "error checkpointing %v", n)
}
if de.Type != snapshot.EntryTypeDirectory {
// mangle the name with a random UUID so a partially-written file is
// never treated as the authoritative entry on a subsequent run.
de.Name = ".checkpointed." + de.Name + "." + uuid.New().String()
}
checkpointBuilder.addEntry(de)
}
return nil
}

View File

@@ -0,0 +1,82 @@
package snapshotfs
import (
"os"
"strings"
"testing"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/snapshot"
)
// TestCheckpointRegistry exercises the checkpoint registry end-to-end:
// callbacks are registered for a directory and three files, one callback is
// removed (twice, proving removal is idempotent), the remainder are run into
// a dirManifestBuilder, and the built manifest is checked for entry count,
// ordering (directories first) and file-name mangling.
func TestCheckpointRegistry(t *testing.T) {
var cp checkpointRegistry
d := mockfs.NewDirectory()
dir1 := d.AddDir("dir1", os.FileMode(0o755))
f1 := d.AddFile("f1", []byte{1, 2, 3}, os.FileMode(0o755))
f2 := d.AddFile("f2", []byte{2, 3, 4}, os.FileMode(0o755))
f3 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755))
cp.addCheckpointCallback(dir1, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "dir1",
Type: snapshot.EntryTypeDirectory,
}, nil
})
cp.addCheckpointCallback(f1, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "f1",
}, nil
})
cp.addCheckpointCallback(f2, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "f2",
}, nil
})
cp.addCheckpointCallback(f3, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "other",
}, nil
})
// remove callback before it has a chance of firing
// (second removal verifies removing an absent entry is a safe no-op)
cp.removeCheckpointCallback(f3)
cp.removeCheckpointCallback(f3)
var dmb dirManifestBuilder
dmb.addEntry(&snapshot.DirEntry{
Name: "pre-existing",
})
if err := cp.runCheckpoints(&dmb); err != nil {
t.Fatalf("error running checkpoints: %v", err)
}
dm := dmb.Build(clock.Now(), "checkpoint")
// expect dir1 + f1 + f2 checkpoints plus the pre-existing entry; f3 was removed.
if got, want := len(dm.Entries), 4; got != want {
t.Fatalf("got %v entries, wanted %v (%+#v)", got, want, dm.Entries)
}
// directory names don't get mangled
if dm.Entries[0].Name != "dir1" {
t.Errorf("invalid entry %v", dm.Entries[0])
}
if !strings.HasPrefix(dm.Entries[1].Name, ".checkpointed.f1.") {
t.Errorf("invalid entry %v", dm.Entries[1])
}
if !strings.HasPrefix(dm.Entries[2].Name, ".checkpointed.f2.") {
t.Errorf("invalid entry %v", dm.Entries[2])
}
if dm.Entries[3].Name != "pre-existing" {
t.Errorf("invalid entry %v", dm.Entries[3])
}
}

View File

@@ -69,11 +69,15 @@ type Uploader struct {
repo repo.Repository
stats snapshot.Stats
canceled int32
nextCheckpointTime time.Time
stats snapshot.Stats
canceled int32
uploadBufPool sync.Pool
getTicker func(time.Duration) <-chan time.Time
// for testing only, when set will write to a given channel whenever checkpoint completes
checkpointFinished chan struct{}
}
// IsCanceled returns true if the upload is canceled.
@@ -87,10 +91,6 @@ func (u *Uploader) incompleteReason() string {
return IncompleteReasonCanceled
}
if !u.nextCheckpointTime.IsZero() && u.repo.Time().After(u.nextCheckpointTime) {
return IncompleteReasonCheckpoint
}
wb := atomic.LoadInt64(&u.totalWrittenBytes)
if mub := u.MaxUploadBytes; mub > 0 && wb > mub {
return IncompleteReasonLimitReached
@@ -99,7 +99,7 @@ func (u *Uploader) incompleteReason() string {
return ""
}
func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy, asyncWrites int) (*snapshot.DirEntry, error) {
func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, relativePath string, f fs.File, pol *policy.Policy, asyncWrites int) (*snapshot.DirEntry, error) {
u.Progress.HashingFile(relativePath)
defer u.Progress.FinishedHashingFile(relativePath, f.Size())
@@ -116,6 +116,18 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string,
})
defer writer.Close() //nolint:errcheck
parentCheckpointRegistry.addCheckpointCallback(f, func() (*snapshot.DirEntry, error) {
// nolint:govet
checkpointID, err := writer.Checkpoint()
if err != nil {
return nil, err
}
return newDirEntry(f, checkpointID)
})
defer parentCheckpointRegistry.removeCheckpointCallback(f)
written, err := u.copyWithProgress(writer, file, 0, f.Size())
if err != nil {
return nil, err
@@ -138,6 +150,9 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string,
de.FileSize = written
atomic.AddInt32(&u.stats.TotalFileCount, 1)
atomic.AddInt64(&u.stats.TotalFileSize, de.FileSize)
return de, nil
}
@@ -225,6 +240,17 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len
return written, nil
}
// newDirEntryWithSummary builds a snapshot.DirEntry for the given filesystem
// entry and object ID (via newDirEntry) and attaches the provided directory
// summary to it.
func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary) (*snapshot.DirEntry, error) {
de, err := newDirEntry(d, oid)
if err != nil {
return nil, err
}
de.DirSummary = summ
return de, nil
}
func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
var entryType snapshot.EntryType
@@ -251,82 +277,113 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
}, nil
}
// uploadFile uploads the specified File to the repository.
func (u *Uploader) uploadFile(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
// uploadFileWithCheckpointing uploads the specified File to the repository.
func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) {
par := u.effectiveParallelUploads()
if par == 1 {
par = 0
}
res, err := u.uploadFileInternal(ctx, relativePath, file, pol, par)
var cp checkpointRegistry
cancelCheckpointer := u.periodicallyCheckpoint(ctx, &cp, &snapshot.Manifest{Source: sourceInfo})
defer cancelCheckpointer()
res, err := u.uploadFileInternal(ctx, &cp, relativePath, file, pol, par)
if err != nil {
return nil, err
}
de, err := newDirEntry(file, res.ObjectID)
if err != nil {
return nil, errors.Wrap(err, "unable to create dir entry")
}
de.DirSummary = &fs.DirectorySummary{
return newDirEntryWithSummary(file, res.ObjectID, &fs.DirectorySummary{
TotalFileCount: 1,
TotalFileSize: res.FileSize,
MaxModTime: res.ModTime,
})
}
// checkpointRoot invokes checkpoints on the provided registry and if a checkpoint entry was generated,
// saves it in an incomplete snapshot manifest.
func (u *Uploader) checkpointRoot(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) error {
var dmbCheckpoint dirManifestBuilder
if err := cp.runCheckpoints(&dmbCheckpoint); err != nil {
return errors.Wrap(err, "running checkpointers")
}
return de, nil
checkpointManifest := dmbCheckpoint.Build(u.repo.Time(), "dummy")
if len(checkpointManifest.Entries) == 0 {
// did not produce a checkpoint, that's ok
return nil
}
if len(checkpointManifest.Entries) > 1 {
return errors.Errorf("produced more than one checkpoint: %v", len(checkpointManifest.Entries))
}
rootEntry := checkpointManifest.Entries[0]
log(ctx).Debugf("checkpointed root %v", rootEntry.ObjectID)
man := *prototypeManifest
man.RootEntry = rootEntry
man.EndTime = u.repo.Time()
man.StartTime = man.EndTime
man.IncompleteReason = IncompleteReasonCheckpoint
if _, err := snapshot.SaveSnapshot(ctx, u.repo, &man); err != nil {
return errors.Wrap(err, "error saving checkpoint snapshot")
}
if err := u.repo.Flush(ctx); err != nil {
return errors.Wrap(err, "error flushing after checkpoint")
}
return nil
}
// periodicallyCheckpoint periodically (every CheckpointInterval) invokes checkpointRoot until the
// returned cancelation function has been called.
// A checkpoint failure cancels the whole upload, because continuing without
// working checkpoints could lose progress on a long-running snapshot.
func (u *Uploader) periodicallyCheckpoint(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) (cancelFunc func()) {
shutdown := make(chan struct{})
// u.getTicker is time.Tick in production; tests inject a manually-driven channel.
ch := u.getTicker(u.CheckpointInterval)
go func() {
for {
select {
case <-shutdown:
// cancelation function was called; stop the checkpointer goroutine
return
case <-ch:
if err := u.checkpointRoot(ctx, cp, prototypeManifest); err != nil {
log(ctx).Warningf("error checkpointing: %v", err)
u.Cancel()
return
}
// test hook
if u.checkpointFinished != nil {
u.checkpointFinished <- struct{}{}
}
}
}
}()
return func() {
close(shutdown)
}
}
// uploadDirWithCheckpointing uploads the specified Directory to the repository.
func (u *Uploader) uploadDirWithCheckpointing(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) {
for {
if u.CheckpointInterval != 0 {
u.nextCheckpointTime = u.repo.Time().Add(u.CheckpointInterval)
} else {
u.nextCheckpointTime = time.Time{}
}
var (
dmb dirManifestBuilder
cp checkpointRegistry
)
startTime := u.repo.Time()
cancelCheckpointer := u.periodicallyCheckpoint(ctx, &cp, &snapshot.Manifest{Source: sourceInfo})
defer cancelCheckpointer()
oid, summ, err := uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".")
if err != nil && !errors.Is(err, errCanceled) {
return nil, err
}
de, err := newDirEntry(rootDir, oid)
if err != nil {
return nil, errors.Wrap(err, "unable to create dir entry")
}
de.DirSummary = &summ
if summ.IncompleteReason == IncompleteReasonCheckpoint {
u.Progress.Checkpoint()
// when retrying use the partial snapshot
previousDirs = append(previousDirs, DirectoryEntry(u.repo, oid, &summ))
man := &snapshot.Manifest{
StartTime: startTime,
EndTime: u.repo.Time(),
RootEntry: de,
Source: sourceInfo,
IncompleteReason: summ.IncompleteReason,
}
if _, err = snapshot.SaveSnapshot(ctx, u.repo, man); err != nil {
return nil, errors.Wrap(err, "error saving checkpoint")
}
if err = u.repo.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "error flushing saving checkpoint")
}
continue
}
return de, err
}
return uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".", &dmb, &cp)
}
func (u *Uploader) foreachEntryUnlessCanceled(ctx context.Context, parallel int, relativePath string, entries fs.Entries, cb func(ctx context.Context, entry fs.Entry, entryRelativePath string) error) error {
@@ -377,11 +434,6 @@ func (u *Uploader) foreachEntryUnlessCanceled(ctx context.Context, parallel int,
return eg.Wait()
}
type dirEntryOrError struct {
de *snapshot.DirEntry
failedEntry *fs.EntryWithError
}
func rootCauseError(err error) error {
err = errors.Cause(err)
if oserr, ok := err.(*os.PathError); ok {
@@ -391,106 +443,124 @@ func rootCauseError(err error) error {
return err
}
func (u *Uploader) populateChildEntries(parent *snapshot.DirManifest, children <-chan dirEntryOrError) {
parentSummary := parent.Summary
type dirManifestBuilder struct {
mu sync.Mutex
for it := range children {
if it.failedEntry != nil {
parentSummary.NumFailed++
parentSummary.FailedEntries = append(parentSummary.FailedEntries, it.failedEntry)
summary fs.DirectorySummary
entries []*snapshot.DirEntry
}
continue
// Clone clones the current state of dirManifestBuilder.
func (b *dirManifestBuilder) Clone() *dirManifestBuilder {
b.mu.Lock()
defer b.mu.Unlock()
return &dirManifestBuilder{
summary: b.summary.Clone(),
entries: append([]*snapshot.DirEntry(nil), b.entries...),
}
}
func (b *dirManifestBuilder) addEntry(de *snapshot.DirEntry) {
b.mu.Lock()
defer b.mu.Unlock()
b.entries = append(b.entries, de)
// nolint:exhaustive
switch de.Type {
case snapshot.EntryTypeFile:
b.summary.TotalFileCount++
b.summary.TotalFileSize += de.FileSize
if de.ModTime.After(b.summary.MaxModTime) {
b.summary.MaxModTime = de.ModTime
}
de := it.de
case snapshot.EntryTypeDirectory:
if childSummary := de.DirSummary; childSummary != nil {
b.summary.TotalFileCount += childSummary.TotalFileCount
b.summary.TotalFileSize += childSummary.TotalFileSize
b.summary.TotalDirCount += childSummary.TotalDirCount
b.summary.NumFailed += childSummary.NumFailed
b.summary.FailedEntries = append(b.summary.FailedEntries, childSummary.FailedEntries...)
// nolint:exhaustive
switch de.Type {
case snapshot.EntryTypeFile:
u.stats.TotalFileCount++
u.stats.TotalFileSize += de.FileSize
parentSummary.TotalFileCount++
parentSummary.TotalFileSize += de.FileSize
if de.ModTime.After(parentSummary.MaxModTime) {
parentSummary.MaxModTime = de.ModTime
}
case snapshot.EntryTypeDirectory:
if childSummary := de.DirSummary; childSummary != nil {
parentSummary.TotalFileCount += childSummary.TotalFileCount
parentSummary.TotalFileSize += childSummary.TotalFileSize
parentSummary.TotalDirCount += childSummary.TotalDirCount
parentSummary.NumFailed += childSummary.NumFailed
parentSummary.FailedEntries = append(parentSummary.FailedEntries, childSummary.FailedEntries...)
if childSummary.MaxModTime.After(parentSummary.MaxModTime) {
parentSummary.MaxModTime = childSummary.MaxModTime
}
if childSummary.MaxModTime.After(b.summary.MaxModTime) {
b.summary.MaxModTime = childSummary.MaxModTime
}
}
}
}
parent.Entries = append(parent.Entries, de)
// addFailedEntry records a failed directory entry (relative path plus error
// message) in the summary being built and increments the failure counter.
func (b *dirManifestBuilder) addFailedEntry(relPath string, err error) {
b.mu.Lock()
defer b.mu.Unlock()
b.summary.NumFailed++
b.summary.FailedEntries = append(b.summary.FailedEntries, &fs.EntryWithError{
EntryPath: relPath,
Error: err.Error(),
})
}
func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string) *snapshot.DirManifest {
b.mu.Lock()
defer b.mu.Unlock()
s := b.summary
s.TotalDirCount++
if len(b.entries) == 0 {
s.MaxModTime = dirModTime
}
s.IncompleteReason = incompleteReason
// take top N sorted failed entries
if len(parent.Summary.FailedEntries) > 0 {
sort.Slice(parent.Summary.FailedEntries, func(i, j int) bool {
return parent.Summary.FailedEntries[i].EntryPath < parent.Summary.FailedEntries[j].EntryPath
if len(b.summary.FailedEntries) > 0 {
sort.Slice(b.summary.FailedEntries, func(i, j int) bool {
return b.summary.FailedEntries[i].EntryPath < b.summary.FailedEntries[j].EntryPath
})
if len(parent.Summary.FailedEntries) > fs.MaxFailedEntriesPerDirectorySummary {
parent.Summary.FailedEntries = parent.Summary.FailedEntries[0:fs.MaxFailedEntriesPerDirectorySummary]
if len(b.summary.FailedEntries) > fs.MaxFailedEntriesPerDirectorySummary {
b.summary.FailedEntries = b.summary.FailedEntries[0:fs.MaxFailedEntriesPerDirectorySummary]
}
}
// sort the result, directories first, then non-directories, ordered by name
sort.Slice(parent.Entries, func(i, j int) bool {
if leftDir, rightDir := isDir(parent.Entries[i]), isDir(parent.Entries[j]); leftDir != rightDir {
sort.Slice(b.entries, func(i, j int) bool {
if leftDir, rightDir := isDir(b.entries[i]), isDir(b.entries[j]); leftDir != rightDir {
// directories get sorted before non-directories
return leftDir
}
return parent.Entries[i].Name < parent.Entries[j].Name
return b.entries[i].Name < b.entries[j].Name
})
return &snapshot.DirManifest{
StreamType: directoryStreamType,
Summary: &s,
Entries: b.entries,
}
}
func isDir(e *snapshot.DirEntry) bool {
return e.Type == snapshot.EntryTypeDirectory
}
func (u *Uploader) processChildren(ctx context.Context, dirManifest *snapshot.DirManifest, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
var wg sync.WaitGroup
// channel where we will add directory and file entries, possibly in parallel
output := make(chan dirEntryOrError)
// goroutine that will drain data from 'output' and update dirManifest
wg.Add(1)
go func() {
defer wg.Done()
u.populateChildEntries(dirManifest, output)
}()
defer func() {
// before this function returns, close the output channel and wait for the goroutine above to complete.
close(output)
wg.Wait()
}()
if err := u.processSubdirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
func (u *Uploader) processChildren(ctx context.Context, parentDirCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
if err := u.processSubdirectories(ctx, parentDirCheckpointRegistry, parentDirBuilder, relativePath, entries, policyTree, previousEntries); err != nil {
return err
}
if err := u.processNonDirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil {
if err := u.processNonDirectories(ctx, parentDirCheckpointRegistry, parentDirBuilder, relativePath, entries, policyTree, previousEntries); err != nil {
return err
}
return nil
}
func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEntryOrError, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
func (u *Uploader) processSubdirectories(ctx context.Context, parentDirCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error {
// for now don't process subdirectories in parallel, we need a mechanism to
// prevent explosion of parallelism
const parallelism = 1
@@ -511,7 +581,9 @@ func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEnt
previousDirs = uniqueDirectories(previousDirs)
oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath)
childDirBuilder := &dirManifestBuilder{}
de, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath, childDirBuilder, parentDirCheckpointRegistry)
if errors.Is(err, errCanceled) {
return err
}
@@ -525,24 +597,15 @@ func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEnt
rc := rootCauseError(dre.error)
u.Progress.IgnoredError(entryRelativePath, rc)
output <- dirEntryOrError{
failedEntry: &fs.EntryWithError{
EntryPath: entryRelativePath,
Error: rc.Error(),
},
}
parentDirBuilder.addFailedEntry(entryRelativePath, rc)
return nil
}
return errors.Errorf("unable to process directory %q: %s", entry.Name(), err)
}
de, err := newDirEntry(dir, oid)
if err != nil {
return errors.Wrap(err, "unable to create dir entry")
}
parentDirBuilder.addEntry(de)
de.DirSummary = &subdirsumm
output <- dirEntryOrError{de: de}
return nil
})
}
@@ -605,7 +668,7 @@ func (u *Uploader) effectiveParallelUploads() int {
return p
}
func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEntryOrError, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error {
func (u *Uploader) processNonDirectories(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error {
workerCount := u.effectiveParallelUploads()
var asyncWritesPerFile int
@@ -639,7 +702,7 @@ func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEnt
return errors.Wrap(err, "unable to create dir entry")
}
output <- dirEntryOrError{de: cachedDirEntry}
parentDirBuilder.addEntry(cachedDirEntry)
return nil
}
@@ -647,20 +710,20 @@ func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEnt
case fs.Symlink:
de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry)
if err != nil {
return u.maybeIgnoreFileReadError(err, output, entryRelativePath, policyTree)
return u.maybeIgnoreFileReadError(err, parentDirBuilder, entryRelativePath, policyTree)
}
output <- dirEntryOrError{de: de}
parentDirBuilder.addEntry(de)
return nil
case fs.File:
atomic.AddInt32(&u.stats.NonCachedFiles, 1)
de, err := u.uploadFileInternal(ctx, entryRelativePath, entry, policyTree.Child(entry.Name()).EffectivePolicy(), asyncWritesPerFile)
de, err := u.uploadFileInternal(ctx, parentCheckpointRegistry, entryRelativePath, entry, policyTree.Child(entry.Name()).EffectivePolicy(), asyncWritesPerFile)
if err != nil {
return u.maybeIgnoreFileReadError(err, output, entryRelativePath, policyTree)
return u.maybeIgnoreFileReadError(err, parentDirBuilder, entryRelativePath, policyTree)
}
output <- dirEntryOrError{de: de}
parentDirBuilder.addEntry(de)
return nil
default:
@@ -717,25 +780,20 @@ func uploadDirInternal(
policyTree *policy.Tree,
previousDirs []fs.Directory,
dirRelativePath string,
) (object.ID, fs.DirectorySummary, error) {
thisDirBuilder *dirManifestBuilder,
thisCheckpointRegistry *checkpointRegistry,
) (*snapshot.DirEntry, error) {
u.stats.TotalDirectoryCount++
u.Progress.StartedDirectory(dirRelativePath)
defer u.Progress.FinishedDirectory(dirRelativePath)
dirManifest := &snapshot.DirManifest{
StreamType: directoryStreamType,
Summary: &fs.DirectorySummary{
TotalDirCount: 1,
},
}
t0 := u.repo.Time()
entries, direrr := directory.Readdir(ctx)
log(ctx).Debugf("finished reading directory %v in %v", dirRelativePath, u.repo.Time().Sub(t0))
if direrr != nil {
return "", fs.DirectorySummary{}, dirReadError{direrr}
return nil, dirReadError{direrr}
}
var prevEntries []fs.Entries
@@ -746,16 +804,43 @@ func uploadDirInternal(
}
}
if err := u.processChildren(ctx, dirManifest, dirRelativePath, entries, policyTree, prevEntries); err != nil && !errors.Is(err, errCanceled) {
return "", fs.DirectorySummary{}, err
childCheckpointRegistry := &checkpointRegistry{}
thisCheckpointRegistry.addCheckpointCallback(directory, func() (*snapshot.DirEntry, error) {
// when snapshotting the parent, snapshot all our children and tell them to populate
// childCheckpointBuilder
thisCheckpointBuilder := thisDirBuilder.Clone()
// invoke all child checkpoints which will populate thisCheckpointBuilder.
if err := childCheckpointRegistry.runCheckpoints(thisCheckpointBuilder); err != nil {
return nil, errors.Wrapf(err, "error checkpointing children")
}
checkpointManifest := thisCheckpointBuilder.Build(directory.ModTime(), IncompleteReasonCheckpoint)
oid, err := u.writeDirManifest(ctx, dirRelativePath, checkpointManifest)
if err != nil {
return nil, errors.Wrap(err, "error writing dir manifest")
}
return newDirEntryWithSummary(directory, oid, checkpointManifest.Summary)
})
defer thisCheckpointRegistry.removeCheckpointCallback(directory)
if err := u.processChildren(ctx, childCheckpointRegistry, thisDirBuilder, dirRelativePath, entries, policyTree, prevEntries); err != nil && !errors.Is(err, errCanceled) {
return nil, err
}
if len(dirManifest.Entries) == 0 {
dirManifest.Summary.MaxModTime = directory.ModTime()
dirManifest := thisDirBuilder.Build(directory.ModTime(), u.incompleteReason())
oid, err := u.writeDirManifest(ctx, dirRelativePath, dirManifest)
if err != nil {
return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name())
}
// at this point dirManifest is ready to go
return newDirEntryWithSummary(directory, oid, dirManifest.Summary)
}
func (u *Uploader) writeDirManifest(ctx context.Context, dirRelativePath string, dirManifest *snapshot.DirManifest) (object.ID, error) {
writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
Description: "DIR:" + dirRelativePath,
Prefix: "k",
@@ -764,26 +849,24 @@ func uploadDirInternal(
defer writer.Close() //nolint:errcheck
if err := json.NewEncoder(writer).Encode(dirManifest); err != nil {
return "", fs.DirectorySummary{}, errors.Wrap(err, "unable to encode directory JSON")
return "", errors.Wrap(err, "unable to encode directory JSON")
}
oid, err := writer.Result()
if err != nil {
return "", errors.Wrap(err, "unable to write directory")
}
dirManifest.Summary.IncompleteReason = u.incompleteReason()
return oid, *dirManifest.Summary, err
return oid, nil
}
func (u *Uploader) maybeIgnoreFileReadError(err error, output chan dirEntryOrError, entryRelativePath string, policyTree *policy.Tree) error {
func (u *Uploader) maybeIgnoreFileReadError(err error, dmb *dirManifestBuilder, entryRelativePath string, policyTree *policy.Tree) error {
errHandlingPolicy := policyTree.EffectivePolicy().ErrorHandlingPolicy
if u.IgnoreReadErrors || errHandlingPolicy.IgnoreFileErrorsOrDefault(false) {
err = rootCauseError(err)
u.Progress.IgnoredError(entryRelativePath, err)
output <- dirEntryOrError{failedEntry: &fs.EntryWithError{
EntryPath: entryRelativePath,
Error: err.Error(),
}}
dmb.addFailedEntry(entryRelativePath, err)
return nil
}
@@ -809,6 +892,7 @@ func NewUploader(r repo.Repository) *Uploader {
IgnoreReadErrors: false,
ParallelUploads: 1,
CheckpointInterval: DefaultCheckpointInterval,
getTicker: time.Tick,
uploadBufPool: sync.Pool{
New: func() interface{} {
p := make([]byte, copyBufferSize)
@@ -867,7 +951,7 @@ func (u *Uploader) Upload(
maxPreviousTotalFileSize = s
}
if s := m.Stats.TotalFileCount; s > maxPreviousFileCount {
if s := int(m.Stats.TotalFileCount); s > maxPreviousFileCount {
maxPreviousFileCount = s
}
}
@@ -898,7 +982,7 @@ func (u *Uploader) Upload(
s.RootEntry, err = u.uploadDirWithCheckpointing(ctx, entry, policyTree, previousDirs, sourceInfo)
case fs.File:
s.RootEntry, err = u.uploadFile(ctx, entry.Name(), entry, policyTree.EffectivePolicy())
s.RootEntry, err = u.uploadFileWithCheckpointing(ctx, entry.Name(), entry, policyTree.EffectivePolicy(), sourceInfo)
default:
return nil, errors.Errorf("unsupported source: %v", s.Source)

View File

@@ -36,9 +36,6 @@ type UploadProgress interface {
// FinishedDirectory is emitted whenever a directory is finished uploading.
FinishedDirectory(dirname string)
// Checkpoint is emitted whenever snapshot is checkpointed.
Checkpoint()
}
// NullUploadProgress is an implementation of UploadProgress that does not produce any output.
@@ -75,9 +72,6 @@ func (p *NullUploadProgress) FinishedDirectory(dirname string) {}
// IgnoredError implements UploadProgress.
func (p *NullUploadProgress) IgnoredError(path string, err error) {}
// Checkpoint implements UploadProgress.
func (p *NullUploadProgress) Checkpoint() {}
var _ UploadProgress = (*NullUploadProgress)(nil)
// UploadCounters represents a snapshot of upload counters.

View File

@@ -13,6 +13,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/internal/testlogging"
@@ -289,6 +290,16 @@ func TestUploadWithCheckpointing(t *testing.T) {
u := NewUploader(th.repo)
fakeTicker := make(chan time.Time)
// inject fake ticker that we can control externally instead of through time passage.
u.getTicker = func(d time.Duration) <-chan time.Time {
return fakeTicker
}
// create a channel that will be sent to whenever checkpoint completes.
u.checkpointFinished = make(chan struct{})
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
si := snapshot.SourceInfo{
@@ -297,18 +308,22 @@ func TestUploadWithCheckpointing(t *testing.T) {
Path: "path",
}
count := 0
// inject a hook into mock filesystem to trigger and wait for checkpoints at few places.
// the places are not important, what's important that those are 3 separate points in time.
dirsToCheckpointAt := []*mockfs.Directory{
th.sourceDir.Subdir("d1"),
th.sourceDir.Subdir("d2"),
th.sourceDir.Subdir("d1").Subdir("d2"),
}
// when reading d1 advanced the time, to trigger checkpoint
th.sourceDir.Subdir("d1").OnReaddir(func() {
count++
// when reading this directory advance the time to cancel the snapshot
// and trigger a checkpoint
if count <= 2 {
th.ft.Advance(DefaultCheckpointInterval + 1)
}
})
for _, d := range dirsToCheckpointAt {
d.OnReaddir(func() {
// trigger checkpoint
fakeTicker <- clock.Now()
// wait for checkpoint
<-u.checkpointFinished
})
}
if _, err := u.Upload(ctx, th.sourceDir, policyTree, si); err != nil {
t.Errorf("Upload error: %v", err)
@@ -319,7 +334,7 @@ func TestUploadWithCheckpointing(t *testing.T) {
t.Fatalf("error listing snapshots: %v", err)
}
if got, want := len(snapshots), 2; got != want {
if got, want := len(snapshots), len(dirsToCheckpointAt); got != want {
t.Fatalf("unexpected number of snapshots: %v, want %v", got, want)
}

View File

@@ -6,17 +6,20 @@
// Stats keeps track of snapshot generation statistics.
type Stats struct {
TotalDirectoryCount int `json:"dirCount"`
TotalFileCount int `json:"fileCount"`
TotalFileSize int64 `json:"totalSize"`
ExcludedFileCount int `json:"excludedFileCount"`
// keep all int64 aligned because they will be atomically updated
TotalFileSize int64 `json:"totalSize"`
ExcludedTotalFileSize int64 `json:"excludedTotalSize"`
ExcludedDirCount int `json:"excludedDirCount"`
// keep all int32 aligned because they will be atomically updated
TotalFileCount int32 `json:"fileCount"`
CachedFiles int32 `json:"cachedFiles"`
NonCachedFiles int32 `json:"nonCachedFiles"`
TotalDirectoryCount int `json:"dirCount"`
ExcludedFileCount int `json:"excludedFileCount"`
ExcludedDirCount int `json:"excludedDirCount"`
ReadErrors int `json:"readErrors"`
}