fix(snapshots): cached stream file size (#2690)

* Properly populate file size for cached entries

StreamingFiles don't know their size until they are read. Because size is
not compared when determining whether something is cached, cached entries
ended up marked with size 0: the size was sourced from the current
(unread) entry instead of the previous one.

Instead, create the dir entry from the previous entry. As StreamingFiles
do not allow setting file mode, permissions, owner, etc., using the old
entry suffices.

* Use cached entry size for stats

Also use the cached entry size when calculating stats since
StreamingFile has 0 size.

* Update tests for cached files to check size

Check at least the total size of the snapshot is updated when handling
StreamingFiles.
This commit is contained in:
ashmrtn
2023-01-20 14:07:39 -08:00
committed by GitHub
parent ba938cf58c
commit 3bb7879a8c
2 changed files with 22 additions and 4 deletions

View File

@@ -463,6 +463,22 @@ func newDirEntry(md fs.Entry, fname string, oid object.ID) (*snapshot.DirEntry,
}, nil
}
// newCachedDirEntry builds a snapshot.DirEntry for an entry that was also
// present in a previous snapshot. StreamingFiles report a zero size until
// they are read, so for them the metadata is taken from the previously
// snapshotted entry, ensuring the file size is populated correctly.
func newCachedDirEntry(md, cached fs.Entry, fname string) (*snapshot.DirEntry, error) {
	hoid, ok := cached.(object.HasObjectID)
	if !ok {
		return nil, errors.New("cached entry does not implement HasObjectID")
	}

	// For StreamingFiles the current entry has no size; fall back to the
	// cached entry's metadata instead of the current (unread) one.
	source := md
	if _, isStreaming := md.(fs.StreamingFile); isStreaming {
		source = cached
	}

	return newDirEntry(source, fname, hoid.ObjectID())
}
// uploadFileWithCheckpointing uploads the specified File to the repository.
func (u *Uploader) uploadFileWithCheckpointing(ctx context.Context, relativePath string, file fs.File, pol *policy.Policy, sourceInfo snapshot.SourceInfo) (*snapshot.DirEntry, error) {
var cp checkpointRegistry
@@ -809,11 +825,11 @@ func (u *Uploader) processSingle(
// See if we had this name during either of previous passes.
if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entryRelativePath, entry, prevDirs, policyTree)); cachedEntry != nil {
atomic.AddInt32(&u.stats.CachedFiles, 1)
atomic.AddInt64(&u.stats.TotalFileSize, entry.Size())
u.Progress.CachedFile(entryRelativePath, entry.Size())
atomic.AddInt64(&u.stats.TotalFileSize, cachedEntry.Size())
u.Progress.CachedFile(entryRelativePath, cachedEntry.Size())
cachedDirEntry, err := newCachedDirEntry(entry, cachedEntry, entry.Name())
// compute entryResult now, cachedEntry is short-lived
cachedDirEntry, err := newDirEntry(entry, entry.Name(), cachedEntry.(object.HasObjectID).ObjectID())
u.Progress.FinishedFile(entryRelativePath, err)
if err != nil {

View File

@@ -923,6 +923,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.NonCachedFiles))
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalDirectoryCount))
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalFileCount))
require.Equal(t, int64(len(content)), atomic.LoadInt64(&man1.Stats.TotalFileSize))
// wait a little bit to ensure clock moves forward which is not always the case on Windows.
time.Sleep(100 * time.Millisecond)
@@ -941,6 +942,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.NonCachedFiles))
// Cached files don't count towards the total file count.
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.TotalFileCount))
require.Equal(t, int64(len(content)), atomic.LoadInt64(&man2.Stats.TotalFileSize))
})
}
}