diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index ac5a5cbb6..a945122e3 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -75,7 +75,7 @@ func (u *Uploader) cancelReason() string { return "" } -func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File) entryResult { +func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File, pol *policy.Policy) entryResult { file, err := f.Open(ctx) if err != nil { return entryResult{err: errors.Wrap(err, "unable to open file")} @@ -84,6 +84,7 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File) entryResul writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{ Description: "FILE:" + f.Name(), + Compressor: pol.CompressionPolicy.CompressorForFile(f.Name()), }) defer writer.Close() //nolint:errcheck @@ -236,8 +237,8 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) { } // uploadFile uploads the specified File to the repository. -func (u *Uploader) uploadFile(ctx context.Context, file fs.File) (*snapshot.DirEntry, error) { - res := u.uploadFileInternal(ctx, file) +func (u *Uploader) uploadFile(ctx context.Context, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) { + res := u.uploadFileInternal(ctx, file, pol) if res.err != nil { return nil, res.err } @@ -259,8 +260,8 @@ func (u *Uploader) uploadFile(ctx context.Context, file fs.File) (*snapshot.DirE // uploadDir uploads the specified Directory to the repository. // An optional ID of a hash-cache object may be provided, in which case the Uploader will use its // contents to avoid hashing -func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, previousDirs []fs.Directory) (*snapshot.DirEntry, error) { - oid, summ, err := uploadDirInternal(ctx, u, rootDir, previousDirs, ".") +func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory) (*snapshot.DirEntry, error) { + oid, summ, err := uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".") if err != nil { return nil, err } @@ -296,7 +297,7 @@ type entryResult struct { de *snapshot.DirEntry } -func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error { +func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error { return u.foreachEntryUnlessCancelled(relativePath, entries, func(entry fs.Entry, entryRelativePath string) error { dir, ok := entry.(fs.Directory) if !ok { @@ -313,7 +314,7 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin previousDirs = uniqueDirectories(previousDirs) - oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, previousDirs, entryRelativePath) + oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath) if err == errCancelled { return err } @@ -423,7 +424,7 @@ func (u *Uploader) maybeIgnoreCachedEntry(ent fs.Entry) fs.Entry { return nil } -func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) { +func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) { var result []*uploadWorkItem resultErr := u.foreachEntryUnlessCancelled(dirRelativePath, entries, func(entry fs.Entry, entryRelativePath string) error { @@ -479,7 +480,7 @@ func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entry: entry, entryRelativePath: entryRelativePath, uploadFunc: func() entryResult { - return u.uploadFileInternal(ctx, entry) + return u.uploadFileInternal(ctx, entry, policyTree.Child(entry.Name()).EffectivePolicy()) }, }) @@ -606,6 +607,7 @@ func uploadDirInternal( ctx context.Context, u *Uploader, directory fs.Directory, + policyTree *policy.Tree, previousDirs []fs.Directory, dirRelativePath string, ) (object.ID, fs.DirectorySummary, error) { @@ -644,14 +646,14 @@ func uploadDirInternal( StreamType: directoryStreamType, } - if err := u.processSubdirectories(ctx, dirRelativePath, entries, prevEntries, dirManifest, &summ); err != nil && err != errCancelled { + if err := u.processSubdirectories(ctx, dirRelativePath, entries, policyTree, prevEntries, dirManifest, &summ); err != nil && err != errCancelled { return "", fs.DirectorySummary{}, err } u.prepareProgress(dirRelativePath, entries) log.Debugf("preparing work items %v", dirRelativePath) - workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, prevEntries, &summ) + workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, policyTree, prevEntries, &summ) log.Debugf("finished preparing work items %v", dirRelativePath) if workItemErr != nil && workItemErr != errCancelled { @@ -751,10 +753,10 @@ func (u *Uploader) Upload( entry = ignorefs.New(entry, policyTree, ignorefs.ReportIgnoredFiles(func(_ string, md fs.Entry) { u.stats.AddExcluded(md) })) - s.RootEntry, err = u.uploadDir(ctx, entry, previousDirs) + s.RootEntry, err = u.uploadDir(ctx, entry, policyTree, previousDirs) case fs.File: - s.RootEntry, err = u.uploadFile(ctx, entry) + s.RootEntry, err = u.uploadFile(ctx, entry, policyTree.EffectivePolicy()) default: return nil, errors.Errorf("unsupported source: %v", s.Source)