From 24a2ff6921497360f449f4d8d6a1626b9fb29aad Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 18 Jun 2022 21:04:17 -0700 Subject: [PATCH] refactor(snapshots): refactored upload of files (#2063) This refactors a helper that will later be used for parallel uploading of large files. --- snapshot/snapshotfs/checkpoint_registry.go | 9 ++- .../snapshotfs/checkpoint_registry_test.go | 14 ++--- snapshot/snapshotfs/upload.go | 60 +++++++++++-------- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/snapshot/snapshotfs/checkpoint_registry.go b/snapshot/snapshotfs/checkpoint_registry.go index 6034abae3..653842b5e 100644 --- a/snapshot/snapshotfs/checkpoint_registry.go +++ b/snapshot/snapshotfs/checkpoint_registry.go @@ -6,7 +6,6 @@ "github.com/google/uuid" "github.com/pkg/errors" - "github.com/kopia/kopia/fs" "github.com/kopia/kopia/snapshot" ) @@ -21,7 +20,7 @@ type checkpointRegistry struct { checkpoints map[string]checkpointFunc } -func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) { +func (r *checkpointRegistry) addCheckpointCallback(entryName string, f checkpointFunc) { r.mu.Lock() defer r.mu.Unlock() @@ -29,14 +28,14 @@ func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) r.checkpoints = map[string]checkpointFunc{} } - r.checkpoints[e.Name()] = f + r.checkpoints[entryName] = f } -func (r *checkpointRegistry) removeCheckpointCallback(e fs.Entry) { +func (r *checkpointRegistry) removeCheckpointCallback(entryName string) { r.mu.Lock() defer r.mu.Unlock() - delete(r.checkpoints, e.Name()) + delete(r.checkpoints, entryName) } // runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while diff --git a/snapshot/snapshotfs/checkpoint_registry_test.go b/snapshot/snapshotfs/checkpoint_registry_test.go index 2e875de06..6ba38c947 100644 --- a/snapshot/snapshotfs/checkpoint_registry_test.go +++ b/snapshot/snapshotfs/checkpoint_registry_test.go @@ -20,38 +20,38 
@@ func TestCheckpointRegistry(t *testing.T) { f3 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755)) f4 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755)) - cp.addCheckpointCallback(dir1, func() (*snapshot.DirEntry, error) { + cp.addCheckpointCallback(dir1.Name(), func() (*snapshot.DirEntry, error) { return &snapshot.DirEntry{ Name: "dir1", Type: snapshot.EntryTypeDirectory, }, nil }) - cp.addCheckpointCallback(f1, func() (*snapshot.DirEntry, error) { + cp.addCheckpointCallback(f1.Name(), func() (*snapshot.DirEntry, error) { return &snapshot.DirEntry{ Name: "f1", }, nil }) - cp.addCheckpointCallback(f2, func() (*snapshot.DirEntry, error) { + cp.addCheckpointCallback(f2.Name(), func() (*snapshot.DirEntry, error) { return &snapshot.DirEntry{ Name: "f2", }, nil }) - cp.addCheckpointCallback(f3, func() (*snapshot.DirEntry, error) { + cp.addCheckpointCallback(f3.Name(), func() (*snapshot.DirEntry, error) { return &snapshot.DirEntry{ Name: "other", }, nil }) - cp.addCheckpointCallback(f4, func() (*snapshot.DirEntry, error) { + cp.addCheckpointCallback(f4.Name(), func() (*snapshot.DirEntry, error) { return nil, nil }) // remove callback before it has a chance of firing - cp.removeCheckpointCallback(f3) - cp.removeCheckpointCallback(f3) + cp.removeCheckpointCallback(f3.Name()) + cp.removeCheckpointCallback(f3.Name()) var dmb DirManifestBuilder diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 0eb8d4ee6..5da983f26 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -25,6 +25,7 @@ "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/internal/workshare" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" @@ -155,6 +156,12 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis } } + comp := pol.CompressionPolicy.CompressorForFile(f) + 
+ return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp) +} + +func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name) (*snapshot.DirEntry, error) { file, err := f.Open(ctx) if err != nil { return nil, errors.Wrap(err, "unable to open file") @@ -162,13 +169,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis defer file.Close() //nolint:errcheck writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "FILE:" + f.Name(), - Compressor: pol.CompressionPolicy.CompressorForFile(f), + Description: "FILE:" + fname, + Compressor: compressor, AsyncWrites: 1, // upload chunk in parallel to writing another chunk }) defer writer.Close() //nolint:errcheck - parentCheckpointRegistry.addCheckpointCallback(f, func() (*snapshot.DirEntry, error) { + parentCheckpointRegistry.addCheckpointCallback(fname, func() (*snapshot.DirEntry, error) { // nolint:govet checkpointID, err := writer.Checkpoint() if err != nil { @@ -179,12 +186,23 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis return nil, nil } - return newDirEntry(f, checkpointID) + return newDirEntry(f, fname, checkpointID) }) - defer parentCheckpointRegistry.removeCheckpointCallback(f) + defer parentCheckpointRegistry.removeCheckpointCallback(fname) - written, err := u.copyWithProgress(writer, file, 0, f.Size()) + if offset != 0 { + if _, serr := file.Seek(offset, io.SeekStart); serr != nil { + return nil, errors.Wrap(serr, "seek error") + } + } + + var s io.Reader = file + if length >= 0 { + s = io.LimitReader(s, length) + } + + written, err := u.copyWithProgress(writer, s) if err != nil { return nil, err } @@ -199,7 +217,7 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis return nil, errors.Wrap(err, "unable to get result") } - de, err := newDirEntry(fi2, r) + de, 
err := newDirEntry(fi2, fname, r) if err != nil { return nil, errors.Wrap(err, "unable to create dir entry") } @@ -226,7 +244,7 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin }) defer writer.Close() //nolint:errcheck - written, err := u.copyWithProgress(writer, bytes.NewBufferString(target), 0, f.Size()) + written, err := u.copyWithProgress(writer, bytes.NewBufferString(target)) if err != nil { return nil, err } @@ -236,7 +254,7 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin return nil, errors.Wrap(err, "unable to get result") } - de, err := newDirEntry(f, r) + de, err := newDirEntry(f, f.Name(), r) if err != nil { return nil, errors.Wrap(err, "unable to create dir entry") } @@ -265,7 +283,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath }) defer writer.Close() //nolint:errcheck - written, err := u.copyWithProgress(writer, reader, 0, f.Size()) + written, err := u.copyWithProgress(writer, reader) if err != nil { return nil, err } @@ -275,7 +293,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath return nil, errors.Wrap(err, "unable to get result") } - de, err := newDirEntry(f, r) + de, err := newDirEntry(f, f.Name(), r) if err != nil { return nil, errors.Wrap(err, "unable to create dir entry") } @@ -290,7 +308,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath return de, nil } -func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, length int64) (int64, error) { +func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader) (int64, error) { uploadBuf := iocopy.GetBuffer() defer iocopy.ReleaseBuffer(uploadBuf) @@ -303,18 +321,12 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len readBytes, readErr := src.Read(uploadBuf) - // nolint:nestif if readBytes > 0 { wroteBytes, writeErr := dst.Write(uploadBuf[0:readBytes]) if wroteBytes > 0 
{ written += int64(wroteBytes) - completed += int64(wroteBytes) atomic.AddInt64(&u.totalWrittenBytes, int64(wroteBytes)) u.Progress.HashedBytes(int64(wroteBytes)) - - if length < completed { - length = completed - } } if writeErr != nil { @@ -342,7 +354,7 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len // newDirEntryWithSummary makes DirEntry objects for directory Entries that need a DirectorySummary. func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary) (*snapshot.DirEntry, error) { - de, err := newDirEntry(d, oid) + de, err := newDirEntry(d, d.Name(), oid) if err != nil { return nil, err } @@ -353,7 +365,7 @@ func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary } // newDirEntry makes DirEntry objects for any type of Entry. -func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { +func newDirEntry(md fs.Entry, fname string, oid object.ID) (*snapshot.DirEntry, error) { var entryType snapshot.EntryType switch md := md.(type) { @@ -368,7 +380,7 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { } return &snapshot.DirEntry{ - Name: md.Name(), + Name: fname, Type: entryType, Permissions: snapshot.Permissions(md.Mode() & os.ModePerm), FileSize: md.Size(), @@ -712,7 +724,7 @@ func (u *Uploader) processSingle( u.Progress.CachedFile(entryRelativePath, entry.Size()) // compute entryResult now, cachedEntry is short-lived - cachedDirEntry, err := newDirEntry(entry, cachedEntry.(object.HasObjectID).ObjectID()) + cachedDirEntry, err := newDirEntry(entry, entry.Name(), cachedEntry.(object.HasObjectID).ObjectID()) if err != nil { return errors.Wrap(err, "unable to create dir entry") } @@ -1001,7 +1013,7 @@ func uploadDirInternal( childCheckpointRegistry := &checkpointRegistry{} - thisCheckpointRegistry.addCheckpointCallback(directory, func() (*snapshot.DirEntry, error) { + thisCheckpointRegistry.addCheckpointCallback(directory.Name(), func() 
(*snapshot.DirEntry, error) { // when snapshotting the parent, snapshot all our children and tell them to populate // childCheckpointBuilder thisCheckpointBuilder := thisDirBuilder.Clone() @@ -1019,7 +1031,7 @@ func uploadDirInternal( return newDirEntryWithSummary(directory, oid, checkpointManifest.Summary) }) - defer thisCheckpointRegistry.removeCheckpointCallback(directory) + defer thisCheckpointRegistry.removeCheckpointCallback(directory.Name()) if err := u.processChildren(ctx, childCheckpointRegistry, thisDirBuilder, localDirPathOrEmpty, dirRelativePath, directory, policyTree, uniqueDirectories(previousDirs)); err != nil && !errors.Is(err, errCanceled) { return nil, err