mirror of
https://github.com/kopia/kopia.git
synced 2026-05-19 12:14:45 -04:00
refactor(snapshots): refactored upload of files (#2063)
This refactors a helper that will later be used for parallel uploading of a large files.
This commit is contained in:
@@ -6,7 +6,6 @@
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
@@ -21,7 +20,7 @@ type checkpointRegistry struct {
|
||||
checkpoints map[string]checkpointFunc
|
||||
}
|
||||
|
||||
func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) {
|
||||
func (r *checkpointRegistry) addCheckpointCallback(entryName string, f checkpointFunc) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
@@ -29,14 +28,14 @@ func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc)
|
||||
r.checkpoints = map[string]checkpointFunc{}
|
||||
}
|
||||
|
||||
r.checkpoints[e.Name()] = f
|
||||
r.checkpoints[entryName] = f
|
||||
}
|
||||
|
||||
func (r *checkpointRegistry) removeCheckpointCallback(e fs.Entry) {
|
||||
func (r *checkpointRegistry) removeCheckpointCallback(entryName string) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
delete(r.checkpoints, e.Name())
|
||||
delete(r.checkpoints, entryName)
|
||||
}
|
||||
|
||||
// runCheckpoints invokes all registered checkpointers and adds results to the provided builder, while
|
||||
|
||||
@@ -20,38 +20,38 @@ func TestCheckpointRegistry(t *testing.T) {
|
||||
f3 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755))
|
||||
f4 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755))
|
||||
|
||||
cp.addCheckpointCallback(dir1, func() (*snapshot.DirEntry, error) {
|
||||
cp.addCheckpointCallback(dir1.Name(), func() (*snapshot.DirEntry, error) {
|
||||
return &snapshot.DirEntry{
|
||||
Name: "dir1",
|
||||
Type: snapshot.EntryTypeDirectory,
|
||||
}, nil
|
||||
})
|
||||
|
||||
cp.addCheckpointCallback(f1, func() (*snapshot.DirEntry, error) {
|
||||
cp.addCheckpointCallback(f1.Name(), func() (*snapshot.DirEntry, error) {
|
||||
return &snapshot.DirEntry{
|
||||
Name: "f1",
|
||||
}, nil
|
||||
})
|
||||
|
||||
cp.addCheckpointCallback(f2, func() (*snapshot.DirEntry, error) {
|
||||
cp.addCheckpointCallback(f2.Name(), func() (*snapshot.DirEntry, error) {
|
||||
return &snapshot.DirEntry{
|
||||
Name: "f2",
|
||||
}, nil
|
||||
})
|
||||
|
||||
cp.addCheckpointCallback(f3, func() (*snapshot.DirEntry, error) {
|
||||
cp.addCheckpointCallback(f3.Name(), func() (*snapshot.DirEntry, error) {
|
||||
return &snapshot.DirEntry{
|
||||
Name: "other",
|
||||
}, nil
|
||||
})
|
||||
|
||||
cp.addCheckpointCallback(f4, func() (*snapshot.DirEntry, error) {
|
||||
cp.addCheckpointCallback(f4.Name(), func() (*snapshot.DirEntry, error) {
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
// remove callback before it has a chance of firing
|
||||
cp.removeCheckpointCallback(f3)
|
||||
cp.removeCheckpointCallback(f3)
|
||||
cp.removeCheckpointCallback(f3.Name())
|
||||
cp.removeCheckpointCallback(f3.Name())
|
||||
|
||||
var dmb DirManifestBuilder
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
"github.com/kopia/kopia/internal/timetrack"
|
||||
"github.com/kopia/kopia/internal/workshare"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
"github.com/kopia/kopia/repo/object"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
@@ -155,6 +156,12 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
|
||||
}
|
||||
}
|
||||
|
||||
comp := pol.CompressionPolicy.CompressorForFile(f)
|
||||
|
||||
return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp)
|
||||
}
|
||||
|
||||
func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name) (*snapshot.DirEntry, error) {
|
||||
file, err := f.Open(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to open file")
|
||||
@@ -162,13 +169,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
|
||||
defer file.Close() //nolint:errcheck
|
||||
|
||||
writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
|
||||
Description: "FILE:" + f.Name(),
|
||||
Compressor: pol.CompressionPolicy.CompressorForFile(f),
|
||||
Description: "FILE:" + fname,
|
||||
Compressor: compressor,
|
||||
AsyncWrites: 1, // upload chunk in parallel to writing another chunk
|
||||
})
|
||||
defer writer.Close() //nolint:errcheck
|
||||
|
||||
parentCheckpointRegistry.addCheckpointCallback(f, func() (*snapshot.DirEntry, error) {
|
||||
parentCheckpointRegistry.addCheckpointCallback(fname, func() (*snapshot.DirEntry, error) {
|
||||
// nolint:govet
|
||||
checkpointID, err := writer.Checkpoint()
|
||||
if err != nil {
|
||||
@@ -179,12 +186,23 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return newDirEntry(f, checkpointID)
|
||||
return newDirEntry(f, fname, checkpointID)
|
||||
})
|
||||
|
||||
defer parentCheckpointRegistry.removeCheckpointCallback(f)
|
||||
defer parentCheckpointRegistry.removeCheckpointCallback(fname)
|
||||
|
||||
written, err := u.copyWithProgress(writer, file, 0, f.Size())
|
||||
if offset != 0 {
|
||||
if _, serr := file.Seek(offset, io.SeekStart); serr != nil {
|
||||
return nil, errors.Wrap(serr, "seek error")
|
||||
}
|
||||
}
|
||||
|
||||
var s io.Reader = file
|
||||
if length >= 0 {
|
||||
s = io.LimitReader(s, length)
|
||||
}
|
||||
|
||||
written, err := u.copyWithProgress(writer, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -199,7 +217,7 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
|
||||
return nil, errors.Wrap(err, "unable to get result")
|
||||
}
|
||||
|
||||
de, err := newDirEntry(fi2, r)
|
||||
de, err := newDirEntry(fi2, fname, r)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create dir entry")
|
||||
}
|
||||
@@ -226,7 +244,7 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin
|
||||
})
|
||||
defer writer.Close() //nolint:errcheck
|
||||
|
||||
written, err := u.copyWithProgress(writer, bytes.NewBufferString(target), 0, f.Size())
|
||||
written, err := u.copyWithProgress(writer, bytes.NewBufferString(target))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -236,7 +254,7 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin
|
||||
return nil, errors.Wrap(err, "unable to get result")
|
||||
}
|
||||
|
||||
de, err := newDirEntry(f, r)
|
||||
de, err := newDirEntry(f, f.Name(), r)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create dir entry")
|
||||
}
|
||||
@@ -265,7 +283,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath
|
||||
})
|
||||
defer writer.Close() //nolint:errcheck
|
||||
|
||||
written, err := u.copyWithProgress(writer, reader, 0, f.Size())
|
||||
written, err := u.copyWithProgress(writer, reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -275,7 +293,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath
|
||||
return nil, errors.Wrap(err, "unable to get result")
|
||||
}
|
||||
|
||||
de, err := newDirEntry(f, r)
|
||||
de, err := newDirEntry(f, f.Name(), r)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create dir entry")
|
||||
}
|
||||
@@ -290,7 +308,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath
|
||||
return de, nil
|
||||
}
|
||||
|
||||
func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, length int64) (int64, error) {
|
||||
func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader) (int64, error) {
|
||||
uploadBuf := iocopy.GetBuffer()
|
||||
defer iocopy.ReleaseBuffer(uploadBuf)
|
||||
|
||||
@@ -303,18 +321,12 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len
|
||||
|
||||
readBytes, readErr := src.Read(uploadBuf)
|
||||
|
||||
// nolint:nestif
|
||||
if readBytes > 0 {
|
||||
wroteBytes, writeErr := dst.Write(uploadBuf[0:readBytes])
|
||||
if wroteBytes > 0 {
|
||||
written += int64(wroteBytes)
|
||||
completed += int64(wroteBytes)
|
||||
atomic.AddInt64(&u.totalWrittenBytes, int64(wroteBytes))
|
||||
u.Progress.HashedBytes(int64(wroteBytes))
|
||||
|
||||
if length < completed {
|
||||
length = completed
|
||||
}
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
@@ -342,7 +354,7 @@ func (u *Uploader) copyWithProgress(dst io.Writer, src io.Reader, completed, len
|
||||
|
||||
// newDirEntryWithSummary makes DirEntry objects for directory Entries that need a DirectorySummary.
|
||||
func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary) (*snapshot.DirEntry, error) {
|
||||
de, err := newDirEntry(d, oid)
|
||||
de, err := newDirEntry(d, d.Name(), oid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -353,7 +365,7 @@ func newDirEntryWithSummary(d fs.Entry, oid object.ID, summ *fs.DirectorySummary
|
||||
}
|
||||
|
||||
// newDirEntry makes DirEntry objects for any type of Entry.
|
||||
func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
|
||||
func newDirEntry(md fs.Entry, fname string, oid object.ID) (*snapshot.DirEntry, error) {
|
||||
var entryType snapshot.EntryType
|
||||
|
||||
switch md := md.(type) {
|
||||
@@ -368,7 +380,7 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
|
||||
}
|
||||
|
||||
return &snapshot.DirEntry{
|
||||
Name: md.Name(),
|
||||
Name: fname,
|
||||
Type: entryType,
|
||||
Permissions: snapshot.Permissions(md.Mode() & os.ModePerm),
|
||||
FileSize: md.Size(),
|
||||
@@ -712,7 +724,7 @@ func (u *Uploader) processSingle(
|
||||
u.Progress.CachedFile(entryRelativePath, entry.Size())
|
||||
|
||||
// compute entryResult now, cachedEntry is short-lived
|
||||
cachedDirEntry, err := newDirEntry(entry, cachedEntry.(object.HasObjectID).ObjectID())
|
||||
cachedDirEntry, err := newDirEntry(entry, entry.Name(), cachedEntry.(object.HasObjectID).ObjectID())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to create dir entry")
|
||||
}
|
||||
@@ -1001,7 +1013,7 @@ func uploadDirInternal(
|
||||
|
||||
childCheckpointRegistry := &checkpointRegistry{}
|
||||
|
||||
thisCheckpointRegistry.addCheckpointCallback(directory, func() (*snapshot.DirEntry, error) {
|
||||
thisCheckpointRegistry.addCheckpointCallback(directory.Name(), func() (*snapshot.DirEntry, error) {
|
||||
// when snapshotting the parent, snapshot all our children and tell them to populate
|
||||
// childCheckpointBuilder
|
||||
thisCheckpointBuilder := thisDirBuilder.Clone()
|
||||
@@ -1019,7 +1031,7 @@ func uploadDirInternal(
|
||||
|
||||
return newDirEntryWithSummary(directory, oid, checkpointManifest.Summary)
|
||||
})
|
||||
defer thisCheckpointRegistry.removeCheckpointCallback(directory)
|
||||
defer thisCheckpointRegistry.removeCheckpointCallback(directory.Name())
|
||||
|
||||
if err := u.processChildren(ctx, childCheckpointRegistry, thisDirBuilder, localDirPathOrEmpty, dirRelativePath, directory, policyTree, uniqueDirectories(previousDirs)); err != nil && !errors.Is(err, errCanceled) {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user