feat(repository): Metadata compression config support for directory and indirect content (#4080)

* Configure compressor for k- and x-prefixed content

Add a metadata compression setting to the policy
Add support for configuring the compressor for k- and x-prefixed content
Set zstd-fastest as the default metadata compressor in the policy
Add support for setting and showing metadata compression in the kopia policy commands
Add metadata compression config to the directory writer

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Pass concatenate options with ConcatenateOptions struct

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Move content compression handling to caller

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Move handling manifests to manifest pkg

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Correct const in server_test

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Remove unnecessary whitespace

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Disable metadata compression for < V2 format

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

---------

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>
Prasad Ghangal
2024-10-24 11:58:23 +05:30
committed by GitHub
parent e20ec3d290
commit 3bf947d746
29 changed files with 465 additions and 137 deletions
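
For illustration, the flag wired up in the hunks below could be exercised roughly like this (a hedged sketch; per the flag handling further down, "none" disables metadata compression and "inherit" resets to the value inherited from the parent policy, assuming the same enum values as the existing --compression flag):

    kopia policy set --global --metadata-compression=zstd-fastest
    kopia policy show --global
    kopia policy set --global --metadata-compression=none
    kopia policy set --global --metadata-compression=inherit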

View File

@@ -19,6 +19,7 @@ type commandPolicySet struct {
policyActionFlags
policyCompressionFlags
policyMetadataCompressionFlags
policySplitterFlags
policyErrorFlags
policyFilesFlags
@@ -36,6 +37,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) {
c.policyActionFlags.setup(cmd)
c.policyCompressionFlags.setup(cmd)
c.policyMetadataCompressionFlags.setup(cmd)
c.policySplitterFlags.setup(cmd)
c.policyErrorFlags.setup(cmd)
c.policyFilesFlags.setup(cmd)
@@ -108,6 +110,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol
return errors.Wrap(err, "compression policy")
}
if err := c.setMetadataCompressionPolicyFromFlags(ctx, &p.MetadataCompressionPolicy, changeCount); err != nil {
return errors.Wrap(err, "metadata compression policy")
}
if err := c.setSplitterPolicyFromFlags(ctx, &p.SplitterPolicy, changeCount); err != nil {
return errors.Wrap(err, "splitter policy")
}

View File

@@ -24,6 +24,37 @@ type policyCompressionFlags struct {
policySetClearNeverCompress bool
}
type policyMetadataCompressionFlags struct {
policySetMetadataCompressionAlgorithm string
}
func (c *policyMetadataCompressionFlags) setup(cmd *kingpin.CmdClause) {
// Name of compression algorithm.
cmd.Flag("metadata-compression", "Metadata Compression algorithm").EnumVar(&c.policySetMetadataCompressionAlgorithm, supportedCompressionAlgorithms()...)
}
func (c *policyMetadataCompressionFlags) setMetadataCompressionPolicyFromFlags(
ctx context.Context,
p *policy.MetadataCompressionPolicy,
changeCount *int,
) error { //nolint:unparam
if v := c.policySetMetadataCompressionAlgorithm; v != "" {
*changeCount++
if v == inheritPolicyString {
log(ctx).Info(" - resetting metadata compression algorithm to default value inherited from parent")
p.CompressorName = ""
} else {
log(ctx).Infof(" - setting metadata compression algorithm to %v", v)
p.CompressorName = compression.Name(v)
}
}
return nil
}
func (c *policyCompressionFlags) setup(cmd *kingpin.CmdClause) {
// Name of compression algorithm.
cmd.Flag("compression", "Compression algorithm").EnumVar(&c.policySetCompressionAlgorithm, supportedCompressionAlgorithms()...)

View File

@@ -126,6 +126,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) {
rows = append(rows, policyTableRow{})
rows = appendCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendMetadataCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendSplitterPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendActionsPolicyRows(rows, p, def)
@@ -388,6 +390,17 @@ func appendCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *p
return rows
}
func appendMetadataCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
if p.MetadataCompressionPolicy.CompressorName == "" || p.MetadataCompressionPolicy.CompressorName == "none" {
rows = append(rows, policyTableRow{"Metadata compression disabled.", "", ""})
return rows
}
return append(rows,
policyTableRow{"Metadata compression:", "", ""},
policyTableRow{" Compressor:", string(p.MetadataCompressionPolicy.CompressorName), definitionPointToString(p.Target(), def.MetadataCompressionPolicy.CompressorName)})
}
func appendSplitterPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
algorithm := p.SplitterPolicy.Algorithm
if algorithm == "" {

View File

@@ -10,6 +10,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
@@ -90,12 +91,19 @@ func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, r
for _, mg := range snapshot.GroupBySource(manifests) {
log(ctx).Infof("Processing snapshot %v", mg[0].Source)
policyTree, err := policy.TreeForSource(ctx, rep, mg[0].Source)
if err != nil {
return errors.Wrap(err, "unable to get policy tree")
}
metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()
for _, man := range snapshot.SortByTime(mg, false) {
log(ctx).Debugf(" %v (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID)
old := man.Clone()
changed, err := rw.RewriteSnapshotManifest(ctx, man)
changed, err := rw.RewriteSnapshotManifest(ctx, man, metadataComp)
if err != nil {
return errors.Wrap(err, "error rewriting manifest")
}

View File

@@ -314,7 +314,7 @@ func remoteRepositoryNotificationTest(t *testing.T, ctx context.Context, rep rep
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
t.Helper()
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
_, err := ow.Write(data)
require.NoError(t, err)

View File

@@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
@@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()

View File

@@ -30,10 +30,9 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes
iv := getPackedContentIV(hashOutput[:0], contentID)
// If the content is prefixed (which represents Kopia's own metadata as opposed to user data),
// and we're on V2 format or greater, enable internal compression even when not requested.
if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON.
comp = compression.HeaderZstdFastest
// and we're on < V2 format, disable compression even when it's requested.
if contentID.HasPrefix() && mp.IndexVersion < index.Version2 {
comp = NoCompression
}
//nolint:nestif

View File

File diff suppressed because one or more lines are too long

View File

@@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()
w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
_, err := w.Write(data)
require.NoError(t, err, testCaseID)

View File

@@ -560,9 +560,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
}
// ConcatenateObjects creates a concatenated object from the provided object IDs.
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, opt ConcatenateOptions) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, opt.Compressor)
}
// maybeRetry executes the provided callback with or without automatic retries depending on how

View File

@@ -46,7 +46,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()

View File

@@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
@@ -96,7 +96,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()

View File

@@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
// run N sessions to create N individual pack blobs for each content prefix
for range tc.numPContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
@@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
for range tc.numQContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err

View File

@@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {
// create object that's immediately orphaned since nobody refers to it.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello world")
var err error
objectID, err = ow.Result()
@@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {
// create another object in separate pack.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello universe")
_, err := ow.Result()
return err

View File

@@ -14,6 +14,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
)
@@ -117,7 +118,8 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
mustSucceed(gz.Flush())
mustSucceed(gz.Close())
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
// TODO: Configure manifest metadata compression with Policy setting
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, compression.HeaderZstdFastest)
if err != nil {
return nil, errors.Wrap(err, "unable to write content")
}

View File

@@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.metadataCompressor = compression.ByName[opt.MetadataCompressor]
w.totalLength = 0
w.currentPosition = 0
@@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.New("empty list of objects")
}
@@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)
w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer w.Close() //nolint:errcheck

View File

@@ -88,7 +88,7 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content.
defer f.mu.Unlock()
if d, ok := f.data[contentID]; ok {
return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil
return content.Info{ContentID: contentID, PackedLength: uint32(len(d)), CompressionHeaderID: f.compresionIDs[contentID]}, nil
}
return content.Info{}, blob.ErrBlobNotFound
@@ -175,18 +175,43 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) {
_, _, om := setupTest(t, cmap)
w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result()
require.NoError(t, err)
cid, isCompressed, ok := oid.ContentID()
require.True(t, ok)
require.False(t, isCompressed) // oid will not indicate compression
require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid])
}
func TestCompression_IndirectContentCompressionEnabledMetadata(t *testing.T) {
ctx := testlogging.Context(t)
cmap := map[content.ID]compression.HeaderID{}
_, _, om := setupTest(t, cmap)
w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000000))
oid, err := w.Result()
require.NoError(t, err)
verifyIndirectBlock(ctx, t, om, oid, compression.HeaderZstdFastest)
w2 := om.NewWriter(ctx, WriterOptions{
MetadataCompressor: "none",
})
w2.Write(bytes.Repeat([]byte{5, 6, 7, 8}, 1000000))
oid2, err2 := w2.Result()
require.NoError(t, err2)
verifyIndirectBlock(ctx, t, om, oid2, content.NoCompression)
}
func TestCompression_CustomSplitters(t *testing.T) {
cases := []struct {
wo WriterOptions
@@ -244,7 +269,8 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) {
_, _, om := setupTest(t, nil)
w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result()
@@ -409,7 +435,7 @@ func verifyNoError(t *testing.T, err error) {
require.NoError(t, err)
}
func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) {
func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID, expectedComp compression.HeaderID) {
t.Helper()
for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() {
@@ -418,6 +444,11 @@ func() {
if !c.HasPrefix() {
t.Errorf("expected base content ID to be prefixed, was %v", c)
}
info, err := om.contentMgr.ContentInfo(ctx, c)
if err != nil {
t.Errorf("error getting content info for %v", err.Error())
}
require.Equal(t, expectedComp, info.CompressionHeaderID)
}
rd, err := Open(ctx, om.contentMgr, indexContentID)
@@ -443,6 +474,7 @@ func TestIndirection(t *testing.T) {
dataLength int
expectedBlobCount int
expectedIndirection int
metadataCompressor compression.Name
}{
{dataLength: 200, expectedBlobCount: 1, expectedIndirection: 0},
{dataLength: 1000, expectedBlobCount: 1, expectedIndirection: 0},
@@ -452,15 +484,18 @@ func TestIndirection(t *testing.T) {
// 1 blob of 1000 zeros + 1 index blob
{dataLength: 4000, expectedBlobCount: 2, expectedIndirection: 1},
// 1 blob of 1000 zeros + 1 index blob
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1},
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "none"},
// 1 blob of 1000 zeros + 1 index blob, enabled metadata compression
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "zstd-fastest"},
}
for _, c := range cases {
data, _, om := setupTest(t, nil)
cmap := map[content.ID]compression.HeaderID{}
data, _, om := setupTest(t, cmap)
contentBytes := make([]byte, c.dataLength)
writer := om.NewWriter(ctx, WriterOptions{})
writer := om.NewWriter(ctx, WriterOptions{MetadataCompressor: c.metadataCompressor})
writer.(*objectWriter).splitter = splitterFactory()
if _, err := writer.Write(contentBytes); err != nil {
@@ -491,7 +526,11 @@ func TestIndirection(t *testing.T) {
t.Errorf("invalid blob count for %v, got %v, wanted %v", result, got, want)
}
verifyIndirectBlock(ctx, t, om, result)
expectedCompressor := content.NoCompression
if len(c.metadataCompressor) > 0 && c.metadataCompressor != "none" {
expectedCompressor = compression.ByName[c.metadataCompressor].HeaderID()
}
verifyIndirectBlock(ctx, t, om, result, expectedCompressor)
}
}
@@ -578,7 +617,7 @@ func TestConcatenate(t *testing.T) {
}
for _, tc := range cases {
concatenatedOID, err := om.Concatenate(ctx, tc.inputs)
concatenatedOID, err := om.Concatenate(ctx, tc.inputs, "zstd-fastest")
if err != nil {
t.Fatal(err)
}
@@ -617,7 +656,7 @@ func TestConcatenate(t *testing.T) {
}
// make sure results of concatenation can be further concatenated.
concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID})
concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}, "zstd-fastest")
if err != nil {
t.Fatal(err)
}

View File

@@ -68,7 +68,8 @@ type objectWriter struct {
om *Manager
compressor compression.Compressor
compressor compression.Compressor
metadataCompressor compression.Compressor
prefix content.IDPrefix
buffer gather.WriteBuffer
@@ -197,6 +198,13 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte
objectComp = nil
}
// metadata objects are ALWAYS compressed at the content layer, irrespective of the index version (1 or 1+).
// even if a compressor for metadata objects is set by the caller, do not compress the objects at this layer;
// instead, let it be handled at the content layer.
if w.prefix != "" {
objectComp = nil
}
// contentBytes is what we're going to write to the content manager, it potentially uses bytes from b
contentBytes, isCompressed, err := maybeCompressedContentBytes(objectComp, data, &b)
if err != nil {
@@ -292,12 +300,13 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
}
iw := &objectWriter{
ctx: w.ctx,
om: w.om,
compressor: nil,
description: "LIST(" + w.description + ")",
splitter: w.om.newDefaultSplitter(),
prefix: w.prefix,
ctx: w.ctx,
om: w.om,
compressor: w.metadataCompressor,
metadataCompressor: w.metadataCompressor,
description: "LIST(" + w.description + ")",
splitter: w.om.newDefaultSplitter(),
prefix: w.prefix,
}
if iw.prefix == "" {
@@ -334,9 +343,10 @@ func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error {
// WriterOptions can be passed to Repository.NewWriter().
type WriterOptions struct {
Description string
Prefix content.IDPrefix // empty string or a single-character ('g'..'z')
Compressor compression.Name
Splitter string // use particular splitter instead of default
AsyncWrites int // allow up to N content writes to be asynchronous
Description string
Prefix content.IDPrefix // empty string or a single-character ('g'..'z')
Compressor compression.Name
MetadataCompressor compression.Name
Splitter string // use particular splitter instead of default
AsyncWrites int // allow up to N content writes to be asynchronous
}
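
A caller-side sketch of the extended WriterOptions (it mirrors the snapshotfs uploader change further down; the variable names are illustrative, not part of this diff): user data keeps its own per-file compressor, while indirect index objects built for large files now use the metadata compressor.

    w := rep.NewObjectWriter(ctx, object.WriterOptions{
        Description:        "FILE:" + fname,
        Compressor:         comp,         // per-file compressor from CompressionPolicy
        MetadataCompressor: metadataComp, // from MetadataCompressionPolicy, used for indirect index objects
    })
    defer w.Close() //nolint:errcheck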

View File

@@ -15,7 +15,7 @@ func BenchmarkWriterDedup1M(b *testing.B) {
ctx, env := repotesting.NewEnvironment(b, format.FormatVersion2)
dataBuf := make([]byte, 4<<20)
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
writer.Write(dataBuf)
_, err := writer.Result()
require.NoError(b, err)
@@ -25,7 +25,7 @@ func BenchmarkWriterDedup1M(b *testing.B) {
for range b.N {
// write exactly the same data
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
writer.Write(dataBuf)
writer.Result()
writer.Close()
@@ -45,7 +45,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) {
for i := range b.N {
// write exactly the same data
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
if i+chunkSize > len(dataBuf) {
chunkSize++

View File

@@ -14,6 +14,7 @@
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
@@ -47,7 +48,7 @@ type RepositoryWriter interface {
Repository
NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer
ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error)
ConcatenateObjects(ctx context.Context, objectIDs []object.ID, opt ConcatenateOptions) (object.ID, error)
PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
DeleteManifest(ctx context.Context, id manifest.ID) error
@@ -184,10 +185,15 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write
return r.omgr.NewWriter(ctx, opt)
}
// ConcatenateOptions describes options for concatenating objects.
type ConcatenateOptions struct {
Compressor compression.Name
}
// ConcatenateObjects creates a concatenated object from the provided object IDs.
func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, opt ConcatenateOptions) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, opt.Compressor)
}
// DisableIndexRefresh disables index refresh for the duration of the write session.
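
A caller-side sketch of the new ConcatenateObjects signature (mirroring the uploader change below; names are illustrative):

    resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, repo.ConcatenateOptions{
        Compressor: metadataComp, // metadata compressor selected by policy
    })
    if err != nil {
        return nil, errors.Wrap(err, "concatenate")
    }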

View File

@@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestWriters(t *testing.T) {
for _, c := range cases {
ctx, env := repotesting.NewEnvironment(t, s.formatVersion)
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
if _, err := writer.Write(c.data); err != nil {
t.Fatalf("write error: %v", err)
}
@@ -74,7 +74,7 @@ func (s *formatSpecificTestSuite) TestWriterCompleteChunkInTwoWrites(t *testing.
ctx, env := repotesting.NewEnvironment(t, s.formatVersion)
b := make([]byte, 100)
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
writer.Write(b[0:50])
writer.Write(b[0:50])
result, err := writer.Result()
@@ -159,7 +159,7 @@ func (s *formatSpecificTestSuite) TestHMAC(t *testing.T) {
c := bytes.Repeat([]byte{0xcd}, 50)
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
w.Write(c)
result, err := w.Result()
@@ -185,7 +185,7 @@ func (s *formatSpecificTestSuite) TestReaderStoredBlockNotFound(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) object.ID {
t.Helper()
w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
if _, err := w.Write(data); err != nil {
t.Fatalf("can't write object %q - write failed: %v", testCaseID, err)
}
@@ -275,7 +275,7 @@ func TestFormats(t *testing.T) {
for k, v := range c.oids {
bytesToWrite := []byte(k)
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
w.Write(bytesToWrite)
oid, err := w.Result()
@@ -555,7 +555,7 @@ func TestObjectWritesWithRetention(t *testing.T) {
},
})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
_, err := writer.Write([]byte("the quick brown fox jumps over the lazy dog"))
require.NoError(t, err)
@@ -774,7 +774,8 @@ func TestMetrics_CompressibleData(t *testing.T) {
for ensureMapEntry(t, env.RepositoryMetrics().Snapshot(false).Counters, "content_write_duration_nanos") < 5e6 {
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(inputData)

View File

@@ -20,6 +20,11 @@ type CompressionPolicy struct {
MaxSize int64 `json:"maxSize,omitempty"`
}
// MetadataCompressionPolicy specifies compression policy for metadata.
type MetadataCompressionPolicy struct {
CompressorName compression.Name `json:"compressorName,omitempty"`
}
// CompressionPolicyDefinition specifies which policy definition provided the value of a particular field.
type CompressionPolicyDefinition struct {
CompressorName snapshot.SourceInfo `json:"compressorName,omitempty"`
@@ -29,6 +34,11 @@ type CompressionPolicyDefinition struct {
MaxSize snapshot.SourceInfo `json:"maxSize,omitempty"`
}
// MetadataCompressionPolicyDefinition specifies which policy definition provided the value of a particular field.
type MetadataCompressionPolicyDefinition struct {
CompressorName snapshot.SourceInfo `json:"compressorName,omitempty"`
}
// CompressorForFile returns compression name to be used for compressing a given file according to policy, using attributes such as name or size.
func (p *CompressionPolicy) CompressorForFile(e fs.Entry) compression.Name {
ext := filepath.Ext(e.Name())
@@ -67,6 +77,20 @@ func (p *CompressionPolicy) Merge(src CompressionPolicy, def *CompressionPolicyD
mergeStrings(&p.NeverCompress, &p.NoParentNeverCompress, src.NeverCompress, src.NoParentNeverCompress, &def.NeverCompress, si)
}
// Merge applies default values from the provided policy.
func (p *MetadataCompressionPolicy) Merge(src MetadataCompressionPolicy, def *MetadataCompressionPolicyDefinition, si snapshot.SourceInfo) {
mergeCompressionName(&p.CompressorName, src.CompressorName, &def.CompressorName, si)
}
// MetadataCompressor returns the compression name to be used for metadata, according to policy.
func (p *MetadataCompressionPolicy) MetadataCompressor() compression.Name {
if p.CompressorName == "none" {
return ""
}
return p.CompressorName
}
func isInSortedSlice(s string, slice []string) bool {
x := sort.SearchStrings(slice, s)
return x < len(slice) && slice[x] == s
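
A consumer-side sketch of how this policy is resolved (it mirrors the snapshot-fix and snapshotfs hunks elsewhere in this commit; illustrative only):

    policyTree, err := policy.TreeForSource(ctx, rep, sourceInfo)
    if err != nil {
        return errors.Wrap(err, "unable to get policy tree")
    }
    // "none" maps to the empty compression name, which disables metadata compression.
    metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()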

View File

@@ -21,33 +21,35 @@ type TargetWithPolicy struct {
// Policy describes snapshot policy for a single source.
type Policy struct {
Labels map[string]string `json:"-"`
RetentionPolicy RetentionPolicy `json:"retention,omitempty"`
FilesPolicy FilesPolicy `json:"files,omitempty"`
ErrorHandlingPolicy ErrorHandlingPolicy `json:"errorHandling,omitempty"`
SchedulingPolicy SchedulingPolicy `json:"scheduling,omitempty"`
CompressionPolicy CompressionPolicy `json:"compression,omitempty"`
SplitterPolicy SplitterPolicy `json:"splitter,omitempty"`
Actions ActionsPolicy `json:"actions,omitempty"`
OSSnapshotPolicy OSSnapshotPolicy `json:"osSnapshots,omitempty"`
LoggingPolicy LoggingPolicy `json:"logging,omitempty"`
UploadPolicy UploadPolicy `json:"upload,omitempty"`
NoParent bool `json:"noParent,omitempty"`
Labels map[string]string `json:"-"`
RetentionPolicy RetentionPolicy `json:"retention,omitempty"`
FilesPolicy FilesPolicy `json:"files,omitempty"`
ErrorHandlingPolicy ErrorHandlingPolicy `json:"errorHandling,omitempty"`
SchedulingPolicy SchedulingPolicy `json:"scheduling,omitempty"`
CompressionPolicy CompressionPolicy `json:"compression,omitempty"`
MetadataCompressionPolicy MetadataCompressionPolicy `json:"metadataCompression,omitempty"`
SplitterPolicy SplitterPolicy `json:"splitter,omitempty"`
Actions ActionsPolicy `json:"actions,omitempty"`
OSSnapshotPolicy OSSnapshotPolicy `json:"osSnapshots,omitempty"`
LoggingPolicy LoggingPolicy `json:"logging,omitempty"`
UploadPolicy UploadPolicy `json:"upload,omitempty"`
NoParent bool `json:"noParent,omitempty"`
}
// Definition corresponds 1:1 to Policy and each field specifies the snapshot.SourceInfo
// where a particular policy field was specified.
type Definition struct {
RetentionPolicy RetentionPolicyDefinition `json:"retention,omitempty"`
FilesPolicy FilesPolicyDefinition `json:"files,omitempty"`
ErrorHandlingPolicy ErrorHandlingPolicyDefinition `json:"errorHandling,omitempty"`
SchedulingPolicy SchedulingPolicyDefinition `json:"scheduling,omitempty"`
CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"`
SplitterPolicy SplitterPolicyDefinition `json:"splitter,omitempty"`
Actions ActionsPolicyDefinition `json:"actions,omitempty"`
OSSnapshotPolicy OSSnapshotPolicyDefinition `json:"osSnapshots,omitempty"`
LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"`
UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"`
RetentionPolicy RetentionPolicyDefinition `json:"retention,omitempty"`
FilesPolicy FilesPolicyDefinition `json:"files,omitempty"`
ErrorHandlingPolicy ErrorHandlingPolicyDefinition `json:"errorHandling,omitempty"`
SchedulingPolicy SchedulingPolicyDefinition `json:"scheduling,omitempty"`
CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"`
MetadataCompressionPolicy MetadataCompressionPolicyDefinition `json:"metadataCompression,omitempty"`
SplitterPolicy SplitterPolicyDefinition `json:"splitter,omitempty"`
Actions ActionsPolicyDefinition `json:"actions,omitempty"`
OSSnapshotPolicy OSSnapshotPolicyDefinition `json:"osSnapshots,omitempty"`
LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"`
UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"`
}
func (p *Policy) String() string {

View File

@@ -24,6 +24,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini
merged.SchedulingPolicy.Merge(p.SchedulingPolicy, &def.SchedulingPolicy, p.Target())
merged.UploadPolicy.Merge(p.UploadPolicy, &def.UploadPolicy, p.Target())
merged.CompressionPolicy.Merge(p.CompressionPolicy, &def.CompressionPolicy, p.Target())
merged.MetadataCompressionPolicy.Merge(p.MetadataCompressionPolicy, &def.MetadataCompressionPolicy, p.Target())
merged.SplitterPolicy.Merge(p.SplitterPolicy, &def.SplitterPolicy, p.Target())
merged.Actions.Merge(p.Actions, &def.Actions, p.Target())
merged.OSSnapshotPolicy.Merge(p.OSSnapshotPolicy, &def.OSSnapshotPolicy, p.Target())
@@ -41,6 +42,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini
merged.SchedulingPolicy.Merge(defaultSchedulingPolicy, &def.SchedulingPolicy, GlobalPolicySourceInfo)
merged.UploadPolicy.Merge(defaultUploadPolicy, &def.UploadPolicy, GlobalPolicySourceInfo)
merged.CompressionPolicy.Merge(defaultCompressionPolicy, &def.CompressionPolicy, GlobalPolicySourceInfo)
merged.MetadataCompressionPolicy.Merge(defaultMetadataCompressionPolicy, &def.MetadataCompressionPolicy, GlobalPolicySourceInfo)
merged.SplitterPolicy.Merge(defaultSplitterPolicy, &def.SplitterPolicy, GlobalPolicySourceInfo)
merged.Actions.Merge(defaultActionsPolicy, &def.Actions, GlobalPolicySourceInfo)
merged.OSSnapshotPolicy.Merge(defaultOSSnapshotPolicy, &def.OSSnapshotPolicy, GlobalPolicySourceInfo)

View File

@@ -12,6 +12,9 @@
defaultCompressionPolicy = CompressionPolicy{
CompressorName: "none",
}
defaultMetadataCompressionPolicy = MetadataCompressionPolicy{
CompressorName: "zstd-fastest",
}
defaultSplitterPolicy = SplitterPolicy{}
@@ -71,15 +74,16 @@
// DefaultPolicy is a default policy returned by policy tree in absence of other policies.
DefaultPolicy = &Policy{
FilesPolicy: defaultFilesPolicy,
RetentionPolicy: defaultRetentionPolicy,
CompressionPolicy: defaultCompressionPolicy,
ErrorHandlingPolicy: defaultErrorHandlingPolicy,
SchedulingPolicy: defaultSchedulingPolicy,
LoggingPolicy: defaultLoggingPolicy,
Actions: defaultActionsPolicy,
OSSnapshotPolicy: defaultOSSnapshotPolicy,
UploadPolicy: defaultUploadPolicy,
FilesPolicy: defaultFilesPolicy,
RetentionPolicy: defaultRetentionPolicy,
CompressionPolicy: defaultCompressionPolicy,
MetadataCompressionPolicy: defaultMetadataCompressionPolicy,
ErrorHandlingPolicy: defaultErrorHandlingPolicy,
SchedulingPolicy: defaultSchedulingPolicy,
LoggingPolicy: defaultLoggingPolicy,
Actions: defaultActionsPolicy,
OSSnapshotPolicy: defaultOSSnapshotPolicy,
UploadPolicy: defaultUploadPolicy,
}
// DefaultDefinition provides the Definition for the default policy.
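
A quick test-style sketch of the new default (hypothetical test code, using only identifiers introduced in this commit):

    // With no explicit policy, metadata is compressed with zstd-fastest by default.
    require.Equal(t, compression.Name("zstd-fastest"),
        policy.DefaultPolicy.MetadataCompressionPolicy.MetadataCompressor())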

View File

@@ -14,9 +14,11 @@
"github.com/kopia/kopia/internal/impossible"
"github.com/kopia/kopia/internal/workshare"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
)
var dirRewriterLog = logging.Module("dirRewriter")
@@ -59,17 +61,18 @@ type DirRewriter struct {
}
type dirRewriterRequest struct {
ctx context.Context //nolint:containedctx
parentPath string
input *snapshot.DirEntry
result *snapshot.DirEntry
err error
ctx context.Context //nolint:containedctx
parentPath string
input *snapshot.DirEntry
result *snapshot.DirEntry
metadataCompression compression.Name
err error
}
func (rw *DirRewriter) processRequest(pool *workshare.Pool[*dirRewriterRequest], req *dirRewriterRequest) {
_ = pool
req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input)
req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.metadataCompression)
}
func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey {
@@ -87,7 +90,7 @@ func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey
return out
}
func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry) (*snapshot.DirEntry, error) {
func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) {
key := rw.getCacheKey(input)
// see if we already processed this exact directory entry
@@ -113,7 +116,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri
// the rewriter returned a directory, we must recursively process it.
if result.Type == snapshot.EntryTypeDirectory {
rep2, subdirErr := rw.processDirectory(ctx, parentPath, result)
rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, metadataComp)
if rep2 == nil {
return nil, errors.Wrap(subdirErr, input.Name)
}
@@ -131,7 +134,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri
return result, nil
}
func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry) (*snapshot.DirEntry, error) {
func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) {
dirRewriterLog(ctx).Debugw("processDirectory", "path", pathFromRoot)
r, err := rw.rep.OpenObject(ctx, entry.ObjectID)
@@ -145,10 +148,10 @@ func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string
return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrap(err, "unable to read directory entries"))
}
return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries)
return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, metadataComp)
}
func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry) (*snapshot.DirEntry, error) {
func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) {
var (
builder DirManifestBuilder
wg workshare.AsyncGroup[*dirRewriterRequest]
@@ -165,6 +168,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s
path.Join(parentPath, child.Name),
child,
nil,
metadataComp,
nil,
})
@@ -172,7 +176,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s
}
// run in current goroutine
replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child)
replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, metadataComp)
if repErr != nil {
return nil, errors.Wrap(repErr, child.Name)
}
@@ -194,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s
dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason)
oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm)
oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp)
if err != nil {
return nil, errors.Wrap(err, "unable to write directory manifest")
}
@@ -219,8 +223,8 @@ func (rw *DirRewriter) equalEntries(e1, e2 *snapshot.DirEntry) bool {
}
// RewriteSnapshotManifest rewrites the directory tree starting at a given manifest.
func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest) (bool, error) {
newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry)
func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, metadataComp compression.Name) (bool, error) {
newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, metadataComp)
if err != nil {
return false, errors.Wrapf(err, "error processing snapshot %v", man.ID)
}
@@ -273,7 +277,13 @@ func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback {
return nil, errors.Wrap(err, "error writing stub contents")
}
w := rep.NewObjectWriter(ctx, object.WriterOptions{})
pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo)
if err != nil {
return nil, errors.Wrap(err, "error getting policy")
}
metadataCompressor := pol.MetadataCompressionPolicy.MetadataCompressor()
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: metadataCompressor})
n, err := buf.WriteTo(w)
if err != nil {

View File

@@ -7,14 +7,17 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
)
func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest) (object.ID, error) {
func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) {
writer := rep.NewObjectWriter(ctx, object.WriterOptions{
Description: "DIR:" + dirRelativePath,
Prefix: objectIDPrefixDirectory,
Description: "DIR:" + dirRelativePath,
Prefix: objectIDPrefixDirectory,
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer writer.Close() //nolint:errcheck

View File

@@ -160,12 +160,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
}
comp := pol.CompressionPolicy.CompressorForFile(f)
metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor()
splitterName := pol.SplitterPolicy.SplitterForFile(f)
chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1)
if chunkSize < 0 || f.Size() <= chunkSize {
// all data fits in 1 full chunks, upload directly
return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, splitterName)
return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, metadataComp, splitterName)
}
// we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length
@@ -190,11 +191,11 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
if wg.CanShareWork(u.workerPool) {
// another goroutine is available, delegate to them
wg.RunAsync(u.workerPool, func(_ *workshare.Pool[*uploadWorkItem], _ *uploadWorkItem) {
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName)
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName)
}, nil)
} else {
// just do the work in the current goroutine
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName)
parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName)
}
}
@@ -205,10 +206,10 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
return nil, errors.Wrap(err, "error uploading parts")
}
return concatenateParts(ctx, u.repo, f.Name(), parts)
return concatenateParts(ctx, u.repo, f.Name(), parts, metadataComp)
}
func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) {
func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) {
var (
objectIDs []object.ID
totalSize int64
@@ -220,7 +221,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin
objectIDs = append(objectIDs, part.ObjectID)
}
resultObject, err := rep.ConcatenateObjects(ctx, objectIDs)
resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, repo.ConcatenateOptions{Compressor: metadataComp})
if err != nil {
return nil, errors.Wrap(err, "concatenate")
}
@@ -233,7 +234,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin
return de, nil
}
func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name, splitterName string) (*snapshot.DirEntry, error) {
func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string) (*snapshot.DirEntry, error) {
file, err := f.Open(ctx)
if err != nil {
return nil, errors.Wrap(err, "unable to open file")
@@ -241,10 +242,11 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry
defer file.Close() //nolint:errcheck
writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
Description: "FILE:" + fname,
Compressor: compressor,
Splitter: splitterName,
AsyncWrites: 1, // upload chunk in parallel to writing another chunk
Description: "FILE:" + fname,
Compressor: compressor,
MetadataCompressor: metadataComp,
Splitter: splitterName,
AsyncWrites: 1, // upload chunk in parallel to writing another chunk
})
defer writer.Close() //nolint:errcheck
@@ -297,7 +299,7 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry
return de, nil
}
func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (dirEntry *snapshot.DirEntry, ret error) {
func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink, metadataComp compression.Name) (dirEntry *snapshot.DirEntry, ret error) {
u.Progress.HashingFile(relativePath)
defer func() {
@@ -311,7 +313,8 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin
}
writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
Description: "SYMLINK:" + f.Name(),
Description: "SYMLINK:" + f.Name(),
MetadataCompressor: metadataComp,
})
defer writer.Close() //nolint:errcheck
@@ -352,11 +355,13 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath
}()
comp := pol.CompressionPolicy.CompressorForFile(f)
metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor()
writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
Description: "STREAMFILE:" + f.Name(),
Compressor: comp,
Splitter: pol.SplitterPolicy.SplitterForFile(f),
Description: "STREAMFILE:" + f.Name(),
Compressor: comp,
MetadataCompressor: metadataComp,
Splitter: pol.SplitterPolicy.SplitterForFile(f),
})
defer writer.Close() //nolint:errcheck
@@ -902,7 +907,8 @@ func (u *Uploader) processSingle(
return nil
case fs.Symlink:
de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry)
childTree := policyTree.Child(entry.Name())
de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry, childTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor())
return u.processEntryUploadResult(ctx, de, err, entryRelativePath, parentDirBuilder,
policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreFileErrors.OrDefault(false),
@@ -1144,6 +1150,8 @@ func uploadDirInternal(
childCheckpointRegistry := &checkpointRegistry{}
metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()
thisCheckpointRegistry.addCheckpointCallback(directory.Name(), func() (*snapshot.DirEntry, error) {
// when snapshotting the parent, snapshot all our children and tell them to populate
// childCheckpointBuilder
@@ -1156,7 +1164,7 @@ func uploadDirInternal(
checkpointManifest := thisCheckpointBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), IncompleteReasonCheckpoint)
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest)
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp)
if err != nil {
return nil, errors.Wrap(err, "error writing dir manifest")
}
@@ -1171,7 +1179,7 @@ func uploadDirInternal(
dirManifest := thisDirBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), u.incompleteReason())
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest)
oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp)
if err != nil {
return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name())
}

View File

@@ -38,6 +38,8 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob/filesystem"
bloblogging "github.com/kopia/kopia/repo/blob/logging"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
@@ -228,6 +230,108 @@ func TestUpload(t *testing.T) {
}
}
type entry struct {
name string
objectID object.ID
}
// findAllEntries recursively iterates over all the dirs and returns list of file entries.
func findAllEntries(t *testing.T, ctx context.Context, dir fs.Directory) []entry {
t.Helper()
entries := []entry{}
fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
oid, err := object.ParseID(e.(object.HasObjectID).ObjectID().String())
require.NoError(t, err)
entries = append(entries, entry{
name: e.Name(),
objectID: oid,
})
if e.IsDir() {
entries = append(entries, findAllEntries(t, ctx, e.(fs.Directory))...)
}
return nil
})
return entries
}
func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Repository, entries []entry, comp compression.HeaderID) {
t.Helper()
for _, e := range entries {
cid, _, ok := e.objectID.ContentID()
require.True(t, ok)
if !cid.HasPrefix() {
continue
}
info, err := rep.ContentInfo(ctx, cid)
if err != nil {
t.Errorf("failed to get content info: %v", err)
}
require.Equal(t, comp, info.CompressionHeaderID)
}
}
func TestUploadMetadataCompression(t *testing.T) {
ctx := testlogging.Context(t)
t.Run("default metadata compression", func(t *testing.T) {
th := newUploadTestHarness(ctx, t)
defer th.cleanup()
u := NewUploader(th.repo)
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
if err != nil {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest)
})
t.Run("disable metadata compression", func(t *testing.T) {
th := newUploadTestHarness(ctx, t)
defer th.cleanup()
u := NewUploader(th.repo)
policyTree := policy.BuildTree(map[string]*policy.Policy{
".": {
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
CompressorName: "none",
},
},
}, policy.DefaultPolicy)
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
if err != nil {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression)
})
t.Run("set metadata compressor", func(t *testing.T) {
th := newUploadTestHarness(ctx, t)
defer th.cleanup()
u := NewUploader(th.repo)
policyTree := policy.BuildTree(map[string]*policy.Policy{
".": {
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
CompressorName: "gzip",
},
},
}, policy.DefaultPolicy)
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
if err != nil {
t.Errorf("Upload error: %v", err)
}
dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
entries := findAllEntries(t, ctx, dir)
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID())
})
}
func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) {
ctx := testlogging.Context(t)
th := newUploadTestHarness(ctx, t)
@@ -816,14 +920,14 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
// Create a temporary pipe file with test data
content := []byte("Streaming Temporary file content")
tmpContent := []byte("Streaming Temporary file content")
r, w, err := os.Pipe()
if err != nil {
t.Fatalf("error creating pipe file: %v", err)
}
if _, err = w.Write(content); err != nil {
if _, err = w.Write(tmpContent); err != nil {
t.Fatalf("error writing to pipe file: %v", err)
}
@@ -873,8 +977,8 @@ func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T)
// Create a temporary file with test data. Want something compressible but
// small so we don't trigger dedupe.
content := []byte(strings.Repeat("a", 4096))
r := io.NopCloser(bytes.NewReader(content))
tmpContent := []byte(strings.Repeat("a", 4096))
r := io.NopCloser(bytes.NewReader(tmpContent))
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
virtualfs.StreamingFileFromReader("stream-file", r),
@@ -895,7 +999,7 @@ func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T)
}
func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
content := []byte("Streaming Temporary file content")
tmpContent := []byte("Streaming Temporary file content")
mt := time.Date(2021, 1, 2, 3, 4, 5, 0, time.UTC)
cases := []struct {
@@ -907,7 +1011,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
{
desc: "CurrentTime",
getFile: func() fs.StreamingFile {
return virtualfs.StreamingFileFromReader("a", io.NopCloser(bytes.NewReader(content)))
return virtualfs.StreamingFileFromReader("a", io.NopCloser(bytes.NewReader(tmpContent)))
},
cachedFiles: 0,
uploadedFiles: 1,
@@ -915,7 +1019,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
{
desc: "FixedTime",
getFile: func() fs.StreamingFile {
return virtualfs.StreamingFileWithModTimeFromReader("a", mt, io.NopCloser(bytes.NewReader(content)))
return virtualfs.StreamingFileWithModTimeFromReader("a", mt, io.NopCloser(bytes.NewReader(tmpContent)))
},
cachedFiles: 1,
uploadedFiles: 0,
@@ -944,7 +1048,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.NonCachedFiles))
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalDirectoryCount))
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalFileCount))
require.Equal(t, int64(len(content)), atomic.LoadInt64(&man1.Stats.TotalFileSize))
require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man1.Stats.TotalFileSize))
// wait a little bit to ensure clock moves forward which is not always the case on Windows.
time.Sleep(100 * time.Millisecond)
@@ -963,7 +1067,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.NonCachedFiles))
// Cached files don't count towards the total file count.
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.TotalFileCount))
require.Equal(t, int64(len(content)), atomic.LoadInt64(&man2.Stats.TotalFileSize))
require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man2.Stats.TotalFileSize))
})
}
}