upload: added support for uploading compressed objects based on policies

This commit is contained in:
Jarek Kowalski
2019-12-07 14:33:28 -08:00
parent aec3cdcb2f
commit c7be9cb45e

View File

@@ -75,7 +75,7 @@ func (u *Uploader) cancelReason() string {
return ""
}
func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File) entryResult {
func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File, pol *policy.Policy) entryResult {
file, err := f.Open(ctx)
if err != nil {
return entryResult{err: errors.Wrap(err, "unable to open file")}
@@ -84,6 +84,7 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, f fs.File) entryResul
writer := u.repo.Objects.NewWriter(ctx, object.WriterOptions{
Description: "FILE:" + f.Name(),
Compressor: pol.CompressionPolicy.CompressorForFile(f.Name()),
})
defer writer.Close() //nolint:errcheck
@@ -236,8 +237,8 @@ func newDirEntry(md fs.Entry, oid object.ID) (*snapshot.DirEntry, error) {
}
// uploadFile uploads the specified File to the repository.
func (u *Uploader) uploadFile(ctx context.Context, file fs.File) (*snapshot.DirEntry, error) {
res := u.uploadFileInternal(ctx, file)
func (u *Uploader) uploadFile(ctx context.Context, file fs.File, pol *policy.Policy) (*snapshot.DirEntry, error) {
res := u.uploadFileInternal(ctx, file, pol)
if res.err != nil {
return nil, res.err
}
@@ -259,8 +260,8 @@ func (u *Uploader) uploadFile(ctx context.Context, file fs.File) (*snapshot.DirE
// uploadDir uploads the specified Directory to the repository.
// An optional ID of a hash-cache object may be provided, in which case the Uploader will use its
// contents to avoid hashing
func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, previousDirs []fs.Directory) (*snapshot.DirEntry, error) {
oid, summ, err := uploadDirInternal(ctx, u, rootDir, previousDirs, ".")
func (u *Uploader) uploadDir(ctx context.Context, rootDir fs.Directory, policyTree *policy.Tree, previousDirs []fs.Directory) (*snapshot.DirEntry, error) {
oid, summ, err := uploadDirInternal(ctx, u, rootDir, policyTree, previousDirs, ".")
if err != nil {
return nil, err
}
@@ -296,7 +297,7 @@ type entryResult struct {
de *snapshot.DirEntry
}
func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error {
func (u *Uploader) processSubdirectories(ctx context.Context, relativePath string, entries fs.Entries, policyTree *policy.Tree, previousEntries []fs.Entries, dirManifest *snapshot.DirManifest, summ *fs.DirectorySummary) error {
return u.foreachEntryUnlessCancelled(relativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
dir, ok := entry.(fs.Directory)
if !ok {
@@ -313,7 +314,7 @@ func (u *Uploader) processSubdirectories(ctx context.Context, relativePath strin
previousDirs = uniqueDirectories(previousDirs)
oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, previousDirs, entryRelativePath)
oid, subdirsumm, err := uploadDirInternal(ctx, u, dir, policyTree.Child(entry.Name()), previousDirs, entryRelativePath)
if err == errCancelled {
return err
}
@@ -423,7 +424,7 @@ func (u *Uploader) maybeIgnoreCachedEntry(ent fs.Entry) fs.Entry {
return nil
}
func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) {
func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string, entries fs.Entries, policyTree *policy.Tree, prevEntries []fs.Entries, summ *fs.DirectorySummary) ([]*uploadWorkItem, error) {
var result []*uploadWorkItem
resultErr := u.foreachEntryUnlessCancelled(dirRelativePath, entries, func(entry fs.Entry, entryRelativePath string) error {
@@ -479,7 +480,7 @@ func (u *Uploader) prepareWorkItems(ctx context.Context, dirRelativePath string,
entry: entry,
entryRelativePath: entryRelativePath,
uploadFunc: func() entryResult {
return u.uploadFileInternal(ctx, entry)
return u.uploadFileInternal(ctx, entry, policyTree.Child(entry.Name()).EffectivePolicy())
},
})
@@ -606,6 +607,7 @@ func uploadDirInternal(
ctx context.Context,
u *Uploader,
directory fs.Directory,
policyTree *policy.Tree,
previousDirs []fs.Directory,
dirRelativePath string,
) (object.ID, fs.DirectorySummary, error) {
@@ -644,14 +646,14 @@ func uploadDirInternal(
StreamType: directoryStreamType,
}
if err := u.processSubdirectories(ctx, dirRelativePath, entries, prevEntries, dirManifest, &summ); err != nil && err != errCancelled {
if err := u.processSubdirectories(ctx, dirRelativePath, entries, policyTree, prevEntries, dirManifest, &summ); err != nil && err != errCancelled {
return "", fs.DirectorySummary{}, err
}
u.prepareProgress(dirRelativePath, entries)
log.Debugf("preparing work items %v", dirRelativePath)
workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, prevEntries, &summ)
workItems, workItemErr := u.prepareWorkItems(ctx, dirRelativePath, entries, policyTree, prevEntries, &summ)
log.Debugf("finished preparing work items %v", dirRelativePath)
if workItemErr != nil && workItemErr != errCancelled {
@@ -751,10 +753,10 @@ func (u *Uploader) Upload(
entry = ignorefs.New(entry, policyTree, ignorefs.ReportIgnoredFiles(func(_ string, md fs.Entry) {
u.stats.AddExcluded(md)
}))
s.RootEntry, err = u.uploadDir(ctx, entry, previousDirs)
s.RootEntry, err = u.uploadDir(ctx, entry, policyTree, previousDirs)
case fs.File:
s.RootEntry, err = u.uploadFile(ctx, entry)
s.RootEntry, err = u.uploadFile(ctx, entry, policyTree.EffectivePolicy())
default:
return nil, errors.Errorf("unsupported source: %v", s.Source)