diff --git a/cli/cli_progress.go b/cli/cli_progress.go index 5c8657baa..7dcaafbdb 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -89,24 +89,6 @@ func (p *cliProgress) CachedFile(fname string, numBytes int64) { p.maybeOutput() } -func (p *cliProgress) Checkpoint() { - p.output(noticeColor, "Saving a checkpoint...\n") - - if p.shared { - // do not reset counters - return - } - - *p = cliProgress{ - uploading: 1, - uploadStartTime: clock.Now(), - previousFileCount: p.previousFileCount, - previousTotalSize: p.previousTotalSize, - uploadedBytes: p.uploadedBytes, - uploadedFiles: p.uploadedFiles, - } -} - func (p *cliProgress) maybeOutput() { if atomic.LoadInt32(&p.uploading) == 0 { return diff --git a/fs/entry.go b/fs/entry.go index cbf21d748..b6bdcbbbf 100644 --- a/fs/entry.go +++ b/fs/entry.go @@ -89,6 +89,15 @@ type DirectorySummary struct { FailedEntries []*EntryWithError `json:"errors,omitempty"` } +// Clone clones given directory summary. +func (s *DirectorySummary) Clone() DirectorySummary { + res := *s + + res.FailedEntries = append([]*EntryWithError(nil), s.FailedEntries...) + + return res +} + // Symlink represents a symbolic link entry. type Symlink interface { Entry diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 467841871..0baca9cd8 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -148,6 +148,78 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) { } } +func TestCheckpointing(t *testing.T) { + ctx := testlogging.Context(t) + _, om := setupTest(t) + + writer := om.NewWriter(ctx, WriterOptions{}) + + // write all zeroes + allZeroes := make([]byte, 1<<20) + + // empty file, nothing flushed + checkpoint1, err := writer.Checkpoint() + verifyNoError(t, err) + + // write some bytes, but not enough to flush. + writer.Write(allZeroes[0:50]) + checkpoint2, err := writer.Checkpoint() + verifyNoError(t, err) + + // write enough to flush first content. + writer.Write(allZeroes) + checkpoint3, err := writer.Checkpoint() + verifyNoError(t, err) + + // write enough to flush second content. + writer.Write(allZeroes) + checkpoint4, err := writer.Checkpoint() + verifyNoError(t, err) + + result, err := writer.Result() + verifyNoError(t, err) + + if !objectIDsEqual(checkpoint1, "") { + t.Errorf("unexpected checkpoint1: %v err: %v", checkpoint1, err) + } + + if !objectIDsEqual(checkpoint2, "") { + t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err) + } + + verifyFull(ctx, t, om, checkpoint3, allZeroes) + verifyFull(ctx, t, om, checkpoint4, make([]byte, 2<<20)) + verifyFull(ctx, t, om, result, make([]byte, 2<<20+50)) +} + +func verifyFull(ctx context.Context, t *testing.T, om *Manager, oid ID, want []byte) { + t.Helper() + + r, err := om.Open(ctx, oid) + if err != nil { + t.Fatalf("unable to open %v: %v", oid, err) + } + + defer r.Close() + + data, err := ioutil.ReadAll(r) + if err != nil { + t.Fatalf("unable to read all: %v", err) + } + + if !bytes.Equal(data, want) { + t.Fatalf("unexpected data read for %v", oid) + } +} + +func verifyNoError(t *testing.T, err error) { + t.Helper() + + if err != nil { + t.Fatal(err) + } +} + func verifyIndirectBlock(ctx context.Context, t *testing.T, r *Manager, oid ID) { for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() { if c, _, ok := indexContentID.ContentID(); ok { diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 9b04c7a4f..313bc6f25 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -25,6 +25,12 @@ type Writer interface { io.WriteCloser + // Checkpoint returns ID of an object consisting of all contents written to storage so far. + // This may not include some data buffered in the writer. + // In case nothing has been written yet, returns empty object ID. + Checkpoint() (ID, error) + + // Result returns object ID representing all bytes written to the writer. Result() (ID, error) } @@ -77,6 +83,8 @@ type objectWriter struct { splitter splitter.Splitter + writeMutex sync.Mutex + asyncWritesSemaphore chan struct{} // async writes semaphore or nil asyncWritesWG sync.WaitGroup @@ -103,6 +111,9 @@ func (w *objectWriter) Close() error { } func (w *objectWriter) Write(data []byte) (n int, err error) { + w.writeMutex.Lock() + defer w.writeMutex.Unlock() + dataLen := len(data) w.totalLength += int64(dataLen) @@ -229,6 +240,17 @@ func maybeCompressedContentBytes(comp compression.Compressor, output *bytes.Buff return input, false, nil } +func (w *objectWriter) drainWrites() { + w.writeMutex.Lock() + + // wait for any in-flight asynchronous writes to finish + w.asyncWritesWG.Wait() +} + +func (w *objectWriter) undrainWrites() { + w.writeMutex.Unlock() +} + func (w *objectWriter) Result() (ID, error) { // no need to hold a lock on w.indirectIndexGrowMutex, since growing index only happens synchronously // and never in parallel with calling Result() @@ -238,13 +260,23 @@ func (w *objectWriter) Result() (ID, error) { } } - // wait for any asynchronous writes to complete. - w.asyncWritesWG.Wait() + return w.Checkpoint() +} + +// Checkpoint returns object ID which represents portion of the object that has already been written. +// The result may be an empty object ID if nothing has been flushed yet. +func (w *objectWriter) Checkpoint() (ID, error) { + w.drainWrites() + defer w.undrainWrites() if w.contentWriteError != nil { return "", w.contentWriteError } + if len(w.indirectIndex) == 0 { + return "", nil + } + if len(w.indirectIndex) == 1 { return w.indirectIndex[0].Object, nil } diff --git a/snapshot/snapshotfs/checkpoint_registry.go b/snapshot/snapshotfs/checkpoint_registry.go new file mode 100644 index 000000000..2164545ac --- /dev/null +++ b/snapshot/snapshotfs/checkpoint_registry.go @@ -0,0 +1,62 @@ +package snapshotfs + +import ( + "sync" + + "github.com/google/uuid" + "github.com/pkg/errors" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/snapshot" +) + +// checkpointFunc is invoked when checkpoint occurs. The callback must checkpoint current state of +// file or directory and return directory entry. +type checkpointFunc func() (*snapshot.DirEntry, error) + +type checkpointRegistry struct { + mu sync.Mutex + + checkpoints map[string]checkpointFunc +} + +func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.checkpoints == nil { + r.checkpoints = map[string]checkpointFunc{} + } + + r.checkpoints[e.Name()] = f +} + +func (r *checkpointRegistry) removeCheckpointCallback(e fs.Entry) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.checkpoints, e.Name()) +} + +// runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while +// randomizing file names for non-directory entries. this is to prevent the use of checkpointed objects +// as authoritative on subsequent runs. +func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *dirManifestBuilder) error { + r.mu.Lock() + defer r.mu.Unlock() + + for n, cp := range r.checkpoints { + de, err := cp() + if err != nil { + return errors.Wrapf(err, "error checkpointing %v", n) + } + + if de.Type != snapshot.EntryTypeDirectory { + de.Name = ".checkpointed." + de.Name + "." + uuid.New().String() + } + + checkpointBuilder.addEntry(de) + } + + return nil +} diff --git a/snapshot/snapshotfs/checkpoint_registry_test.go b/snapshot/snapshotfs/checkpoint_registry_test.go new file mode 100644 index 000000000..f3e79af81 --- /dev/null +++ b/snapshot/snapshotfs/checkpoint_registry_test.go @@ -0,0 +1,82 @@ +package snapshotfs + +import ( + "os" + "strings" + "testing" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/mockfs" + "github.com/kopia/kopia/snapshot" +) + +func TestCheckpointRegistry(t *testing.T) { + var cp checkpointRegistry + + d := mockfs.NewDirectory() + dir1 := d.AddDir("dir1", os.FileMode(0o755)) + f1 := d.AddFile("f1", []byte{1, 2, 3}, os.FileMode(0o755)) + f2 := d.AddFile("f2", []byte{2, 3, 4}, os.FileMode(0o755)) + f3 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755)) + + cp.addCheckpointCallback(dir1, func() (*snapshot.DirEntry, error) { + return &snapshot.DirEntry{ + Name: "dir1", + Type: snapshot.EntryTypeDirectory, + }, nil + }) + + cp.addCheckpointCallback(f1, func() (*snapshot.DirEntry, error) { + return &snapshot.DirEntry{ + Name: "f1", + }, nil + }) + + cp.addCheckpointCallback(f2, func() (*snapshot.DirEntry, error) { + return &snapshot.DirEntry{ + Name: "f2", + }, nil + }) + + cp.addCheckpointCallback(f3, func() (*snapshot.DirEntry, error) { + return &snapshot.DirEntry{ + Name: "other", + }, nil + }) + + // remove callback before it has a chance of firing + cp.removeCheckpointCallback(f3) + cp.removeCheckpointCallback(f3) + + var dmb dirManifestBuilder + + dmb.addEntry(&snapshot.DirEntry{ + Name: "pre-existing", + }) + + if err := cp.runCheckpoints(&dmb); err != nil { + t.Fatalf("error running checkpoints: %v", err) + } + + dm := dmb.Build(clock.Now(), "checkpoint") + if got, want := len(dm.Entries), 4; got != want { + t.Fatalf("got %v entries, wanted %v (%+#v)", got, want, dm.Entries) + } + + // directory names don't get mangled + if dm.Entries[0].Name != "dir1" { + t.Errorf("invalid entry %v", dm.Entries[0]) + } + + if !strings.HasPrefix(dm.Entries[1].Name, ".checkpointed.f1.") { + t.Errorf("invalid entry %v", dm.Entries[1]) + } + + if !strings.HasPrefix(dm.Entries[2].Name, ".checkpointed.f2.") { + t.Errorf("invalid entry %v", dm.Entries[2]) + } + + if dm.Entries[3].Name != "pre-existing" { + t.Errorf("invalid entry %v", dm.Entries[3]) + } +} diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 6bee4b38d..d1f8b5cd1 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -69,11 +69,15 @@ type Uploader struct { repo repo.Repository - stats snapshot.Stats - canceled int32 - nextCheckpointTime time.Time + stats snapshot.Stats + canceled int32 uploadBufPool sync.Pool + + getTicker func(time.Duration) <-chan time.Time + + // for testing only, when set will write to a given channel whenever checkpoint completes + checkpointFinished chan struct{} } // IsCanceled returns true if the upload is canceled. @@ -87,10 +91,6 @@ func (u *Uploader) incompleteReason() string { return IncompleteReasonCanceled } - if !u.nextCheckpointTime.IsZero() && u.repo.Time().After(u.nextCheckpointTime) { - return IncompleteReasonCheckpoint - } - wb := atomic.LoadInt64(&u.totalWrittenBytes) if mub := u.MaxUploadBytes; mub > 0 && wb > mub { return IncompleteReasonLimitReached @@ -99,7 +99,7 @@ func (u *Uploader) incompleteReason() string { return "" } -func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, f fs.File, pol *policy.Policy, asyncWrites int) (*snapshot.DirEntry, error) { +func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, relativePath string, f fs.File, pol *policy.Policy, asyncWrites int) (*snapshot.DirEntry, error) { u.Progress.HashingFile(relativePath) defer u.Progress.FinishedHashingFile(relativePath, f.Size()) @@ -116,6 +116,18 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, }) defer writer.Close() //nolint:errcheck + parentCheckpointRegistry.addCheckpointCallback(f, func() (*snapshot.DirEntry, error) { + // nolint:govet + checkpointID, err := writer.Checkpoint() + if err != nil { + return nil, err + } + + return newDirEntry(f, checkpointID) + }) + + defer parentCheckpointRegistry.removeCheckpointCallback(f) + written, err := u.copyWithProgress(writer, file, 0, f.Size()) if err != nil { return nil, err @@ -138,6 +150,9 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, relativePath string, de.FileSize = written + atomic.AddInt32(&u.stats.TotalFileCount, 1) + atomic.AddInt64(&u.stats.TotalFileSize, de.FileSize) + return de, nil } @@ -225,6 +240,17 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len return written, nil } +func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary) (*snapshot.DirEntry, error) { + de, err := newDirEntry(d, oid) + if err != nil { + return nil, err + } + + de.DirSummary = summ + + return de, nil +} + func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { var entryType snapshot.EntryType @@ -251,82 +277,113 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { }, nil } -// uploadFile uploads the specified File to the repository. -func (u *Uploader) uploadFile(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) { +// uploadFileWithCheckpointing uploads the specified File to the repository. +func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) { par := u.effectiveParallelUploads() if par == 1 { par = 0 } - res, err := u.uploadFileInternal(ctx, relativePath, file, pol, par) + var cp checkpointRegistry + + cancelCheckpointer := u.periodicallyCheckpoint(ctx, &cp, &snapshot.Manifest{Source: sourceInfo}) + defer cancelCheckpointer() + + res, err := u.uploadFileInternal(ctx, &cp, relativePath, file, pol, par) if err != nil { return nil, err } - de, err := newDirEntry(file, res.ObjectID) - if err != nil { - return nil, errors.Wrap(err, "unable to create dir entry") - } - - de.DirSummary = &fs.DirectorySummary{ + return newDirEntryWithSummary(file, res.ObjectID, &fs.DirectorySummary{ TotalFileCount: 1, TotalFileSize: res.FileSize, MaxModTime: res.ModTime, + }) +} + +// checkpointRoot invokes checkpoints on the provided registry and if a checkpoint entry was generated, +// saves it in an incomplete snapshot manifest. +func (u *Uploader) checkpointRoot(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) error { + var dmbCheckpoint dirManifestBuilder + if err := cp.runCheckpoints(&dmbCheckpoint); err != nil { + return errors.Wrap(err, "running checkpointers") } - return de, nil + checkpointManifest := dmbCheckpoint.Build(u.repo.Time(), "dummy") + if len(checkpointManifest.Entries) == 0 { + // did not produce a checkpoint, that's ok + return nil + } + + if len(checkpointManifest.Entries) > 1 { + return errors.Errorf("produced more than one checkpoint: %v", len(checkpointManifest.Entries)) + } + + rootEntry := checkpointManifest.Entries[0] + + log(ctx).Debugf("checkpointed root %v", rootEntry.ObjectID) + + man := *prototypeManifest + man.RootEntry = rootEntry + man.EndTime = u.repo.Time() + man.StartTime = man.EndTime + man.IncompleteReason = IncompleteReasonCheckpoint + + if _, err := snapshot.SaveSnapshot(ctx, u.repo, &man); err != nil { + return errors.Wrap(err, "error saving checkpoint snapshot") + } + + if err := u.repo.Flush(ctx); err != nil { + return errors.Wrap(err, "error flushing after checkpoint") + } + + return nil +} + +// periodicallyCheckpoint periodically (every CheckpointInterval) invokes checkpointRoot until the +// returned cancelation function has been called. +func (u *Uploader) periodicallyCheckpoint(ctx context.Context, cp *checkpointRegistry, prototypeManifest *snapshot.Manifest) (cancelFunc func()) { + shutdown := make(chan struct{}) + ch := u.getTicker(u.CheckpointInterval) + + go func() { + for { + select { + case <-shutdown: + return + + case <-ch: + if err := u.checkpointRoot(ctx, cp, prototypeManifest); err != nil { + log(ctx).Warningf("error checkpointing: %v", err) + u.Cancel() + + return + } + + // test hook + if u.checkpointFinished != nil { + u.checkpointFinished <- struct{}{} + } + } + } + }() + + return func() { + close(shutdown) + } } // uploadDirWithCheckpointing uploads the specified Directory to the repository. func (u *Uploader) uploadDirWithCheckpointing(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) { - for { - if u.CheckpointInterval != 0 { - u.nextCheckpointTime = u.repo.Time().Add(u.CheckpointInterval) - } else { - u.nextCheckpointTime = time.Time{} - } + var ( + dmb dirManifestBuilder + cp checkpointRegistry + ) - startTime := u.repo.Time() + cancelCheckpointer := u.periodicallyCheckpoint(ctx, &cp, &snapshot.Manifest{Source: sourceInfo}) + defer cancelCheckpointer() - oid, summ, err := uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".") - if err != nil && !errors.Is(err, errCanceled) { - return nil, err - } - - de, err := newDirEntry(rootDir, oid) - if err != nil { - return nil, errors.Wrap(err, "unable to create dir entry") - } - - de.DirSummary = &summ - - if summ.IncompleteReason == IncompleteReasonCheckpoint { - u.Progress.Checkpoint() - - // when retrying use the partial snapshot - previousDirs = append(previousDirs, DirectoryEntry(u.repo, oid, &summ)) - - man := &snapshot.Manifest{ - StartTime: startTime, - EndTime: u.repo.Time(), - RootEntry: de, - Source: sourceInfo, - IncompleteReason: summ.IncompleteReason, - } - - if _, err = snapshot.SaveSnapshot(ctx, u.repo, man); err != nil { - return nil, errors.Wrap(err, "error saving checkpoint") - } - - if err = u.repo.Flush(ctx); err != nil { - return nil, errors.Wrap(err, "error flushing saving checkpoint") - } - - continue - } - - return de, err - } + return uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".", &dmb, &cp) } func (u *Uploader) foreachEntryUnlessCanceled(ctx context.Context, parallel int, relativePath string, entries fs.Entries, cb func(ctx context.Context, entry fs.Entry, entryRelativePath string) error) error { @@ -377,11 +434,6 @@ func (u *Uploader) foreachEntryUnlessCanceled(ctx context.Context, parallel int, return eg.Wait() } -type dirEntryOrError struct { - de *snapshot.DirEntry - failedEntry *fs.EntryWithError -} - func rootCauseError(err error) error { err = errors.Cause(err) if oserr, ok := err.(*os.PathError); ok { @@ -391,106 +443,124 @@ func rootCauseError(err error) error { return err } -func (u *Uploader) populateChildEntries(parent *snapshot.DirManifest, children <-chan dirEntryOrError) { - parentSummary := parent.Summary +type dirManifestBuilder struct { + mu sync.Mutex - for it := range children { - if it.failedEntry != nil { - parentSummary.NumFailed++ - parentSummary.FailedEntries = append(parentSummary.FailedEntries, it.failedEntry) + summary fs.DirectorySummary + entries []*snapshot.DirEntry +} - continue +// Clone clones the current state of dirManifestBuilder. +func (b *dirManifestBuilder) Clone() *dirManifestBuilder { + b.mu.Lock() + defer b.mu.Unlock() + + return &dirManifestBuilder{ + summary: b.summary.Clone(), + entries: append([]*snapshot.DirEntry(nil), b.entries...), + } +} + +func (b *dirManifestBuilder) addEntry(de *snapshot.DirEntry) { + b.mu.Lock() + defer b.mu.Unlock() + + b.entries = append(b.entries, de) + + // nolint:exhaustive + switch de.Type { + case snapshot.EntryTypeFile: + b.summary.TotalFileCount++ + b.summary.TotalFileSize += de.FileSize + + if de.ModTime.After(b.summary.MaxModTime) { + b.summary.MaxModTime = de.ModTime } - de := it.de + case snapshot.EntryTypeDirectory: + if childSummary := de.DirSummary; childSummary != nil { + b.summary.TotalFileCount += childSummary.TotalFileCount + b.summary.TotalFileSize += childSummary.TotalFileSize + b.summary.TotalDirCount += childSummary.TotalDirCount + b.summary.NumFailed += childSummary.NumFailed + b.summary.FailedEntries = append(b.summary.FailedEntries, childSummary.FailedEntries...) - // nolint:exhaustive - switch de.Type { - case snapshot.EntryTypeFile: - u.stats.TotalFileCount++ - u.stats.TotalFileSize += de.FileSize - parentSummary.TotalFileCount++ - parentSummary.TotalFileSize += de.FileSize - - if de.ModTime.After(parentSummary.MaxModTime) { - parentSummary.MaxModTime = de.ModTime - } - - case snapshot.EntryTypeDirectory: - if childSummary := de.DirSummary; childSummary != nil { - parentSummary.TotalFileCount += childSummary.TotalFileCount - parentSummary.TotalFileSize += childSummary.TotalFileSize - parentSummary.TotalDirCount += childSummary.TotalDirCount - parentSummary.NumFailed += childSummary.NumFailed - parentSummary.FailedEntries = append(parentSummary.FailedEntries, childSummary.FailedEntries...) - - if childSummary.MaxModTime.After(parentSummary.MaxModTime) { - parentSummary.MaxModTime = childSummary.MaxModTime - } + if childSummary.MaxModTime.After(b.summary.MaxModTime) { + b.summary.MaxModTime = childSummary.MaxModTime } } + } +} - parent.Entries = append(parent.Entries, de) +func (b *dirManifestBuilder) addFailedEntry(relPath string, err error) { + b.mu.Lock() + defer b.mu.Unlock() + + b.summary.NumFailed++ + b.summary.FailedEntries = append(b.summary.FailedEntries, &fs.EntryWithError{ + EntryPath: relPath, + Error: err.Error(), + }) +} + +func (b *dirManifestBuilder) Build(dirModTime time.Time, incompleteReason string) *snapshot.DirManifest { + b.mu.Lock() + defer b.mu.Unlock() + + s := b.summary + s.TotalDirCount++ + + if len(b.entries) == 0 { + s.MaxModTime = dirModTime } + s.IncompleteReason = incompleteReason + // take top N sorted failed entries - if len(parent.Summary.FailedEntries) > 0 { - sort.Slice(parent.Summary.FailedEntries, func(i, j int) bool { - return parent.Summary.FailedEntries[i].EntryPath < parent.Summary.FailedEntries[j].EntryPath + if len(b.summary.FailedEntries) > 0 { + sort.Slice(b.summary.FailedEntries, func(i, j int) bool { + return b.summary.FailedEntries[i].EntryPath < b.summary.FailedEntries[j].EntryPath }) - if len(parent.Summary.FailedEntries) > fs.MaxFailedEntriesPerDirectorySummary { - parent.Summary.FailedEntries = parent.Summary.FailedEntries[0:fs.MaxFailedEntriesPerDirectorySummary] + if len(b.summary.FailedEntries) > fs.MaxFailedEntriesPerDirectorySummary { + b.summary.FailedEntries = b.summary.FailedEntries[0:fs.MaxFailedEntriesPerDirectorySummary] } } // sort the result, directories first, then non-directories, ordered by name - sort.Slice(parent.Entries, func(i, j int) bool { - if leftDir, rightDir := isDir(parent.Entries[i]), isDir(parent.Entries[j]); leftDir != rightDir { + sort.Slice(b.entries, func(i, j int) bool { + if leftDir, rightDir := isDir(b.entries[i]), isDir(b.entries[j]); leftDir != rightDir { // directories get sorted before non-directories return leftDir } - return parent.Entries[i].Name < parent.Entries[j].Name + return b.entries[i].Name < b.entries[j].Name }) + + return &snapshot.DirManifest{ + StreamType: directoryStreamType, + Summary: &s, + Entries: b.entries, + } } func isDir(e *snapshot.DirEntry) bool { return e.Type == snapshot.EntryTypeDirectory } -func (u *Uploader) processChildren(ctx context.Context, dirManifest *snapshot.DirManifest, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error { - var wg sync.WaitGroup - - // channel where we will add directory and file entries, possibly in parallel - output := make(chan dirEntryOrError) - - // goroutine that will drain data from 'output' and update dirManifest - wg.Add(1) - - go func() { - defer wg.Done() - u.populateChildEntries(dirManifest, output) - }() - - defer func() { - // before this function returns, close the output channel and wait for the goroutine above to complete. - close(output) - wg.Wait() - }() - - if err := u.processSubdirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil { +func (u *Uploader) processChildren(ctx context.Context, parentDirCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error { + if err := u.processSubdirectories(ctx, parentDirCheckpointRegistry, parentDirBuilder, relativePath, entries, policyTree, previousEntries); err != nil { return err } - if err := u.processNonDirectories(ctx, output, relativePath, entries, policyTree, previousEntries); err != nil { + if err := u.processNonDirectories(ctx, parentDirCheckpointRegistry, parentDirBuilder, relativePath, entries, policyTree, previousEntries); err != nil { return err } return nil } -func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEntryOrError, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error { +func (u *Uploader) processSubdirectories(ctx context.Context, parentDirCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries) error { // for now don't process subdirectories in parallel, we need a mechanism to // prevent explosion of parallelism const parallelism = 1 @@ -511,7 +581,9 @@ func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEnt previousDirs = uniqueDirectories(previousDirs) - oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath) + childDirBuilder := &dirManifestBuilder{} + + de, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath, childDirBuilder, parentDirCheckpointRegistry) if errors.Is(err, errCanceled) { return err } @@ -525,24 +597,15 @@ func (u *Uploader) processSubdirectories(ctx context.Context, output chan dirEnt rc := rootCauseError(dre.error) u.Progress.IgnoredError(entryRelativePath, rc) - output <- dirEntryOrError{ - failedEntry: &fs.EntryWithError{ - EntryPath: entryRelativePath, - Error: rc.Error(), - }, - } + parentDirBuilder.addFailedEntry(entryRelativePath, rc) return nil } + return errors.Errorf("unable to process directory %q: %s", entry.Name(), err) } - de, err := newDirEntry(dir, oid) - if err != nil { - return errors.Wrap(err, "unable to create dir entry") - } + parentDirBuilder.addEntry(de) - de.DirSummary = &subdirsumm - output <- dirEntryOrError{de: de} return nil }) } @@ -605,7 +668,7 @@ func (u *Uploader) effectiveParallelUploads() int { return p } -func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEntryOrError, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error { +func (u *Uploader) processNonDirectories(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, parentDirBuilder *dirManifestBuilder, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries) error { workerCount := u.effectiveParallelUploads() var asyncWritesPerFile int @@ -639,7 +702,7 @@ func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEnt return errors.Wrap(err, "unable to create dir entry") } - output <- dirEntryOrError{de: cachedDirEntry} + parentDirBuilder.addEntry(cachedDirEntry) return nil } @@ -647,20 +710,20 @@ func (u *Uploader) processNonDirectories(ctx context.Context, output chan dirEnt case fs.Symlink: de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry) if err != nil { - return u.maybeIgnoreFileReadError(err, output, entryRelativePath, policyTree) + return u.maybeIgnoreFileReadError(err, parentDirBuilder, entryRelativePath, policyTree) } - output <- dirEntryOrError{de: de} + parentDirBuilder.addEntry(de) return nil case fs.File: atomic.AddInt32(&u.stats.NonCachedFiles, 1) - de, err := u.uploadFileInternal(ctx, entryRelativePath, entry, policyTree.Child(entry.Name()).EffectivePolicy(), asyncWritesPerFile) + de, err := u.uploadFileInternal(ctx, parentCheckpointRegistry, entryRelativePath, entry, policyTree.Child(entry.Name()).EffectivePolicy(), asyncWritesPerFile) if err != nil { - return u.maybeIgnoreFileReadError(err, output, entryRelativePath, policyTree) + return u.maybeIgnoreFileReadError(err, parentDirBuilder, entryRelativePath, policyTree) } - output <- dirEntryOrError{de: de} + parentDirBuilder.addEntry(de) return nil default: @@ -717,25 +780,20 @@ func uploadDirInternal( policyTree *policy.Tree, previousDirs []fs.Directory, dirRelativePath string, -) (object.ID, fs.DirectorySummary, error) { + thisDirBuilder *dirManifestBuilder, + thisCheckpointRegistry *checkpointRegistry, +) (*snapshot.DirEntry, error) { u.stats.TotalDirectoryCount++ u.Progress.StartedDirectory(dirRelativePath) defer u.Progress.FinishedDirectory(dirRelativePath) - dirManifest := &snapshot.DirManifest{ - StreamType: directoryStreamType, - Summary: &fs.DirectorySummary{ - TotalDirCount: 1, - }, - } - t0 := u.repo.Time() entries, direrr := directory.Readdir(ctx) log(ctx).Debugf("finished reading directory %v in %v", dirRelativePath, u.repo.Time().Sub(t0)) if direrr != nil { - return "", fs.DirectorySummary{}, dirReadError{direrr} + return nil, dirReadError{direrr} } var prevEntries []fs.Entries @@ -746,16 +804,43 @@ func uploadDirInternal( } } - if err := u.processChildren(ctx, dirManifest, dirRelativePath, entries, policyTree, prevEntries); err != nil && !errors.Is(err, errCanceled) { - return "", fs.DirectorySummary{}, err + childCheckpointRegistry := &checkpointRegistry{} + + thisCheckpointRegistry.addCheckpointCallback(directory, func() (*snapshot.DirEntry, error) { + // when snapshotting the parent, snapshot all our children and tell them to populate + // childCheckpointBuilder + thisCheckpointBuilder := thisDirBuilder.Clone() + + // invoke all child checkpoints which will populate thisCheckpointBuilder. + if err := childCheckpointRegistry.runCheckpoints(thisCheckpointBuilder); err != nil { + return nil, errors.Wrapf(err, "error checkpointing children") + } + + checkpointManifest := thisCheckpointBuilder.Build(directory.ModTime(), IncompleteReasonCheckpoint) + oid, err := u.writeDirManifest(ctx, dirRelativePath, checkpointManifest) + if err != nil { + return nil, errors.Wrap(err, "error writing dir manifest") + } + + return newDirEntryWithSummary(directory, oid, checkpointManifest.Summary) + }) + defer thisCheckpointRegistry.removeCheckpointCallback(directory) + + if err := u.processChildren(ctx, childCheckpointRegistry, thisDirBuilder, dirRelativePath, entries, policyTree, prevEntries); err != nil && !errors.Is(err, errCanceled) { + return nil, err } - if len(dirManifest.Entries) == 0 { - dirManifest.Summary.MaxModTime = directory.ModTime() + dirManifest := thisDirBuilder.Build(directory.ModTime(), u.incompleteReason()) + + oid, err := u.writeDirManifest(ctx, dirRelativePath, dirManifest) + if err != nil { + return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name()) } - // at this point dirManifest is ready to go + return newDirEntryWithSummary(directory, oid, dirManifest.Summary) +} +func (u *Uploader) writeDirManifest(ctx context.Context, dirRelativePath string, dirManifest *snapshot.DirManifest) (object.ID, error) { writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ Description: "DIR:" + dirRelativePath, Prefix: "k", @@ -764,26 +849,24 @@ func uploadDirInternal( defer writer.Close() //nolint:errcheck if err := json.NewEncoder(writer).Encode(dirManifest); err != nil { - return "", fs.DirectorySummary{}, errors.Wrap(err, "unable to encode directory JSON") + return "", errors.Wrap(err, "unable to encode directory JSON") } oid, err := writer.Result() + if err != nil { + return "", errors.Wrap(err, "unable to write directory") + } - dirManifest.Summary.IncompleteReason = u.incompleteReason() - - return oid, *dirManifest.Summary, err + return oid, nil } -func (u *Uploader) maybeIgnoreFileReadError(err error, output chan dirEntryOrError, entryRelativePath string, policyTree *policy.Tree) error { +func (u *Uploader) maybeIgnoreFileReadError(err error, dmb *dirManifestBuilder, entryRelativePath string, policyTree *policy.Tree) error { errHandlingPolicy := policyTree.EffectivePolicy().ErrorHandlingPolicy if u.IgnoreReadErrors || errHandlingPolicy.IgnoreFileErrorsOrDefault(false) { err = rootCauseError(err) u.Progress.IgnoredError(entryRelativePath, err) - output <- dirEntryOrError{failedEntry: &fs.EntryWithError{ - EntryPath: entryRelativePath, - Error: err.Error(), - }} + dmb.addFailedEntry(entryRelativePath, err) return nil } @@ -809,6 +892,7 @@ func NewUploader(r repo.Repository) *Uploader { IgnoreReadErrors: false, ParallelUploads: 1, CheckpointInterval: DefaultCheckpointInterval, + getTicker: time.Tick, uploadBufPool: sync.Pool{ New: func() interface{} { p := make([]byte, copyBufferSize) @@ -867,7 +951,7 @@ func (u *Uploader) Upload( maxPreviousTotalFileSize = s } - if s := m.Stats.TotalFileCount; s > maxPreviousFileCount { + if s := int(m.Stats.TotalFileCount); s > maxPreviousFileCount { maxPreviousFileCount = s } } @@ -898,7 +982,7 @@ func (u *Uploader) Upload( s.RootEntry, err = u.uploadDirWithCheckpointing(ctx, entry, policyTree, previousDirs, sourceInfo) case fs.File: - s.RootEntry, err = u.uploadFile(ctx, entry.Name(), entry, policyTree.EffectivePolicy()) + s.RootEntry, err = u.uploadFileWithCheckpointing(ctx, entry.Name(), entry, policyTree.EffectivePolicy(), sourceInfo) default: return nil, errors.Errorf("unsupported source: %v", s.Source) diff --git a/snapshot/snapshotfs/upload_progress.go b/snapshot/snapshotfs/upload_progress.go index 57213cc1c..6cc8cb1dc 100644 --- a/snapshot/snapshotfs/upload_progress.go +++ b/snapshot/snapshotfs/upload_progress.go @@ -36,9 +36,6 @@ type UploadProgress interface { // FinishedDirectory is emitted whenever a directory is finished uploading. FinishedDirectory(dirname string) - - // Checkpoint is emitted whenever snapshot is checkpointed. - Checkpoint() } // NullUploadProgress is an implementation of UploadProgress that does not produce any output. @@ -75,9 +72,6 @@ func (p *NullUploadProgress) FinishedDirectory(dirname string) {} // IgnoredError implements UploadProgress. func (p *NullUploadProgress) IgnoredError(path string, err error) {} -// Checkpoint implements UploadProgress. -func (p *NullUploadProgress) Checkpoint() {} - var _ UploadProgress = (*NullUploadProgress)(nil) // UploadCounters represents a snapshot of upload counters. diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 96b0d4b83..c25787a14 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -13,6 +13,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/faketime" "github.com/kopia/kopia/internal/mockfs" "github.com/kopia/kopia/internal/testlogging" @@ -289,6 +290,16 @@ func TestUploadWithCheckpointing(t *testing.T) { u := NewUploader(th.repo) + fakeTicker := make(chan time.Time) + + // inject fake ticker that we can control externally instead of through time passage. + u.getTicker = func(d time.Duration) <-chan time.Time { + return fakeTicker + } + + // create a channel that will be sent to whenever checkpoint completes. + u.checkpointFinished = make(chan struct{}) + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) si := snapshot.SourceInfo{ @@ -297,18 +308,22 @@ func TestUploadWithCheckpointing(t *testing.T) { Path: "path", } - count := 0 + // inject a hook into mock filesystem to trigger and wait for checkpoints at few places. + // the places are not important, what's important that those are 3 separate points in time. + dirsToCheckpointAt := []*mockfs.Directory{ + th.sourceDir.Subdir("d1"), + th.sourceDir.Subdir("d2"), + th.sourceDir.Subdir("d1").Subdir("d2"), + } - // when reading d1 advanced the time, to trigger checkpoint - th.sourceDir.Subdir("d1").OnReaddir(func() { - count++ - - // when reading this directory advance the time to cancel the snapshot - // and trigger a checkpoint - if count <= 2 { - th.ft.Advance(DefaultCheckpointInterval + 1) - } - }) + for _, d := range dirsToCheckpointAt { + d.OnReaddir(func() { + // trigger checkpoint + fakeTicker <- clock.Now() + // wait for checkpoint + <-u.checkpointFinished + }) + } if _, err := u.Upload(ctx, th.sourceDir, policyTree, si); err != nil { t.Errorf("Upload error: %v", err) @@ -319,7 +334,7 @@ func TestUploadWithCheckpointing(t *testing.T) { t.Fatalf("error listing snapshots: %v", err) } - if got, want := len(snapshots), 2; got != want { + if got, want := len(snapshots), len(dirsToCheckpointAt); got != want { t.Fatalf("unexpected number of snapshots: %v, want %v", got, want) } diff --git a/snapshot/stats.go b/snapshot/stats.go index aa57143bf..470d18031 100644 --- a/snapshot/stats.go +++ b/snapshot/stats.go @@ -6,17 +6,20 @@ // Stats keeps track of snapshot generation statistics. type Stats struct { - TotalDirectoryCount int `json:"dirCount"` - TotalFileCount int `json:"fileCount"` - TotalFileSize int64 `json:"totalSize"` - - ExcludedFileCount int `json:"excludedFileCount"` + // keep all int64 aligned because they will be atomically updated + TotalFileSize int64 `json:"totalSize"` ExcludedTotalFileSize int64 `json:"excludedTotalSize"` - ExcludedDirCount int `json:"excludedDirCount"` + // keep all int32 aligned because they will be atomically updated + TotalFileCount int32 `json:"fileCount"` CachedFiles int32 `json:"cachedFiles"` NonCachedFiles int32 `json:"nonCachedFiles"` + TotalDirectoryCount int `json:"dirCount"` + + ExcludedFileCount int `json:"excludedFileCount"` + ExcludedDirCount int `json:"excludedDirCount"` + ReadErrors int `json:"readErrors"` }