Mirror of https://github.com/kopia/kopia.git (synced 2025-12-23 22:57:50 -05:00)
refactor(repository): extract parts repo/content into packages (#2651)
- repolog package
- blobcrypto package
- indexblob package

Minor cleanups:
- removed dead code
- introduced New*() methods for object construction
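At a glance the refactor is mechanical: whole-blob crypto, repository logs, and index-blob management each move out of repo/content into their own package, and formerly unexported types gain exported names and New*() constructors. The mapping below is an editor's summary of the renames visible in this diff, not part of the commit itself:

    content.EncryptBLOB            -> blobcrypto.Encrypt
    content.DecryptBLOB            -> blobcrypto.Decrypt
    content.crypter                -> blobcrypto.Crypter
    content.LegacyIndexBlobPrefix  -> indexblob.V0IndexBlobPrefix
    content.TextLogBlobPrefix      -> repolog.BlobPrefix
    content.IndexBlobInfo          -> indexblob.Metadata
    content.CompactOptions         -> indexblob.CompactOptions
    content.encryptedBlobMgr       -> indexblob.EncryptionManager
    content.internalLogManager     -> repolog.LogManager
    content.indexBlobManagerV0/V1  -> indexblob.ManagerV0/ManagerV1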
@@ -5,9 +5,10 @@
 	"strings"

 	"github.com/kopia/kopia/internal/epoch"
+	"github.com/kopia/kopia/internal/repolog"
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/blob"
-	"github.com/kopia/kopia/repo/content"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

 type commandBlobList struct {
@@ -57,7 +58,7 @@ func (c *commandBlobList) run(ctx context.Context, rep repo.DirectRepository) er

 func (c *commandBlobList) shouldInclude(b blob.Metadata) bool {
 	if c.dataOnly {
-		if strings.HasPrefix(string(b.BlobID), content.LegacyIndexBlobPrefix) {
+		if strings.HasPrefix(string(b.BlobID), indexblob.V0IndexBlobPrefix) {
 			return false
 		}
@@ -65,7 +66,7 @@ func (c *commandBlobList) shouldInclude(b blob.Metadata) bool {
 		return false
 	}

-	if strings.HasPrefix(string(b.BlobID), content.TextLogBlobPrefix) {
+	if strings.HasPrefix(string(b.BlobID), repolog.BlobPrefix) {
 		return false
 	}
@@ -8,11 +8,11 @@

 	"github.com/pkg/errors"

+	"github.com/kopia/kopia/internal/blobcrypto"
 	"github.com/kopia/kopia/internal/gather"
 	"github.com/kopia/kopia/internal/iocopy"
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/blob"
-	"github.com/kopia/kopia/repo/content"
 )

 type commandBlobShow struct {
@@ -57,7 +57,7 @@ func (c *commandBlobShow) maybeDecryptBlob(ctx context.Context, w io.Writer, rep
 	var tmp gather.WriteBuffer
 	defer tmp.Close()

-	if err := content.DecryptBLOB(rep.ContentReader().ContentFormat(), b, blobID, &tmp); err != nil {
+	if err := blobcrypto.Decrypt(rep.ContentReader().ContentFormat(), b, blobID, &tmp); err != nil {
 		return errors.Wrap(err, "error decrypting blob")
 	}
@@ -11,6 +11,7 @@
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/content"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

 type commandIndexInspect struct {
@@ -81,7 +82,7 @@ func (c *commandIndexInspect) inspectAllBlobs(ctx context.Context, rep repo.Dire
 		return errors.Wrap(err, "error listing index blobs")
 	}

-	indexesCh := make(chan content.IndexBlobInfo, len(indexes))
+	indexesCh := make(chan indexblob.Metadata, len(indexes))
 	for _, bm := range indexes {
 		indexesCh <- bm
 	}
@@ -5,7 +5,7 @@
 	"time"

 	"github.com/kopia/kopia/repo"
-	"github.com/kopia/kopia/repo/content"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

 type commandIndexOptimize struct {
@@ -36,7 +36,7 @@ func (c *commandIndexOptimize) runOptimizeCommand(ctx context.Context, rep repo.
 		return err
 	}

-	opt := content.CompactOptions{
+	opt := indexblob.CompactOptions{
 		MaxSmallBlobs: c.optimizeMaxSmallBlobs,
 		AllIndexes:    c.optimizeAllIndexes,
 		DropContents:  contentIDs,
@@ -12,6 +12,7 @@
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/content"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

 type commandIndexRecover struct {
@@ -60,7 +61,7 @@ func (c *commandIndexRecover) run(ctx context.Context, rep repo.DirectRepository
 	}()

 	if c.deleteIndexes {
-		if err := rep.BlobReader().ListBlobs(ctx, content.LegacyIndexBlobPrefix, func(bm blob.Metadata) error {
+		if err := rep.BlobReader().ListBlobs(ctx, indexblob.V0IndexBlobPrefix, func(bm blob.Metadata) error {
 			if c.commit {
 				log(ctx).Infof("deleting old index blob: %v", bm.BlobID)
 				return errors.Wrap(rep.BlobStorage().DeleteBlob(ctx, bm.BlobID), "error deleting index blob")
@@ -11,8 +11,8 @@
 	"github.com/pkg/errors"

 	"github.com/kopia/kopia/internal/clock"
+	"github.com/kopia/kopia/internal/repolog"
 	"github.com/kopia/kopia/repo/blob"
-	"github.com/kopia/kopia/repo/content"
 )

 type logSessionInfo struct {
@@ -70,7 +70,7 @@ func getLogSessions(ctx context.Context, st blob.Reader) ([]*logSessionInfo, err

 	var allSessions []*logSessionInfo

-	if err := st.ListBlobs(ctx, content.TextLogBlobPrefix, func(bm blob.Metadata) error {
+	if err := st.ListBlobs(ctx, repolog.BlobPrefix, func(bm blob.Metadata) error {
 		parts := strings.Split(string(bm.BlobID), "_")

 		//nolint:gomnd
@@ -5,9 +5,9 @@

 	"github.com/pkg/errors"

+	"github.com/kopia/kopia/internal/blobcrypto"
 	"github.com/kopia/kopia/internal/gather"
 	"github.com/kopia/kopia/repo"
-	"github.com/kopia/kopia/repo/content"
 )

 type commandLogsShow struct {
@@ -70,7 +70,7 @@ func (c *commandLogsShow) run(ctx context.Context, rep repo.DirectRepository) er
 		return errors.Wrap(err, "error getting log")
 	}

-	if err := content.DecryptBLOB(rep.ContentReader().ContentFormat(), data.Bytes(), bm.BlobID, &decrypted); err != nil {
+	if err := blobcrypto.Decrypt(rep.ContentReader().ContentFormat(), data.Bytes(), bm.BlobID, &decrypted); err != nil {
 		return errors.Wrap(err, "error decrypting log")
 	}
@@ -14,6 +14,7 @@
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/content"
 	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/content/indexblob"
 	"github.com/kopia/kopia/repo/format"
 )

@@ -98,7 +99,7 @@ func assign(iif content.Info, i int, m map[content.ID][2]index.Info) {
 }

 // loadIndexBlobs loads index blobs into the indexEntries map, which allows comparison between two indexes (index at which == 0 and index at which == 1).
-func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]index.Info, sm *content.SharedManager, which int, indexBlobInfos []content.IndexBlobInfo) error {
+func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]index.Info, sm *content.SharedManager, which int, indexBlobInfos []indexblob.Metadata) error {
 	d := gather.WriteBuffer{}

 	for _, indexBlobInfo := range indexBlobInfos {
@@ -1,4 +1,5 @@
-package content
+// Package blobcrypto performs whole-blob crypto operations.
+package blobcrypto

 import (
 	"crypto/aes"
@@ -13,8 +14,8 @@
 	"github.com/kopia/kopia/repo/hashing"
 )

-// crypter encapsulates hashing and encryption.
-type crypter interface {
+// Crypter encapsulates hashing and encryption.
+type Crypter interface {
 	HashFunc() hashing.HashFunc
 	Encryptor() encryption.Encryptor
 }
@@ -38,16 +39,16 @@ func getIndexBlobIV(s blob.ID) ([]byte, error) {
 	return v, nil
 }

-// EncryptBLOB encrypts the given data using crypter-defined key and returns a name that should
+// Encrypt encrypts the given data using crypter-defined key and returns a name that should
 // be used to save the blob in the repository.
-func EncryptBLOB(c crypter, payload gather.Bytes, prefix blob.ID, sessionID SessionID, output *gather.WriteBuffer) (blob.ID, error) {
+func Encrypt(c Crypter, payload gather.Bytes, prefix, suffix blob.ID, output *gather.WriteBuffer) (blob.ID, error) {
 	var hashOutput [hashing.MaxHashSize]byte

 	hash := c.HashFunc()(hashOutput[:0], payload)
 	blobID := prefix + blob.ID(hex.EncodeToString(hash))

-	if sessionID != "" {
-		blobID += blob.ID("-" + sessionID)
+	if suffix != "" {
+		blobID += "-" + suffix
 	}

 	iv, err := getIndexBlobIV(blobID)
@@ -64,8 +65,8 @@ func EncryptBLOB(c crypter, payload gather.Bytes, prefix blob.ID, sessionID Sess
 	return blobID, nil
 }

-// DecryptBLOB decrypts the provided data using provided blobID to derive initialization vector.
-func DecryptBLOB(c crypter, payload gather.Bytes, blobID blob.ID, output *gather.WriteBuffer) error {
+// Decrypt decrypts the provided data using provided blobID to derive initialization vector.
+func Decrypt(c Crypter, payload gather.Bytes, blobID blob.ID, output *gather.WriteBuffer) error {
 	iv, err := getIndexBlobIV(blobID)
 	if err != nil {
 		return errors.Wrap(err, "unable to get index blob IV")
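For orientation, a minimal round-trip through the renamed API, assuming a Crypter implementation is already available (blobcrypto is an internal package, so a sketch like this only compiles inside the kopia module):

package example

import (
	"github.com/kopia/kopia/internal/blobcrypto"
	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
)

// roundTrip encrypts a payload and decrypts it again. The blob ID returned by
// Encrypt has the form {prefix}{hash}[-{suffix}] and is also the IV source
// used by Decrypt; prefix "n" and suffix "s01" are illustrative values.
func roundTrip(c blobcrypto.Crypter, payload []byte) ([]byte, error) {
	var encrypted, decrypted gather.WriteBuffer
	defer encrypted.Close()
	defer decrypted.Close()

	blobID, err := blobcrypto.Encrypt(c, gather.FromSlice(payload), blob.ID("n"), blob.ID("s01"), &encrypted)
	if err != nil {
		return nil, err
	}

	if err := blobcrypto.Decrypt(c, encrypted.Bytes(), blobID, &decrypted); err != nil {
		return nil, err
	}

	return decrypted.ToByteSlice(), nil
}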
@@ -1,4 +1,4 @@
-package content
+package blobcrypto

 import (
 	"strings"
@@ -30,31 +30,31 @@ func TestBlobCrypto(t *testing.T) {
 	defer tmp2.Close()
 	defer tmp3.Close()

-	id, err := EncryptBLOB(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
+	id, err := Encrypt(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
 	require.NoError(t, err)

-	id2, err := EncryptBLOB(cr, gather.FromSlice([]byte{1, 2, 4}), "n", "mysessionid", &tmp2)
+	id2, err := Encrypt(cr, gather.FromSlice([]byte{1, 2, 4}), "n", "mysessionid", &tmp2)
 	require.NoError(t, err)

 	require.NotEqual(t, id, id2)

-	require.NoError(t, DecryptBLOB(cr, tmp.Bytes(), id, &tmp3))
+	require.NoError(t, Decrypt(cr, tmp.Bytes(), id, &tmp3))
 	require.Equal(t, []byte{1, 2, 3}, tmp3.ToByteSlice())
-	require.NoError(t, DecryptBLOB(cr, tmp2.Bytes(), id2, &tmp3))
+	require.NoError(t, Decrypt(cr, tmp2.Bytes(), id2, &tmp3))
 	require.Equal(t, []byte{1, 2, 4}, tmp3.ToByteSlice())

 	// decrypting using invalid ID fails
-	require.Error(t, DecryptBLOB(cr, tmp.Bytes(), id2, &tmp3))
-	require.Error(t, DecryptBLOB(cr, tmp2.Bytes(), id, &tmp3))
+	require.Error(t, Decrypt(cr, tmp.Bytes(), id2, &tmp3))
+	require.Error(t, Decrypt(cr, tmp2.Bytes(), id, &tmp3))

 	require.True(t, strings.HasPrefix(string(id), "n"))
 	require.True(t, strings.HasSuffix(string(id), "-mysessionid"), id)

 	// negative cases
-	require.Error(t, DecryptBLOB(cr, tmp.Bytes(), "invalid-blob-id", &tmp3))
-	require.Error(t, DecryptBLOB(cr, tmp.Bytes(), "zzz0123456789abcdef0123456789abcde-suffix", &tmp3))
-	require.Error(t, DecryptBLOB(cr, tmp.Bytes(), id2, &tmp3))
-	require.Error(t, DecryptBLOB(cr, gather.FromSlice([]byte{2, 3, 4}), id, &tmp2))
+	require.Error(t, Decrypt(cr, tmp.Bytes(), "invalid-blob-id", &tmp3))
+	require.Error(t, Decrypt(cr, tmp.Bytes(), "zzz0123456789abcdef0123456789abcde-suffix", &tmp3))
+	require.Error(t, Decrypt(cr, tmp.Bytes(), id2, &tmp3))
+	require.Error(t, Decrypt(cr, gather.FromSlice([]byte{2, 3, 4}), id, &tmp2))
 }

 type badEncryptor struct{}

@@ -83,7 +83,7 @@ func(output []byte, data gather.Bytes) []byte {
 	defer tmp2.Close()
 	defer tmp3.Close()

-	_, err := EncryptBLOB(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
+	_, err := Encrypt(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
 	require.Error(t, err)

 	f := &format.ContentFormat{
@@ -97,6 +97,19 @@ func(output []byte, data gather.Bytes) []byte {

 	cr.h = hf

-	_, err = EncryptBLOB(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
+	_, err = Encrypt(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
 	require.Error(t, err)
 }
+
+type staticCrypter struct {
+	h hashing.HashFunc
+	e encryption.Encryptor
+}
+
+func (p staticCrypter) Encryptor() encryption.Encryptor {
+	return p.e
+}
+
+func (p staticCrypter) HashFunc() hashing.HashFunc {
+	return p.h
+}
internal/repolog/internal_logger.go (new file, 119 lines)
@@ -0,0 +1,119 @@
package repolog

import (
	"compress/gzip"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
)

// internalLogger represents a single log session that saves log files as blobs in the repository.
// The logger starts disabled; enable() must be called to actually persist logs.
type internalLogger struct {
	nextChunkNumber atomic.Int32

	m  *LogManager
	mu sync.Mutex

	// +checklocks:mu
	buf *gather.WriteBuffer
	// +checklocks:mu
	gzw *gzip.Writer

	// +checklocks:mu
	startTime int64 // unix timestamp of the first log

	prefix blob.ID // +checklocksignore
}

func (l *internalLogger) Write(b []byte) (int, error) {
	l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b))
	return len(b), nil
}

func (l *internalLogger) maybeEncryptAndWriteChunkUnlocked(data gather.Bytes, closeFunc func()) {
	if data.Length() == 0 {
		closeFunc()
		return
	}

	if !l.m.enabled.Load() {
		closeFunc()
		return
	}

	endTime := l.m.timeFunc().Unix()

	l.mu.Lock()
	st := l.startTime
	l.mu.Unlock()

	prefix := blob.ID(fmt.Sprintf("%v_%v_%v_%v_", l.prefix, st, endTime, l.nextChunkNumber.Add(1)))

	l.m.encryptAndWriteLogBlob(prefix, data, closeFunc)
}

func (l *internalLogger) addAndMaybeFlush(b []byte) (payload gather.Bytes, closeFunc func()) {
	l.mu.Lock()
	defer l.mu.Unlock()

	w := l.ensureWriterInitializedLocked()

	_, err := w.Write(b)
	l.logUnexpectedError(err)

	if l.buf.Length() < l.m.flushThreshold {
		return gather.Bytes{}, func() {}
	}

	return l.flushAndResetLocked()
}

// +checklocks:l.mu
func (l *internalLogger) ensureWriterInitializedLocked() io.Writer {
	if l.gzw == nil {
		l.buf = gather.NewWriteBuffer()
		l.gzw = gzip.NewWriter(l.buf)
		l.startTime = l.m.timeFunc().Unix()
	}

	return l.gzw
}

// +checklocks:l.mu
func (l *internalLogger) flushAndResetLocked() (payload gather.Bytes, closeFunc func()) {
	if l.gzw == nil {
		return gather.Bytes{}, func() {}
	}

	l.logUnexpectedError(l.gzw.Flush())
	l.logUnexpectedError(l.gzw.Close())

	closeBuf := l.buf.Close
	res := l.buf.Bytes()

	l.buf = nil
	l.gzw = nil

	return res, closeBuf
}

func (l *internalLogger) logUnexpectedError(err error) {
	if err == nil {
		return
	}
}

func (l *internalLogger) Sync() error {
	l.mu.Lock()
	data, closeFunc := l.flushAndResetLocked()
	l.mu.Unlock()

	l.maybeEncryptAndWriteChunkUnlocked(data, closeFunc)

	return nil
}
internal/repolog/log_manager.go (new file, 112 lines)
@@ -0,0 +1,112 @@
// Package repolog manages logs in the repository.
package repolog

import (
	"context"
	"crypto/rand"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/kopia/kopia/internal/blobcrypto"
	"github.com/kopia/kopia/internal/clock"
	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/internal/zaplogutil"
	"github.com/kopia/kopia/repo/blob"
)

const blobLoggerFlushThreshold = 4 << 20

// BlobPrefix is a prefix given to text logs stored in repository.
const BlobPrefix = "_log_"

// LogManager manages writing encrypted log blobs to the repository.
type LogManager struct {
	enabled atomic.Bool // set by enable(), logger is ineffective until called

	// InternalLogManager implements io.Writer and we must be able to write to the
	// repository asynchronously when the context is not provided.
	ctx context.Context //nolint:containedctx

	st             blob.Storage
	bc             blobcrypto.Crypter
	wg             sync.WaitGroup
	timeFunc       func() time.Time
	flushThreshold int
}

// Close closes the log manager.
func (m *LogManager) Close(ctx context.Context) {
	m.wg.Wait()
}

func (m *LogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather.Bytes, closeFunc func()) {
	encrypted := gather.NewWriteBuffer()
	// Close happens in a goroutine

	blobID, err := blobcrypto.Encrypt(m.bc, data, prefix, "", encrypted)
	if err != nil {
		encrypted.Close()

		// this should not happen, also nothing can be done about this, we're not in a place where we can return error, log it.
		return
	}

	b := encrypted.Bytes()

	m.wg.Add(1)

	go func() {
		defer m.wg.Done()
		defer encrypted.Close()
		defer closeFunc()

		if err := m.st.PutBlob(m.ctx, blobID, b, blob.PutOptions{}); err != nil {
			// nothing can be done about this, we're not in a place where we can return error, log it.
			return
		}
	}()
}

// NewLogger creates new logger.
func (m *LogManager) NewLogger() *zap.SugaredLogger {
	var rnd [2]byte

	rand.Read(rnd[:]) //nolint:errcheck

	w := &internalLogger{
		m:      m,
		prefix: blob.ID(fmt.Sprintf("%v%v_%x", BlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)),
	}

	return zap.New(zapcore.NewCore(
		zaplogutil.NewStdConsoleEncoder(zaplogutil.StdConsoleEncoderConfig{
			TimeLayout: zaplogutil.PreciseLayout,
			LocalTime:  false,
		}),
		w, zap.DebugLevel), zap.WithClock(zaplogutil.Clock())).Sugar()
}

// Enable enables writing any buffered logs to repository.
func (m *LogManager) Enable() {
	if m == nil {
		return
	}

	m.enabled.Store(true)
}

// NewLogManager creates a new LogManager that will emit logs as repository blobs.
func NewLogManager(ctx context.Context, st blob.Storage, bc blobcrypto.Crypter) *LogManager {
	return &LogManager{
		ctx:            ctx,
		st:             st,
		bc:             bc,
		flushThreshold: blobLoggerFlushThreshold,
		timeFunc:       clock.Now,
	}
}
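A sketch of the intended lifecycle, assuming a blob.Storage and a blobcrypto.Crypter are already wired up (repolog is internal to the kopia module, so this is a within-module illustration):

package example

import (
	"context"

	"github.com/kopia/kopia/internal/blobcrypto"
	"github.com/kopia/kopia/internal/repolog"
	"github.com/kopia/kopia/repo/blob"
)

// logToRepo shows the lifecycle: construct, enable, log, close.
func logToRepo(ctx context.Context, st blob.Storage, bc blobcrypto.Crypter) {
	lm := repolog.NewLogManager(ctx, st, bc)
	lm.Enable() // until Enable(), finished chunks are silently dropped

	logger := lm.NewLogger() // *zap.SugaredLogger writing gzipped, encrypted "_log_"-prefixed blobs
	logger.Infow("repository opened", "writable", true)
	logger.Sync() //nolint:errcheck // flush the pending chunk

	lm.Close(ctx) // wait for in-flight asynchronous blob writes
}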
@@ -18,11 +18,13 @@
 	"github.com/kopia/kopia/internal/listcache"
 	"github.com/kopia/kopia/internal/metrics"
 	"github.com/kopia/kopia/internal/ownwrites"
+	"github.com/kopia/kopia/internal/repolog"
 	"github.com/kopia/kopia/internal/timetrack"
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/blob/filesystem"
 	"github.com/kopia/kopia/repo/blob/sharded"
 	"github.com/kopia/kopia/repo/compression"
+	"github.com/kopia/kopia/repo/content/indexblob"
 	"github.com/kopia/kopia/repo/format"
 	"github.com/kopia/kopia/repo/hashing"
 	"github.com/kopia/kopia/repo/logging"
@@ -45,9 +47,9 @@

 //nolint:gochecknoglobals
 var cachedIndexBlobPrefixes = []blob.ID{
-	LegacyIndexBlobPrefix,
-	compactionLogBlobPrefix,
-	cleanupBlobPrefix,
+	indexblob.V0IndexBlobPrefix,
+	indexblob.V0CompactionLogBlobPrefix,
+	indexblob.V0CleanupBlobPrefix,

 	epoch.UncompactedIndexBlobPrefix,
 	epoch.EpochMarkerIndexBlobPrefix,
@@ -57,7 +59,7 @@

 //nolint:gochecknoglobals
 var allIndexBlobPrefixes = []blob.ID{
-	LegacyIndexBlobPrefix,
+	indexblob.V0IndexBlobPrefix,
 	epoch.UncompactedIndexBlobPrefix,
 	epoch.SingleEpochCompactionBlobPrefix,
 	epoch.RangeCheckpointIndexBlobPrefix,
@@ -65,16 +67,7 @@

 // IndexBlobReader provides an API for reading index blobs.
 type IndexBlobReader interface {
-	ListIndexBlobInfos(context.Context) ([]IndexBlobInfo, time.Time, error)
-}
-
-// indexBlobManager is the API of index blob manager as used by content manager.
-type indexBlobManager interface {
-	writeIndexBlobs(ctx context.Context, data []gather.Bytes, sessionID SessionID) ([]blob.Metadata, error)
-	listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error)
-	compact(ctx context.Context, opts CompactOptions) error
-	flushCache(ctx context.Context)
-	invalidate(ctx context.Context)
+	ListIndexBlobInfos(context.Context) ([]indexblob.Metadata, time.Time, error)
 }

 // SharedManager is responsible for read-only access to committed data.
@@ -82,14 +75,13 @@ type SharedManager struct {
 	Stats *Stats
 	st    blob.Storage

-	indexBlobManagerV0 *indexBlobManagerV0
-	indexBlobManagerV1 *indexBlobManagerV1
+	indexBlobManagerV0 *indexblob.ManagerV0
+	indexBlobManagerV1 *indexblob.ManagerV1

 	contentCache      cache.ContentCache
 	metadataCache     cache.ContentCache
 	indexBlobCache    *cache.PersistentCache
 	committedContents *committedContentIndex
-	enc               *encryptedBlobMgr
 	timeNow           func() time.Time

 	// lock to protect the set of committed indexes
@@ -112,9 +104,9 @@ type SharedManager struct {
 	log logging.Logger

 	// logger associated with the context that opened the repository.
-	contextLogger      logging.Logger
-	internalLogManager *internalLogManager
-	internalLogger     *zap.SugaredLogger // backing logger for 'sharedBaseLogger'
+	contextLogger  logging.Logger
+	repoLogManager *repolog.LogManager
+	internalLogger *zap.SugaredLogger // backing logger for 'sharedBaseLogger'

 	metricsStruct
 }
@@ -216,13 +208,17 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
 	}

 	if i > 0 {
-		ibm.flushCache(ctx)
+		// invalidate any list caches.
+		if err := sm.st.FlushCaches(ctx); err != nil {
+			sm.log.Errorw("unable to flush caches", "err", err)
+		}
+
 		sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i)
 		time.Sleep(nextSleepTime)
 		nextSleepTime *= 2
 	}

-	indexBlobs, ignoreDeletedBefore, err := ibm.listActiveIndexBlobs(ctx)
+	indexBlobs, ignoreDeletedBefore, err := ibm.ListActiveIndexBlobs(ctx)
 	if err != nil {
 		return errors.Wrap(err, "error listing index blobs")
 	}
@@ -264,7 +260,7 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
 	return sm.contentCache
 }

-func (sm *SharedManager) indexBlobManager() (indexBlobManager, error) {
+func (sm *SharedManager) indexBlobManager() (indexblob.Manager, error) {
 	mp, mperr := sm.format.GetMutableParameters()
 	if mperr != nil {
 		return nil, errors.Wrap(mperr, "mutable parameters")
@@ -336,9 +332,9 @@ func (sm *SharedManager) decryptAndVerify(encrypted gather.Bytes, iv []byte, out
 }

 // IndexBlobs returns the list of active index blobs.
-func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) {
+func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]indexblob.Metadata, error) {
 	if includeInactive {
-		var result []IndexBlobInfo
+		var result []indexblob.Metadata

 		for _, prefix := range allIndexBlobPrefixes {
 			blobs, err := blob.ListAllBlobs(ctx, sm.st, prefix)
@@ -347,7 +343,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) (
 		}

 		for _, bm := range blobs {
-			result = append(result, IndexBlobInfo{Metadata: bm})
+			result = append(result, indexblob.Metadata{Metadata: bm})
 		}
 	}
@@ -359,7 +355,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) (
 		return nil, err0
 	}

-	blobs, _, err := ibm.listActiveIndexBlobs(ctx)
+	blobs, _, err := ibm.ListActiveIndexBlobs(ctx)

 	//nolint:wrapcheck
 	return blobs, err
@@ -471,32 +467,36 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 		return errors.Wrap(err, "unable to initialize list cache")
 	}

-	sm.enc = &encryptedBlobMgr{
-		st:             cachedSt,
-		crypter:        sm.format,
-		indexBlobCache: indexBlobCache,
-		log:            sm.namedLogger("encrypted-blob-manager"),
-	}
+	enc := indexblob.NewEncryptionManager(
+		cachedSt,
+		sm.format,
+		indexBlobCache,
+		sm.namedLogger("encrypted-blob-manager"))

 	// set up legacy index blob manager
-	sm.indexBlobManagerV0 = &indexBlobManagerV0{
-		st:                cachedSt,
-		enc:               sm.enc,
-		timeNow:           sm.timeNow,
-		formattingOptions: sm.format,
-		log:               sm.namedLogger("index-blob-manager"),
-	}
+	sm.indexBlobManagerV0 = indexblob.NewManagerV0(
+		cachedSt,
+		enc,
+		sm.timeNow,
+		sm.format,
+		sm.namedLogger("index-blob-manager"),
+	)

 	// set up new index blob manager
-	sm.indexBlobManagerV1 = &indexBlobManagerV1{
-		st:                cachedSt,
-		enc:               sm.enc,
-		timeNow:           sm.timeNow,
-		formattingOptions: sm.format,
-		log:               sm.namedLogger("index-blob-manager"),
-	}
-
-	sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, epochParameters{sm.format}, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow)
+	sm.indexBlobManagerV1 = indexblob.NewManagerV1(
+		cachedSt,
+		enc,
+		epoch.NewManager(cachedSt,
+			epochParameters{sm.format},
+			func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
+				return errors.Wrap(sm.indexBlobManagerV1.CompactEpoch(ctx, blobIDs, outputPrefix), "CompactEpoch")
+			},
+			sm.namedLogger("epoch-manager"),
+			sm.timeNow),
+		sm.timeNow,
+		sm.format,
+		sm.namedLogger("index-blob-manager"),
+	)

 	// once everything is ready, set it up
 	sm.contentCache = dataCache
@@ -505,7 +505,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 	sm.committedContents = newCommittedContentIndex(caching,
 		sm.format.Encryptor().Overhead,
 		sm.format,
-		sm.enc.getEncryptedBlob,
+		enc.GetEncryptedBlob,
 		sm.namedLogger("committed-content-index"),
 		caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
@@ -532,12 +532,12 @@ func (sm *SharedManager) EpochManager() (*epoch.Manager, bool, error) {
 		return nil, false, err
 	}

-	ibm1, ok := ibm.(*indexBlobManagerV1)
+	ibm1, ok := ibm.(*indexblob.ManagerV1)
 	if !ok {
 		return nil, false, nil
 	}

-	return ibm1.epochMgr, true, nil
+	return ibm1.EpochManager(), true, nil
 }

 // CloseShared releases all resources in a shared manager.
@@ -554,9 +554,9 @@ func (sm *SharedManager) CloseShared(ctx context.Context) error {
 		sm.internalLogger.Sync() //nolint:errcheck
 	}

-	sm.internalLogManager.Close(ctx)
+	sm.repoLogManager.Close(ctx)

-	sm.indexBlobManagerV1.epochMgr.Flush()
+	sm.indexBlobManagerV1.EpochManager().Flush()

 	if err := sm.st.Close(ctx); err != nil {
 		return errors.Wrap(err, "error closing storage")
@@ -568,7 +568,7 @@ func (sm *SharedManager) CloseShared(ctx context.Context) error {
 // AlsoLogToContentLog wraps the provided content so that all logs are also sent to
 // internal content log.
 func (sm *SharedManager) AlsoLogToContentLog(ctx context.Context) context.Context {
-	sm.internalLogManager.enable()
+	sm.repoLogManager.Enable()

 	return logging.AlsoLogTo(ctx, sm.log)
 }
@@ -580,6 +580,12 @@ func (sm *SharedManager) shouldRefreshIndexes() bool {
 	return sm.timeNow().After(sm.refreshIndexesAfter)
 }

+// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
+func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error {
+	//nolint:wrapcheck
+	return sm.indexBlobManagerV1.PrepareUpgradeToIndexBlobManagerV1(ctx, params, sm.indexBlobManagerV0)
+}
+
 // NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository.
 func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, mr *metrics.Registry) (*SharedManager, error) {
 	opts = opts.CloneOrDefault()
@@ -587,18 +593,6 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider
 		opts.TimeNow = clock.Now
 	}

-	// create internal logger that will be writing logs as encrypted repository blobs.
-	ilm := newInternalLogManager(ctx, st, prov)
-
-	// sharedBaseLogger writes to the both context and internal log
-	// and is used as a base for all content manager components.
-	var internalLog *zap.SugaredLogger
-
-	// capture logger (usually console or log file) associated with current context.
-	if !opts.DisableInternalLog {
-		internalLog = ilm.NewLogger()
-	}
-
 	sm := &SharedManager{
 		st:    st,
 		Stats: new(Stats),
@@ -608,14 +602,16 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider
 		maxPreambleLength:       defaultMaxPreambleLength,
 		paddingUnit:             defaultPaddingUnit,
 		checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
-		internalLogManager:      ilm,
-		internalLogger:          internalLog,
+		repoLogManager:          repolog.NewLogManager(ctx, st, prov),
 		contextLogger:           logging.Module(FormatLogModule)(ctx),

 		metricsStruct: initMetricsStruct(mr),
 	}

+	// remember logger defined for the context.
+	if !opts.DisableInternalLog {
+		sm.internalLogger = sm.repoLogManager.NewLogger()
+	}

 	sm.log = sm.namedLogger("shared-manager")

 	caching = caching.CloneOrDefault()
@@ -9,6 +9,7 @@
 	"github.com/kopia/kopia/internal/blobtesting"
 	"github.com/kopia/kopia/internal/testlogging"
 	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

 func (s *contentManagerSuite) TestContentIndexRecovery(t *testing.T) {
@@ -28,7 +29,7 @@ func (s *contentManagerSuite) TestContentIndexRecovery(t *testing.T) {
 	}

 	// delete all index blobs
-	require.NoError(t, bm.st.ListBlobs(ctx, LegacyIndexBlobPrefix, func(bi blob.Metadata) error {
+	require.NoError(t, bm.st.ListBlobs(ctx, indexblob.V0IndexBlobPrefix, func(bi blob.Metadata) error {
 		t.Logf("deleting %v", bi.BlobID)
 		return bm.st.DeleteBlob(ctx, bi.BlobID)
 	}))
@@ -20,6 +20,7 @@
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/compression"
 	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/content/indexblob"
 	"github.com/kopia/kopia/repo/format"
 	"github.com/kopia/kopia/repo/hashing"
 	"github.com/kopia/kopia/repo/logging"
@@ -62,12 +63,6 @@
 // ErrContentNotFound is returned when content is not found.
 var ErrContentNotFound = errors.New("content not found")

-// IndexBlobInfo is an information about a single index blob managed by Manager.
-type IndexBlobInfo struct {
-	blob.Metadata
-	Superseded []blob.Metadata
-}
-
 // WriteManager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store.
 type WriteManager struct {
 	revision atomic.Int64 // changes on each local write
@@ -429,7 +424,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather
 	}

 	//nolint:wrapcheck
-	return ibm.writeIndexBlobs(ctx, dataShards, sessionID)
+	return ibm.WriteIndexBlobs(ctx, dataShards, blob.ID(sessionID))
 }

 // +checklocksread:bm.indexesLock
@@ -452,7 +447,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context, mp format.Mu

 	if len(bm.packIndexBuilder) > 0 {
 		_, span2 := tracer.Start(ctx, "BuildShards")
-		dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(mp.IndexVersion, true, defaultIndexShardSize)
+		dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(mp.IndexVersion, true, indexblob.DefaultIndexShardSize)

 		span2.End()

@@ -732,7 +727,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
 		return pp, nil
 	}

-	bm.internalLogManager.enable()
+	bm.repoLogManager.Enable()

 	b := gather.NewWriteBuffer()
@@ -2,35 +2,17 @@

 import (
 	"context"
 	"time"

 	"github.com/pkg/errors"

+	"github.com/kopia/kopia/internal/blobcrypto"
 	"github.com/kopia/kopia/internal/gather"
 	"github.com/kopia/kopia/internal/timetrack"
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/content/indexblob"
 )

-const verySmallContentFraction = 20 // blobs less than 1/verySmallContentFraction of maxPackSize are considered 'very small'
-
-// CompactOptions provides options for compaction.
-type CompactOptions struct {
-	MaxSmallBlobs                    int
-	AllIndexes                       bool
-	DropDeletedBefore                time.Time
-	DropContents                     []ID
-	DisableEventualConsistencySafety bool
-}
-
-func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration {
-	if co.DisableEventualConsistencySafety {
-		return 0
-	}
-
-	return defaultEventualConsistencySettleTime
-}
-
 // Refresh reloads the committed content indexes.
 func (sm *SharedManager) Refresh(ctx context.Context) error {
 	sm.indexesLock.Lock()
@@ -43,7 +25,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
 		return err
 	}

-	ibm.invalidate(ctx)
+	ibm.Invalidate(ctx)

 	timer := timetrack.StartTimer()

@@ -54,7 +36,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
 }

 // CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs.
-func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) error {
+func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.CompactOptions) error {
 	// we must hold the lock here to avoid the race with Refresh() which can reload the
 	// current set of indexes while we process them.
 	sm.indexesLock.Lock()
@@ -67,7 +49,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions)
 		return err
 	}

-	if err := ibm.compact(ctx, opt); err != nil {
+	if err := ibm.Compact(ctx, opt); err != nil {
 		return errors.Wrap(err, "error performing compaction")
 	}

@@ -80,11 +62,11 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions)
 }

 // ParseIndexBlob loads entries in a given index blob and returns them.
-func ParseIndexBlob(blobID blob.ID, encrypted gather.Bytes, crypter crypter) ([]Info, error) {
+func ParseIndexBlob(blobID blob.ID, encrypted gather.Bytes, crypter blobcrypto.Crypter) ([]Info, error) {
 	var data gather.WriteBuffer
 	defer data.Close()

-	if err := DecryptBLOB(crypter, encrypted, blobID, &data); err != nil {
+	if err := blobcrypto.Decrypt(crypter, encrypted, blobID, &data); err != nil {
 		return nil, errors.Wrap(err, "unable to decrypt index blob")
 	}

@@ -102,17 +84,3 @@ func ParseIndexBlob(blobID blob.ID, encrypted gather.Bytes, crypter crypter) ([]

 	return results, errors.Wrap(err, "error iterating index entries")
 }

-func addBlobsToIndex(ndx map[blob.ID]*IndexBlobInfo, blobs []blob.Metadata) {
-	for _, it := range blobs {
-		if ndx[it.BlobID] == nil {
-			ndx[it.BlobID] = &IndexBlobInfo{
-				Metadata: blob.Metadata{
-					BlobID:    it.BlobID,
-					Length:    it.Length,
-					Timestamp: it.Timestamp,
-				},
-			}
-		}
-	}
-}
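Callers now pass the relocated options type. A hedged sketch (the SharedManager value and the 24-hour cutoff are illustrative; internal/clock is only importable within the kopia module):

package example

import (
	"context"
	"time"

	"github.com/kopia/kopia/internal/clock"
	"github.com/kopia/kopia/repo/content"
	"github.com/kopia/kopia/repo/content/indexblob"
)

// compactDaily drops index entries deleted more than a day ago across all
// indexes, using the options struct that now lives in the indexblob package.
func compactDaily(ctx context.Context, sm *content.SharedManager) error {
	return sm.CompactIndexes(ctx, indexblob.CompactOptions{
		AllIndexes:        true,
		DropDeletedBefore: clock.Now().Add(-24 * time.Hour),
	})
}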
@@ -33,6 +33,7 @@
 	"github.com/kopia/kopia/repo/blob/logging"
 	"github.com/kopia/kopia/repo/compression"
 	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/content/indexblob"
 	"github.com/kopia/kopia/repo/format"
 )

@@ -442,7 +443,7 @@ func (s *contentManagerSuite) TestIndexCompactionDropsContent(t *testing.T) {

 	bm = s.newTestContentManagerWithCustomTime(t, st, timeFunc)
 	// this drops deleted entries, including from index #1
-	require.NoError(t, bm.CompactIndexes(ctx, CompactOptions{
+	require.NoError(t, bm.CompactIndexes(ctx, indexblob.CompactOptions{
 		DropDeletedBefore: deleteThreshold,
 		AllIndexes:        true,
 	}))
@@ -522,7 +523,7 @@ func (s *contentManagerSuite) TestContentManagerConcurrency(t *testing.T) {

 	validateIndexCount(t, data, 4, 0)

-	if err := bm4.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil {
+	if err := bm4.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
 		t.Errorf("compaction error: %v", err)
 	}

@@ -540,7 +541,7 @@ func (s *contentManagerSuite) TestContentManagerConcurrency(t *testing.T) {
 	verifyContent(ctx, t, bm5, bm2content, seededRandomData(32, 100))
 	verifyContent(ctx, t, bm5, bm3content, seededRandomData(33, 100))

-	if err := bm5.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil {
+	if err := bm5.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
 		t.Errorf("compaction error: %v", err)
 	}
 }
@@ -551,11 +552,11 @@ func validateIndexCount(t *testing.T, data map[blob.ID][]byte, wantIndexCount, w
 	var indexCnt, compactionLogCnt int

 	for blobID := range data {
-		if strings.HasPrefix(string(blobID), LegacyIndexBlobPrefix) || strings.HasPrefix(string(blobID), "x") {
+		if strings.HasPrefix(string(blobID), indexblob.V0IndexBlobPrefix) || strings.HasPrefix(string(blobID), "x") {
 			indexCnt++
 		}

-		if strings.HasPrefix(string(blobID), compactionLogBlobPrefix) {
+		if strings.HasPrefix(string(blobID), indexblob.V0CompactionLogBlobPrefix) {
 			compactionLogCnt++
 		}
 	}
@@ -1203,7 +1204,7 @@ func (s *contentManagerSuite) TestFlushWaitsForAllPendingWriters(t *testing.T) {
 	bm.Flush(ctx)
 	t.Logf("<<< end of flushing")

-	indexBlobPrefix := blob.ID(LegacyIndexBlobPrefix)
+	indexBlobPrefix := blob.ID(indexblob.V0IndexBlobPrefix)
 	if s.mutableParameters.EpochParameters.Enabled {
 		indexBlobPrefix = "x"
 	}
@@ -1923,7 +1924,7 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion for
 	// make sure we can read everything
 	verifyContentManagerDataSet(ctx, t, mgr, dataSet)

-	if err := mgr.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil {
+	if err := mgr.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
 		t.Fatalf("unable to compact indexes: %v", err)
 	}
@@ -1,53 +0,0 @@
-package content
-
-import (
-	"context"
-
-	"github.com/pkg/errors"
-
-	"github.com/kopia/kopia/internal/cache"
-	"github.com/kopia/kopia/internal/gather"
-	"github.com/kopia/kopia/repo/blob"
-	"github.com/kopia/kopia/repo/logging"
-)
-
-type encryptedBlobMgr struct {
-	st             blob.Storage
-	crypter        crypter
-	indexBlobCache *cache.PersistentCache
-	log            logging.Logger
-}
-
-func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error {
-	var payload gather.WriteBuffer
-	defer payload.Close()
-
-	if err := m.indexBlobCache.GetOrLoad(ctx, string(blobID), func(output *gather.WriteBuffer) error {
-		//nolint:wrapcheck
-		return m.st.GetBlob(ctx, blobID, 0, -1, output)
-	}, &payload); err != nil {
-		return errors.Wrap(err, "getContent")
-	}
-
-	return DecryptBLOB(m.crypter, payload.Bytes(), blobID, output)
-}
-
-func (m *encryptedBlobMgr) encryptAndWriteBlob(ctx context.Context, data gather.Bytes, prefix blob.ID, sessionID SessionID) (blob.Metadata, error) {
-	var data2 gather.WriteBuffer
-	defer data2.Close()
-
-	blobID, err := EncryptBLOB(m.crypter, data, prefix, sessionID, &data2)
-	if err != nil {
-		return blob.Metadata{}, errors.Wrap(err, "error encrypting")
-	}
-
-	bm, err := blob.PutBlobAndGetMetadata(ctx, m.st, blobID, data2.Bytes(), blob.PutOptions{})
-	if err != nil {
-		m.log.Debugf("write-index-blob %v failed %v", blobID, err)
-		return blob.Metadata{}, errors.Wrapf(err, "error writing blob %v", blobID)
-	}
-
-	m.log.Debugf("write-index-blob %v %v %v", blobID, bm.Length, bm.Timestamp)
-
-	return bm, nil
-}
@@ -1,158 +0,0 @@
-package content
-
-import (
-	"context"
-	"crypto/rand"
-	"fmt"
-	"time"
-
-	"github.com/pkg/errors"
-
-	"github.com/kopia/kopia/internal/epoch"
-	"github.com/kopia/kopia/internal/gather"
-	"github.com/kopia/kopia/repo/blob"
-	"github.com/kopia/kopia/repo/content/index"
-	"github.com/kopia/kopia/repo/logging"
-)
-
-type indexBlobManagerV1 struct {
-	st                blob.Storage
-	enc               *encryptedBlobMgr
-	epochMgr          *epoch.Manager
-	timeNow           func() time.Time
-	log               logging.Logger
-	formattingOptions IndexFormattingOptions
-}
-
-// ListIndexBlobInfos list active blob info structs. Also returns time of latest content deletion commit.
-func (m *indexBlobManagerV1) ListIndexBlobInfos(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
-	return m.listActiveIndexBlobs(ctx)
-}
-
-func (m *indexBlobManagerV1) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
-	active, deletionWatermark, err := m.epochMgr.GetCompleteIndexSet(ctx, epoch.LatestEpoch)
-	if err != nil {
-		return nil, time.Time{}, errors.Wrap(err, "error getting index set")
-	}
-
-	var result []IndexBlobInfo
-
-	for _, bm := range active {
-		result = append(result, IndexBlobInfo{Metadata: bm})
-	}
-
-	m.log.Errorf("active indexes %v deletion watermark %v", blob.IDsFromMetadata(active), deletionWatermark)
-
-	return result, deletionWatermark, nil
-}
-
-func (m *indexBlobManagerV1) invalidate(ctx context.Context) {
-	m.epochMgr.Invalidate()
-}
-
-func (m *indexBlobManagerV1) flushCache(ctx context.Context) {
-	if err := m.st.FlushCaches(ctx); err != nil {
-		m.log.Debugf("error flushing caches: %v", err)
-	}
-}
-
-func (m *indexBlobManagerV1) compact(ctx context.Context, opt CompactOptions) error {
-	if opt.DropDeletedBefore.IsZero() {
-		return nil
-	}
-
-	return errors.Wrap(m.epochMgr.AdvanceDeletionWatermark(ctx, opt.DropDeletedBefore), "error advancing deletion watermark")
-}
-
-func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
-	tmpbld := make(index.Builder)
-
-	for _, indexBlob := range blobIDs {
-		if err := addIndexBlobsToBuilder(ctx, m.enc, tmpbld, indexBlob); err != nil {
-			return errors.Wrap(err, "error adding index to builder")
-		}
-	}
-
-	mp, mperr := m.formattingOptions.GetMutableParameters()
-	if mperr != nil {
-		return errors.Wrap(mperr, "mutable parameters")
-	}
-
-	dataShards, cleanupShards, err := tmpbld.BuildShards(mp.IndexVersion, true, defaultIndexShardSize)
-	if err != nil {
-		return errors.Wrap(err, "unable to build index dataShards")
-	}
-
-	defer cleanupShards()
-
-	var rnd [8]byte
-
-	if _, err := rand.Read(rnd[:]); err != nil {
-		return errors.Wrap(err, "error getting random session ID")
-	}
-
-	sessionID := fmt.Sprintf("s%x-c%v", rnd[:], len(dataShards))
-
-	var data2 gather.WriteBuffer
-	defer data2.Close()
-
-	for _, data := range dataShards {
-		data2.Reset()
-
-		blobID, err := EncryptBLOB(m.enc.crypter, data, outputPrefix, SessionID(sessionID), &data2)
-		if err != nil {
-			return errors.Wrap(err, "error encrypting")
-		}
-
-		if err := m.st.PutBlob(ctx, blobID, data2.Bytes(), blob.PutOptions{}); err != nil {
-			return errors.Wrap(err, "error writing index blob")
-		}
-	}
-
-	return nil
-}
-
-func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []gather.Bytes, sessionID SessionID) ([]blob.Metadata, error) {
-	shards := map[blob.ID]blob.Bytes{}
-
-	sessionID = SessionID(fmt.Sprintf("%v-c%v", sessionID, len(dataShards)))
-
-	for _, data := range dataShards {
-		// important - we're intentionally using data2 in the inner loop scheduling multiple Close()
-		// we want all Close() to be called at the end of the function after WriteIndex()
-		data2 := gather.NewWriteBuffer()
-		defer data2.Close() //nolint:gocritic
-
-		unprefixedBlobID, err := EncryptBLOB(m.enc.crypter, data, "", sessionID, data2)
-		if err != nil {
-			return nil, errors.Wrap(err, "error encrypting")
-		}
-
-		shards[unprefixedBlobID] = data2.Bytes()
-	}
-
-	//nolint:wrapcheck
-	return m.epochMgr.WriteIndex(ctx, shards)
-}
-
-var _ indexBlobManager = (*indexBlobManagerV1)(nil)
-
-// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
-func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error {
-	ibl, _, err := sm.indexBlobManagerV0.listActiveIndexBlobs(ctx)
-	if err != nil {
-		return errors.Wrap(err, "error listing active index blobs")
-	}
-
-	var blobIDs []blob.ID
-
-	for _, ib := range ibl {
-		blobIDs = append(blobIDs, ib.BlobID)
-	}
-
-	if err := sm.indexBlobManagerV1.compactEpoch(ctx, blobIDs, epoch.UncompactedEpochBlobPrefix(epoch.FirstEpoch)); err != nil {
-		return errors.Wrap(err, "unable to generate initial epoch")
-	}
-
-	return nil
-}
repo/content/indexblob/index_blob.go (new file, 56 lines)
@@ -0,0 +1,56 @@
// Package indexblob manages sets of active index blobs.
package indexblob

import (
	"context"
	"time"

	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/content/index"
)

// Manager is the API of index blob manager as used by content manager.
type Manager interface {
	WriteIndexBlobs(ctx context.Context, data []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error)
	ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error)
	Compact(ctx context.Context, opts CompactOptions) error
	Invalidate(ctx context.Context)
}

// CompactOptions provides options for compaction.
type CompactOptions struct {
	MaxSmallBlobs                    int
	AllIndexes                       bool
	DropDeletedBefore                time.Time
	DropContents                     []index.ID
	DisableEventualConsistencySafety bool
}

func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration {
	if co.DisableEventualConsistencySafety {
		return 0
	}

	return defaultEventualConsistencySettleTime
}

// DefaultIndexShardSize is the maximum number of items in an index shard.
// It is less than 2^24, which lets V1 index use 24-bit/3-byte indexes.
const DefaultIndexShardSize = 16e6

const verySmallContentFraction = 20 // blobs less than 1/verySmallContentFraction of maxPackSize are considered 'very small'

func addBlobsToIndex(ndx map[blob.ID]*Metadata, blobs []blob.Metadata) {
	for _, it := range blobs {
		if ndx[it.BlobID] == nil {
			ndx[it.BlobID] = &Metadata{
				Metadata: blob.Metadata{
					BlobID:    it.BlobID,
					Length:    it.Length,
					Timestamp: it.Timestamp,
				},
			}
		}
	}
}
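Because both concrete managers satisfy this interface, callers can stay version-agnostic. A sketch of a consumer, assuming the caller obtained a V0 or V1 manager via the constructors introduced in this commit:

package example

import (
	"context"
	"fmt"

	"github.com/kopia/kopia/repo/content/indexblob"
)

// listActive works against either manager version. Metadata embeds
// blob.Metadata, so BlobID/Length/Timestamp are promoted fields.
func listActive(ctx context.Context, ibm indexblob.Manager) error {
	blobs, deletionWatermark, err := ibm.ListActiveIndexBlobs(ctx)
	if err != nil {
		return err
	}

	for _, md := range blobs {
		fmt.Println(md.BlobID, md.Length, md.Timestamp)
	}

	fmt.Println("deletion watermark:", deletionWatermark)

	return nil
}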
repo/content/indexblob/index_blob_encryption.go (new file, 75 lines)
@@ -0,0 +1,75 @@
package indexblob

import (
	"context"

	"github.com/pkg/errors"

	"github.com/kopia/kopia/internal/blobcrypto"
	"github.com/kopia/kopia/internal/cache"
	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/logging"
)

// Metadata is information about a single index blob managed by Manager.
type Metadata struct {
	blob.Metadata
	Superseded []blob.Metadata
}

// EncryptionManager manages encryption and caching of index blobs.
type EncryptionManager struct {
	st             blob.Storage
	crypter        blobcrypto.Crypter
	indexBlobCache *cache.PersistentCache
	log            logging.Logger
}

// GetEncryptedBlob fetches and decrypts the contents of a given encrypted blob
// using cache first and falling back to the underlying storage.
func (m *EncryptionManager) GetEncryptedBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error {
	var payload gather.WriteBuffer
	defer payload.Close()

	if err := m.indexBlobCache.GetOrLoad(ctx, string(blobID), func(output *gather.WriteBuffer) error {
		//nolint:wrapcheck
		return m.st.GetBlob(ctx, blobID, 0, -1, output)
	}, &payload); err != nil {
		return errors.Wrap(err, "getContent")
	}

	return errors.Wrap(blobcrypto.Decrypt(m.crypter, payload.Bytes(), blobID, output), "decrypt blob")
}

// EncryptAndWriteBlob encrypts and writes the provided data into a blob,
// with name {prefix}{hash}[-{suffix}].
func (m *EncryptionManager) EncryptAndWriteBlob(ctx context.Context, data gather.Bytes, prefix, suffix blob.ID) (blob.Metadata, error) {
	var data2 gather.WriteBuffer
	defer data2.Close()

	blobID, err := blobcrypto.Encrypt(m.crypter, data, prefix, suffix, &data2)
	if err != nil {
		return blob.Metadata{}, errors.Wrap(err, "error encrypting")
	}

	bm, err := blob.PutBlobAndGetMetadata(ctx, m.st, blobID, data2.Bytes(), blob.PutOptions{})
	if err != nil {
		m.log.Debugf("write-index-blob %v failed %v", blobID, err)
		return blob.Metadata{}, errors.Wrapf(err, "error writing blob %v", blobID)
	}

	m.log.Debugf("write-index-blob %v %v %v", blobID, bm.Length, bm.Timestamp)

	return bm, nil
}

// NewEncryptionManager creates a new encryption manager.
func NewEncryptionManager(
	st blob.Storage,
	crypter blobcrypto.Crypter,
	indexBlobCache *cache.PersistentCache,
	log logging.Logger,
) *EncryptionManager {
	return &EncryptionManager{st, crypter, indexBlobCache, log}
}
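A hedged usage sketch for the now-exported manager; every parameter value here is assumed to come from existing repository wiring:

package example

import (
	"context"

	"github.com/kopia/kopia/internal/blobcrypto"
	"github.com/kopia/kopia/internal/cache"
	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/content/indexblob"
	"github.com/kopia/kopia/repo/logging"
)

// writeAndReadBack stores an encrypted index-style blob ("x"-prefixed,
// "-session1"-suffixed; both illustrative) and fetches it back through the
// cache-aware read path.
func writeAndReadBack(ctx context.Context, st blob.Storage, c blobcrypto.Crypter, pc *cache.PersistentCache, log logging.Logger, payload []byte) error {
	em := indexblob.NewEncryptionManager(st, c, pc, log)

	bm, err := em.EncryptAndWriteBlob(ctx, gather.FromSlice(payload), "x", "session1")
	if err != nil {
		return err
	}

	var out gather.WriteBuffer
	defer out.Close()

	return em.GetEncryptedBlob(ctx, bm.BlobID, &out)
}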
@@ -1,4 +1,4 @@
-package content
+package indexblob

 import (
 	"testing"
@@ -38,7 +38,7 @@ func TestEncryptedBlobManager(t *testing.T) {
 	enc, err := encryption.CreateEncryptor(f)
 	require.NoError(t, err)

-	ebm := encryptedBlobMgr{
+	ebm := EncryptionManager{
 		st:             fs,
 		crypter:        staticCrypter{hf, enc},
 		indexBlobCache: nil,
@@ -47,7 +47,7 @@ func TestEncryptedBlobManager(t *testing.T) {

 	ctx := testlogging.Context(t)

-	bm, err := ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3}), "x", "session1")
+	bm, err := ebm.EncryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3}), "x", "session1")
 	require.NoError(t, err)

 	stbm, err := st.GetMetadata(ctx, bm.BlobID)
@@ -58,27 +58,27 @@ func TestEncryptedBlobManager(t *testing.T) {
 	var tmp gather.WriteBuffer
 	defer tmp.Close()

-	require.NoError(t, ebm.getEncryptedBlob(ctx, bm.BlobID, &tmp))
+	require.NoError(t, ebm.GetEncryptedBlob(ctx, bm.BlobID, &tmp))

 	// data corruption
 	data[bm.BlobID][0] ^= 1

-	require.Error(t, ebm.getEncryptedBlob(ctx, bm.BlobID, &tmp))
+	require.Error(t, ebm.GetEncryptedBlob(ctx, bm.BlobID, &tmp))

-	require.ErrorIs(t, ebm.getEncryptedBlob(ctx, "no-such-blob", &tmp), blob.ErrBlobNotFound)
+	require.ErrorIs(t, ebm.GetEncryptedBlob(ctx, "no-such-blob", &tmp), blob.ErrBlobNotFound)

 	someError := errors.Errorf("some error")

 	fs.AddFault(blobtesting.MethodPutBlob).ErrorInstead(someError)

-	_, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
+	_, err = ebm.EncryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
 	require.ErrorIs(t, err, someError)

 	someError2 := errors.Errorf("some error 2")

 	ebm.crypter = staticCrypter{hf, failingEncryptor{nil, someError2}}

-	_, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
+	_, err = ebm.EncryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
 	require.ErrorIs(t, err, someError2)
 }
@@ -1,4 +1,4 @@
|
||||
package content
|
||||
package indexblob
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -15,16 +15,16 @@
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
)
|
||||
|
||||
// LegacyIndexBlobPrefix is the prefix for all legacy (v0) index blobs.
|
||||
const LegacyIndexBlobPrefix = "n"
|
||||
// V0IndexBlobPrefix is the prefix for all legacy (v0) index blobs.
|
||||
const V0IndexBlobPrefix = "n"
|
||||
|
||||
const (
|
||||
defaultIndexShardSize = 16e6 // slightly less than 2^24, which lets index use 24-bit/3-byte indexes
|
||||
// V0CompactionLogBlobPrefix is the prefix for all legacy (v0) index compactions blobs.
|
||||
const V0CompactionLogBlobPrefix = "m"
|
||||
|
||||
defaultEventualConsistencySettleTime = 1 * time.Hour
|
||||
compactionLogBlobPrefix = "m"
|
||||
cleanupBlobPrefix = "l"
|
||||
)
|
||||
// V0CleanupBlobPrefix is the prefix for all legacy (v0) index cleanup blobs.
|
||||
const V0CleanupBlobPrefix = "l"
|
||||
|
||||
const defaultEventualConsistencySettleTime = 1 * time.Hour
|
||||
|
||||
// compactionLogEntry represents contents of compaction log entry stored in `m` blob.
|
||||
type compactionLogEntry struct {
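For orientation, a compaction log entry records which input index blobs were merged into which outputs, so readers can treat the superseded indexes as gone. A minimal sketch of the payload, assuming only the two fields that registerCompaction() below actually marshals (the real struct may carry more):

	// Hypothetical values for illustration only.
	entry := compactionLogEntry{
		InputMetadata:  []blob.Metadata{{BlobID: "n1aa", Length: 100}},
		OutputMetadata: []blob.Metadata{{BlobID: "n2bb", Length: 120}},
	}
	payload, _ := json.Marshal(&entry) // stored encrypted under the "m" prefix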
@@ -56,23 +56,23 @@ type IndexFormattingOptions interface {
GetMutableParameters() (format.MutableParameters, error)
}

type indexBlobManagerV0 struct {
st blob.Storage
enc *encryptedBlobMgr
timeNow func() time.Time
log logging.Logger

// ManagerV0 is a V0 (legacy) implementation of index blob manager.
type ManagerV0 struct {
st blob.Storage
enc *EncryptionManager
timeNow func() time.Time
formattingOptions IndexFormattingOptions
log logging.Logger
}

// ListIndexBlobInfos lists active blob info structs. Also returns time of latest content deletion commit.
func (m *indexBlobManagerV0) ListIndexBlobInfos(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
activeIndexBlobs, t0, err := m.listActiveIndexBlobs(ctx)
func (m *ManagerV0) ListIndexBlobInfos(ctx context.Context) ([]Metadata, time.Time, error) {
activeIndexBlobs, t0, err := m.ListActiveIndexBlobs(ctx)
if err != nil {
return nil, time.Time{}, err
}

q := make([]IndexBlobInfo, 0, len(activeIndexBlobs))
q := make([]Metadata, 0, len(activeIndexBlobs))

for _, activeIndexBlob := range activeIndexBlobs {
// skip the V0 blob poison that is used to prevent client reads.
@@ -86,21 +86,23 @@ func (m *indexBlobManagerV0) ListIndexBlobInfos(ctx context.Context) ([]IndexBlo
return q, t0, nil
}

func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
// ListActiveIndexBlobs lists the metadata for active index blobs and returns the cut-off time
// before which all deleted index entries should be treated as non-existent.
func (m *ManagerV0) ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error) {
var compactionLogMetadata, storageIndexBlobs []blob.Metadata

var eg errgroup.Group

// list index and cleanup blobs in parallel.
eg.Go(func() error {
v, err := blob.ListAllBlobs(ctx, m.st, compactionLogBlobPrefix)
v, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
compactionLogMetadata = v

return errors.Wrap(err, "error listing compaction blobs")
})

eg.Go(func() error {
v, err := blob.ListAllBlobs(ctx, m.st, LegacyIndexBlobPrefix)
v, err := blob.ListAllBlobs(ctx, m.st, V0IndexBlobPrefix)
storageIndexBlobs = v

return errors.Wrap(err, "error listing index blobs")
@@ -118,7 +120,7 @@ func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexB
m.log.Debugf("found-compaction-blobs[%v] %v", i, clm)
}

indexMap := map[blob.ID]*IndexBlobInfo{}
indexMap := map[blob.ID]*Metadata{}
addBlobsToIndex(indexMap, storageIndexBlobs)

compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata)
@@ -129,7 +131,7 @@ func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexB
// remove entries from indexMap that have been compacted and replaced by other indexes.
m.removeCompactedIndexes(indexMap, compactionLogs)

var results []IndexBlobInfo
var results []Metadata
for _, v := range indexMap {
results = append(results, *v)
}
@@ -141,17 +143,14 @@ func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexB
return results, time.Time{}, nil
}

func (m *indexBlobManagerV0) invalidate(ctx context.Context) {
// Invalidate invalidates any caches.
func (m *ManagerV0) Invalidate(ctx context.Context) {
}

func (m *indexBlobManagerV0) flushCache(ctx context.Context) {
if err := m.st.FlushCaches(ctx); err != nil {
m.log.Debugf("error flushing caches: %v", err)
}
}

func (m *indexBlobManagerV0) compact(ctx context.Context, opt CompactOptions) error {
indexBlobs, _, err := m.listActiveIndexBlobs(ctx)
// Compact performs compaction of index blobs by merging smaller ones into larger
// and registering compaction and cleanup blobs in the repository.
func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error {
indexBlobs, _, err := m.ListActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing active index blobs")
}
@@ -174,7 +173,7 @@ func (m *indexBlobManagerV0) compact(ctx context.Context, opt CompactOptions) er
return nil
}

func (m *indexBlobManagerV0) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
func (m *ManagerV0) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
logEntryBytes, err := json.Marshal(&compactionLogEntry{
InputMetadata: inputs,
OutputMetadata: outputs,
@@ -183,7 +182,7 @@ func (m *indexBlobManagerV0) registerCompaction(ctx context.Context, inputs, out
return errors.Wrap(err, "unable to marshal log entry bytes")
}

compactionLogBlobMetadata, err := m.enc.encryptAndWriteBlob(ctx, gather.FromSlice(logEntryBytes), compactionLogBlobPrefix, "")
compactionLogBlobMetadata, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(logEntryBytes), V0CompactionLogBlobPrefix, "")
if err != nil {
return errors.Wrap(err, "unable to write compaction log")
}
@@ -205,15 +204,12 @@ func (m *indexBlobManagerV0) registerCompaction(ctx context.Context, inputs, out
return nil
}

func (m *indexBlobManagerV0) getIndexBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error {
return m.enc.getEncryptedBlob(ctx, blobID, output)
}

func (m *indexBlobManagerV0) writeIndexBlobs(ctx context.Context, dataShards []gather.Bytes, sessionID SessionID) ([]blob.Metadata, error) {
// WriteIndexBlobs writes the provided data shards into new index blobs optionally appending the provided suffix.
func (m *ManagerV0) WriteIndexBlobs(ctx context.Context, dataShards []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error) {
var result []blob.Metadata

for _, data := range dataShards {
bm, err := m.enc.encryptAndWriteBlob(ctx, data, LegacyIndexBlobPrefix, sessionID)
bm, err := m.enc.EncryptAndWriteBlob(ctx, data, V0IndexBlobPrefix, suffix)
if err != nil {
return nil, errors.Wrap(err, "error writing index blob")
}
@@ -224,14 +220,14 @@ func (m *indexBlobManagerV0) writeIndexBlobs(ctx context.Context, dataShards []g
return result, nil
}

func (m *indexBlobManagerV0) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) {
func (m *ManagerV0) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) {
results := map[blob.ID]*compactionLogEntry{}

var data gather.WriteBuffer
defer data.Close()

for _, cb := range blobs {
err := m.enc.getEncryptedBlob(ctx, cb.BlobID, &data)
err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)

if errors.Is(err, blob.ErrBlobNotFound) {
continue
@@ -255,7 +251,7 @@ func (m *indexBlobManagerV0) getCompactionLogEntries(ctx context.Context, blobs
return results, nil
}

func (m *indexBlobManagerV0) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) {
func (m *ManagerV0) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) {
results := map[blob.ID]*cleanupEntry{}

var data gather.WriteBuffer
@@ -264,7 +260,7 @@ func (m *indexBlobManagerV0) getCleanupEntries(ctx context.Context, latestServer
for _, cb := range blobs {
data.Reset()

err := m.enc.getEncryptedBlob(ctx, cb.BlobID, &data)
err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)

if errors.Is(err, blob.ErrBlobNotFound) {
continue
@@ -288,8 +284,8 @@ func (m *indexBlobManagerV0) getCleanupEntries(ctx context.Context, latestServer
return results, nil
}

func (m *indexBlobManagerV0) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
allCompactionLogBlobs, err := blob.ListAllBlobs(ctx, m.st, compactionLogBlobPrefix)
func (m *ManagerV0) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
allCompactionLogBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
if err != nil {
return errors.Wrap(err, "error listing compaction log blobs")
}
@@ -323,7 +319,7 @@ func (m *indexBlobManagerV0) deleteOldBlobs(ctx context.Context, latestBlob blob
return nil
}

func (m *indexBlobManagerV0) findIndexBlobsToDelete(latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID {
func (m *ManagerV0) findIndexBlobsToDelete(latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID {
tmp := map[blob.ID]bool{}

for _, cl := range entries {
@@ -349,7 +345,7 @@ func (m *indexBlobManagerV0) findIndexBlobsToDelete(latestServerBlobTime time.Ti
return result
}

func (m *indexBlobManagerV0) findCompactionLogBlobsToDelayCleanup(compactionBlobs []blob.Metadata) []blob.ID {
func (m *ManagerV0) findCompactionLogBlobsToDelayCleanup(compactionBlobs []blob.Metadata) []blob.ID {
var result []blob.ID

for _, cb := range compactionBlobs {
@@ -360,7 +356,7 @@ func (m *indexBlobManagerV0) findCompactionLogBlobsToDelayCleanup(compactionBlob
return result
}

func (m *indexBlobManagerV0) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) {
func (m *ManagerV0) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) {
for k, e := range entries {
if e.age >= maxEventualConsistencySettleTime {
compactionLogs = append(compactionLogs, e.BlobIDs...)
@@ -371,7 +367,7 @@ func (m *indexBlobManagerV0) findBlobsToDelete(entries map[blob.ID]*cleanupEntry
return
}

func (m *indexBlobManagerV0) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID, cleanupScheduleTime time.Time) error {
func (m *ManagerV0) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID, cleanupScheduleTime time.Time) error {
if len(blobIDs) == 0 {
return nil
}
@@ -384,14 +380,14 @@ func (m *indexBlobManagerV0) delayCleanupBlobs(ctx context.Context, blobIDs []bl
return errors.Wrap(err, "unable to marshal cleanup log bytes")
}

if _, err := m.enc.encryptAndWriteBlob(ctx, gather.FromSlice(payload), cleanupBlobPrefix, ""); err != nil {
if _, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(payload), V0CleanupBlobPrefix, ""); err != nil {
return errors.Wrap(err, "unable to write cleanup log")
}

return nil
}

func (m *indexBlobManagerV0) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
func (m *ManagerV0) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
for _, blobID := range blobIDs {
if err := m.st.DeleteBlob(ctx, blobID); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
m.log.Debugf("delete-blob failed %v %v", blobID, err)
@@ -404,8 +400,8 @@ func (m *indexBlobManagerV0) deleteBlobsFromStorageAndCache(ctx context.Context,
return nil
}

func (m *indexBlobManagerV0) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error {
allCleanupBlobs, err := blob.ListAllBlobs(ctx, m.st, cleanupBlobPrefix)
func (m *ManagerV0) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error {
allCleanupBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CleanupBlobPrefix)
if err != nil {
return errors.Wrap(err, "error listing cleanup blobs")
}
@@ -436,14 +432,16 @@ func (m *indexBlobManagerV0) cleanup(ctx context.Context, maxEventualConsistency
return errors.Wrap(err, "unable to delete cleanup blobs")
}

m.flushCache(ctx)
if err := m.st.FlushCaches(ctx); err != nil {
m.log.Debugw("error flushing caches", "err", err)
}

return nil
}

func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions, mp format.MutableParameters) []IndexBlobInfo {
func (m *ManagerV0) getBlobsToCompact(indexBlobs []Metadata, opt CompactOptions, mp format.MutableParameters) []Metadata {
var (
nonCompactedBlobs, verySmallBlobs []IndexBlobInfo
nonCompactedBlobs, verySmallBlobs []Metadata
totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
mediumSizedBlobCount int
)
@@ -467,7 +465,7 @@ func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt C

if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
// current count is below min allowed - nothing to do
m.log.Debugf("no small contents to compact")
m.log.Debugf("no small contents to Compact")
return nil
}

@@ -481,7 +479,7 @@ func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt C
return nonCompactedBlobs
}

func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) error {
func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata, opt CompactOptions) error {
if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
return nil
}
@@ -509,14 +507,14 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
m.dropContentsFromBuilder(bld, opt)

dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, defaultIndexShardSize)
dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, DefaultIndexShardSize)
if err != nil {
return errors.Wrap(err, "unable to build an index")
}

defer cleanupShards()

compactedIndexBlobs, err := m.writeIndexBlobs(ctx, dataShards, "")
compactedIndexBlobs, err := m.WriteIndexBlobs(ctx, dataShards, "")
if err != nil {
return errors.Wrap(err, "unable to write compacted indexes")
}
@@ -530,7 +528,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
return nil
}

func (m *indexBlobManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOptions) {
func (m *ManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOptions) {
for _, dc := range opt.DropContents {
if _, ok := bld[dc]; ok {
m.log.Debugf("manual-drop-from-index %v", dc)
@@ -552,11 +550,11 @@ func (m *indexBlobManagerV0) dropContentsFromBuilder(bld index.Builder, opt Comp
}
}

func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld index.Builder, indexBlobID blob.ID) error {
func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld index.Builder, indexBlobID blob.ID) error {
var data gather.WriteBuffer
defer data.Close()

err := enc.getEncryptedBlob(ctx, indexBlobID, &data)
err := enc.GetEncryptedBlob(ctx, indexBlobID, &data)
if err != nil {
return errors.Wrapf(err, "error getting index %q", indexBlobID)
}
@@ -566,7 +564,7 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld inde
return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
}

_ = ndx.Iterate(index.AllIDs, func(i Info) error {
_ = ndx.Iterate(index.AllIDs, func(i index.Info) error {
bld.Add(i)
return nil
})
@@ -586,7 +584,7 @@ func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
return res
}

func (m *indexBlobManagerV0) removeCompactedIndexes(bimap map[blob.ID]*IndexBlobInfo, compactionLogs map[blob.ID]*compactionLogEntry) {
func (m *ManagerV0) removeCompactedIndexes(bimap map[blob.ID]*Metadata, compactionLogs map[blob.ID]*compactionLogEntry) {
var validCompactionLogs []*compactionLogEntry

for _, cl := range compactionLogs {
@@ -619,3 +617,16 @@ func (m *indexBlobManagerV0) removeCompactedIndexes(bimap map[blob.ID]*IndexBlob
}
}
}

// NewManagerV0 creates a new instance of ManagerV0 with all required parameters set.
func NewManagerV0(
st blob.Storage,
enc *EncryptionManager,
timeNow func() time.Time,
formattingOptions IndexFormattingOptions,
log logging.Logger,
) *ManagerV0 {
return &ManagerV0{st, enc, timeNow, formattingOptions, log}
}

var _ Manager = (*ManagerV0)(nil)
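A rough usage sketch of the newly exported V0 surface, assuming st (blob.Storage), enc (*EncryptionManager), fo (IndexFormattingOptions) and log (logging.Logger) come from the surrounding repository setup:

	m := indexblob.NewManagerV0(st, enc, clock.Now, fo, log)
	if err := m.Compact(ctx, indexblob.CompactOptions{MaxSmallBlobs: 4}); err != nil {
		return err
	}
	active, cutoff, err := m.ListActiveIndexBlobs(ctx) // cutoff is the deletion watermark time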
@@ -1,4 +1,4 @@
package content
package indexblob

import (
"context"
@@ -426,7 +426,7 @@ type fakeContentIndexEntry struct {
Deleted bool
}

func verifyFakeContentsWritten(ctx context.Context, t *testing.T, m *indexBlobManagerV0, numWritten int, contentPrefix string, deletedContents map[string]bool) error {
func verifyFakeContentsWritten(ctx context.Context, t *testing.T, m *ManagerV0, numWritten int, contentPrefix string, deletedContents map[string]bool) error {
t.Helper()

if numWritten == 0 {
@@ -460,7 +460,7 @@ func verifyFakeContentsWritten(ctx context.Context, t *testing.T, m *indexBlobMa
return nil
}

func fakeCompaction(ctx context.Context, t *testing.T, m *indexBlobManagerV0, dropDeleted bool) error {
func fakeCompaction(ctx context.Context, t *testing.T, m *ManagerV0, dropDeleted bool) error {
t.Helper()

t.Logf("fakeCompaction(dropDeleted=%v)", dropDeleted)
@@ -516,7 +516,7 @@ func fakeContentID(prefix string, n int) string {
return fmt.Sprintf("%v-%06v", prefix, n)
}

func deleteFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error {
func deleteFakeContents(ctx context.Context, t *testing.T, m *ManagerV0, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error {
t.Helper()

if numWritten == 0 {
@@ -553,7 +553,7 @@ func deleteFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0
return err
}

func undeleteFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0, deleted map[string]bool, timeFunc func() time.Time) error {
func undeleteFakeContents(ctx context.Context, t *testing.T, m *ManagerV0, deleted map[string]bool, timeFunc func() time.Time) error {
t.Helper()

if len(deleted) == 0 {
@@ -591,7 +591,7 @@ func undeleteFakeContents(ctx context.Context, t *testing.T, m *indexBlobManager
return err
}

func writeFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0, prefix string, count int, numWritten *int, timeFunc func() time.Time) error {
func writeFakeContents(ctx context.Context, t *testing.T, m *ManagerV0, prefix string, count int, numWritten *int, timeFunc func() time.Time) error {
t.Helper()

t.Logf("writeFakeContents()")
@@ -618,7 +618,7 @@ type fakeIndexData struct {
Entries map[string]fakeContentIndexEntry
}

func writeFakeIndex(ctx context.Context, t *testing.T, m *indexBlobManagerV0, ndx map[string]fakeContentIndexEntry) ([]blob.Metadata, error) {
func writeFakeIndex(ctx context.Context, t *testing.T, m *ManagerV0, ndx map[string]fakeContentIndexEntry) ([]blob.Metadata, error) {
t.Helper()

var tmp gather.WriteBuffer
@@ -629,7 +629,7 @@ func writeFakeIndex(ctx context.Context, t *testing.T, m *indexBlobManagerV0, nd
Entries: ndx,
}))

bms, err := m.writeIndexBlobs(ctx, []gather.Bytes{tmp.Bytes()}, "")
bms, err := m.WriteIndexBlobs(ctx, []gather.Bytes{tmp.Bytes()}, "")
if err != nil {
return nil, errors.Wrap(err, "error writing blob")
}
@@ -639,7 +639,7 @@ func writeFakeIndex(ctx context.Context, t *testing.T, m *indexBlobManagerV0, nd

var errGetAllFakeContentsRetry = errors.New("retry")

func getAllFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) {
func getAllFakeContents(ctx context.Context, t *testing.T, m *ManagerV0) (map[string]fakeContentIndexEntry, []Metadata, error) {
t.Helper()

allContents, allBlobs, err := getAllFakeContentsInternal(ctx, t, m)
@@ -651,10 +651,10 @@ func getAllFakeContents(ctx context.Context, t *testing.T, m *indexBlobManagerV0
return allContents, allBlobs, err
}

func getAllFakeContentsInternal(ctx context.Context, t *testing.T, m *indexBlobManagerV0) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) {
func getAllFakeContentsInternal(ctx context.Context, t *testing.T, m *ManagerV0) (map[string]fakeContentIndexEntry, []Metadata, error) {
t.Helper()

blobs, _, err := m.listActiveIndexBlobs(ctx)
blobs, _, err := m.ListActiveIndexBlobs(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "error listing index blobs")
}
@@ -667,7 +667,7 @@ func getAllFakeContentsInternal(ctx context.Context, t *testing.T, m *indexBlobM
defer bb.Close()

for _, bi := range blobs {
err := m.getIndexBlob(ctx, bi.BlobID, &bb)
err := m.enc.GetEncryptedBlob(ctx, bi.BlobID, &bb)
if errors.Is(err, blob.ErrBlobNotFound) {
return nil, nil, errGetAllFakeContentsRetry
}
@@ -700,8 +700,8 @@ func getAllFakeContentsInternal(ctx context.Context, t *testing.T, m *indexBlobM

func assertBlobCounts(t *testing.T, data blobtesting.DataMap, wantN, wantM, wantL int) {
t.Helper()
require.Len(t, keysWithPrefix(data, compactionLogBlobPrefix), wantM)
require.Len(t, keysWithPrefix(data, LegacyIndexBlobPrefix), wantN)
require.Len(t, keysWithPrefix(data, V0CompactionLogBlobPrefix), wantM)
require.Len(t, keysWithPrefix(data, V0IndexBlobPrefix), wantN)
require.Len(t, keysWithPrefix(data, "l"), wantL)
}

@@ -717,7 +717,7 @@ func keysWithPrefix(data blobtesting.DataMap, prefix blob.ID) []blob.ID {
return res
}

func mustRegisterCompaction(t *testing.T, m *indexBlobManagerV0, inputs, outputs []blob.Metadata) {
func mustRegisterCompaction(t *testing.T, m *ManagerV0, inputs, outputs []blob.Metadata) {
t.Helper()

t.Logf("compacting %v to %v", inputs, outputs)
@@ -728,12 +728,12 @@ func mustRegisterCompaction(t *testing.T, m *indexBlobManagerV0, inputs, outputs
}
}

func mustWriteIndexBlob(t *testing.T, m *indexBlobManagerV0, data string) blob.Metadata {
func mustWriteIndexBlob(t *testing.T, m *ManagerV0, data string) blob.Metadata {
t.Helper()

t.Logf("writing index blob %q", data)

blobMDs, err := m.writeIndexBlobs(testlogging.Context(t), []gather.Bytes{gather.FromSlice([]byte(data))}, "")
blobMDs, err := m.WriteIndexBlobs(testlogging.Context(t), []gather.Bytes{gather.FromSlice([]byte(data))}, "")
if err != nil {
t.Fatalf("failed to write index blob: %v", err)
}
@@ -741,7 +741,7 @@ func mustWriteIndexBlob(t *testing.T, m *indexBlobManagerV0, data string) blob.M
return blobMDs[0]
}

func assertIndexBlobList(t *testing.T, m *indexBlobManagerV0, wantMD ...blob.Metadata) {
func assertIndexBlobList(t *testing.T, m *ManagerV0, wantMD ...blob.Metadata) {
t.Helper()

var want []blob.ID
@@ -749,7 +749,7 @@ func assertIndexBlobList(t *testing.T, m *indexBlobManagerV0, wantMD ...blob.Met
want = append(want, it.BlobID)
}

l, _, err := m.listActiveIndexBlobs(testlogging.Context(t))
l, _, err := m.ListActiveIndexBlobs(testlogging.Context(t))
if err != nil {
t.Fatalf("failed to list index blobs: %v", err)
}
@@ -764,7 +764,7 @@ func assertIndexBlobList(t *testing.T, m *indexBlobManagerV0, wantMD ...blob.Met
require.ElementsMatch(t, got, want)
}

func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow func() time.Time) *indexBlobManagerV0 {
func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow func() time.Time) *ManagerV0 {
t.Helper()

p := &format.ContentFormat{
@@ -785,15 +785,15 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f
st = ownwrites.NewWrapper(
st,
blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil),
[]blob.ID{LegacyIndexBlobPrefix, compactionLogBlobPrefix, cleanupBlobPrefix},
[]blob.ID{V0IndexBlobPrefix, V0CompactionLogBlobPrefix, V0CleanupBlobPrefix},
15*time.Minute,
)

log := testlogging.Printf(t.Logf, "")

m := &indexBlobManagerV0{
m := &ManagerV0{
st: st,
enc: &encryptedBlobMgr{
enc: &EncryptionManager{
st: st,
indexBlobCache: nil,
crypter: staticCrypter{hf, enc},
repo/content/indexblob/index_blob_manager_v1.go (new file, 189 lines)
@@ -0,0 +1,189 @@
package indexblob

import (
"context"
"crypto/rand"
"fmt"
"time"

"github.com/pkg/errors"

"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/logging"
)

// ManagerV1 is the append-only implementation of indexblob.Manager
// based on epoch manager.
type ManagerV1 struct {
st blob.Storage
enc *EncryptionManager
timeNow func() time.Time
formattingOptions IndexFormattingOptions
log logging.Logger

EpochMgr *epoch.Manager
}

// ListIndexBlobInfos lists active blob info structs. Also returns time of latest content deletion commit.
func (m *ManagerV1) ListIndexBlobInfos(ctx context.Context) ([]Metadata, time.Time, error) {
return m.ListActiveIndexBlobs(ctx)
}

// ListActiveIndexBlobs lists the metadata for active index blobs and returns the cut-off time
// before which all deleted index entries should be treated as non-existent.
func (m *ManagerV1) ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error) {
active, deletionWatermark, err := m.EpochMgr.GetCompleteIndexSet(ctx, epoch.LatestEpoch)
if err != nil {
return nil, time.Time{}, errors.Wrap(err, "error getting index set")
}

var result []Metadata

for _, bm := range active {
result = append(result, Metadata{Metadata: bm})
}

m.log.Errorf("active indexes %v deletion watermark %v", blob.IDsFromMetadata(active), deletionWatermark)

return result, deletionWatermark, nil
}

// Invalidate clears any read caches.
func (m *ManagerV1) Invalidate(ctx context.Context) {
m.EpochMgr.Invalidate()
}

// Compact advances the deletion watermark.
func (m *ManagerV1) Compact(ctx context.Context, opt CompactOptions) error {
if opt.DropDeletedBefore.IsZero() {
return nil
}

return errors.Wrap(m.EpochMgr.AdvanceDeletionWatermark(ctx, opt.DropDeletedBefore), "error advancing deletion watermark")
}

// CompactEpoch compacts the provided index blobs and writes a new set of blobs.
func (m *ManagerV1) CompactEpoch(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
tmpbld := make(index.Builder)

for _, indexBlob := range blobIDs {
if err := addIndexBlobsToBuilder(ctx, m.enc, tmpbld, indexBlob); err != nil {
return errors.Wrap(err, "error adding index to builder")
}
}

mp, mperr := m.formattingOptions.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}

dataShards, cleanupShards, err := tmpbld.BuildShards(mp.IndexVersion, true, DefaultIndexShardSize)
if err != nil {
return errors.Wrap(err, "unable to build index dataShards")
}

defer cleanupShards()

var rnd [8]byte

if _, err := rand.Read(rnd[:]); err != nil {
return errors.Wrap(err, "error getting random session ID")
}

sessionID := fmt.Sprintf("s%x-c%v", rnd[:], len(dataShards))

var data2 gather.WriteBuffer
defer data2.Close()

for _, data := range dataShards {
data2.Reset()

blobID, err := blobcrypto.Encrypt(m.enc.crypter, data, outputPrefix, blob.ID(sessionID), &data2)
if err != nil {
return errors.Wrap(err, "error encrypting")
}

if err := m.st.PutBlob(ctx, blobID, data2.Bytes(), blob.PutOptions{}); err != nil {
return errors.Wrap(err, "error writing index blob")
}
}

return nil
}

// WriteIndexBlobs writes the provided data shards into new index blobs optionally appending the provided suffix.
// The writes are atomic in the sense that if any of them fails, the reader will
// ignore all of the indexes that share the same suffix.
func (m *ManagerV1) WriteIndexBlobs(ctx context.Context, dataShards []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error) {
shards := map[blob.ID]blob.Bytes{}

suffix = blob.ID(fmt.Sprintf("%v-c%v", suffix, len(dataShards)))

for _, data := range dataShards {
// important - we're intentionally using data2 in the inner loop scheduling multiple Close()
// we want all Close() to be called at the end of the function after WriteIndex()
data2 := gather.NewWriteBuffer()
defer data2.Close() //nolint:gocritic

unprefixedBlobID, err := blobcrypto.Encrypt(m.enc.crypter, data, "", suffix, data2)
if err != nil {
return nil, errors.Wrap(err, "error encrypting")
}

shards[unprefixedBlobID] = data2.Bytes()
}

//nolint:wrapcheck
return m.EpochMgr.WriteIndex(ctx, shards)
}
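The "-c<count>" suffix computed above is what makes the multi-shard write atomic in practice: every shard of one logical write carries the same suffix, which also encodes how many shards were written, so a reader that finds fewer blobs with that suffix than the count promises can ignore the whole set. Illustrative values only:

	suffix := blob.ID(fmt.Sprintf("%v-c%v", "s0a1b", 3)) // "s0a1b-c3"
	// all three resulting blob IDs end in "s0a1b-c3"; seeing only two of
	// them tells a reader the write is incomplete and the set must be skipped.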
// EpochManager returns the epoch manager.
func (m *ManagerV1) EpochManager() *epoch.Manager {
return m.EpochMgr
}

// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
func (m *ManagerV1) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters, v0 *ManagerV0) error {
ibl, _, err := v0.ListActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing active index blobs")
}

var blobIDs []blob.ID

for _, ib := range ibl {
blobIDs = append(blobIDs, ib.BlobID)
}

if err := m.CompactEpoch(ctx, blobIDs, epoch.UncompactedEpochBlobPrefix(epoch.FirstEpoch)); err != nil {
return errors.Wrap(err, "unable to generate initial epoch")
}

return nil
}

// NewManagerV1 creates a new instance of ManagerV1 with all required parameters set.
func NewManagerV1(
st blob.Storage,
enc *EncryptionManager,
epochMgr *epoch.Manager,
timeNow func() time.Time,
formattingOptions IndexFormattingOptions,
log logging.Logger,
) *ManagerV1 {
return &ManagerV1{
st: st,
enc: enc,
timeNow: timeNow,
log: log,
formattingOptions: formattingOptions,

EpochMgr: epochMgr,
}
}

var _ Manager = (*ManagerV1)(nil)
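The upgrade path ties the two implementations together: PrepareUpgradeToIndexBlobManagerV1 reads every active legacy "n" blob via the v0 manager and rewrites the merged result as the first epoch's index set. A sketch of the call, assuming both managers share the same storage and encryption manager and that ctx, params and v0 were prepared by the caller:

	v1 := indexblob.NewManagerV1(st, enc, epochMgr, clock.Now, fo, log)
	if err := v1.PrepareUpgradeToIndexBlobManagerV1(ctx, params, v0); err != nil {
		return err // the repository keeps using the v0 manager
	}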
@@ -1,217 +0,0 @@
package content

import (
"compress/gzip"
"context"
"crypto/rand"
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/zaplogutil"
"github.com/kopia/kopia/repo/blob"
)

const blobLoggerFlushThreshold = 4 << 20

// TextLogBlobPrefix is a prefix given to text logs stored in repository.
const TextLogBlobPrefix = "_log_"

type internalLogManager struct {
enabled atomic.Bool // set by enable(), logger is ineffective until called

// internalLogManager implements io.Writer and we must be able to write to the
// repository asynchronously when the context is not provided.
ctx context.Context //nolint:containedctx

st blob.Storage
bc crypter
wg sync.WaitGroup
timeFunc func() time.Time
flushThreshold int
}

// Close closes the log manager.
func (m *internalLogManager) Close(ctx context.Context) {
m.wg.Wait()
}

func (m *internalLogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather.Bytes, closeFunc func()) {
encrypted := gather.NewWriteBuffer()
// Close happens in a goroutine

blobID, err := EncryptBLOB(m.bc, data, prefix, "", encrypted)
if err != nil {
encrypted.Close()

// this should not happen, also nothing can be done about this, we're not in a place where we can return error, log it.
return
}

b := encrypted.Bytes()

m.wg.Add(1)

go func() {
defer m.wg.Done()
defer encrypted.Close()
defer closeFunc()

if err := m.st.PutBlob(m.ctx, blobID, b, blob.PutOptions{}); err != nil {
// nothing can be done about this, we're not in a place where we can return error, log it.
return
}
}()
}

// NewLogger creates new logger.
func (m *internalLogManager) NewLogger() *zap.SugaredLogger {
var rnd [2]byte

rand.Read(rnd[:]) //nolint:errcheck

w := &internalLogger{
m: m,
prefix: blob.ID(fmt.Sprintf("%v%v_%x", TextLogBlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)),
}

return zap.New(zapcore.NewCore(
zaplogutil.NewStdConsoleEncoder(zaplogutil.StdConsoleEncoderConfig{
TimeLayout: zaplogutil.PreciseLayout,
LocalTime: false,
}),
w, zap.DebugLevel), zap.WithClock(zaplogutil.Clock())).Sugar()
}

// internalLogger represents a single log session that saves log files as blobs in the repository.
// The logger starts disabled and to actually persist logs enable() must be called.
type internalLogger struct {
nextChunkNumber atomic.Int32

m *internalLogManager
mu sync.Mutex

// +checklocks:mu
buf *gather.WriteBuffer
// +checklocks:mu
gzw *gzip.Writer

// +checklocks:mu
startTime int64 // unix timestamp of the first log

prefix blob.ID // +checklocksignore
}

func (m *internalLogManager) enable() {
if m == nil {
return
}

m.enabled.Store(true)
}

func (l *internalLogger) Write(b []byte) (int, error) {
l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b))
return len(b), nil
}

func (l *internalLogger) maybeEncryptAndWriteChunkUnlocked(data gather.Bytes, closeFunc func()) {
if data.Length() == 0 {
closeFunc()
return
}

if !l.m.enabled.Load() {
closeFunc()
return
}

endTime := l.m.timeFunc().Unix()

l.mu.Lock()
st := l.startTime
l.mu.Unlock()

prefix := blob.ID(fmt.Sprintf("%v_%v_%v_%v_", l.prefix, st, endTime, l.nextChunkNumber.Add(1)))

l.m.encryptAndWriteLogBlob(prefix, data, closeFunc)
}

func (l *internalLogger) addAndMaybeFlush(b []byte) (payload gather.Bytes, closeFunc func()) {
l.mu.Lock()
defer l.mu.Unlock()

w := l.ensureWriterInitializedLocked()

_, err := w.Write(b)
l.logUnexpectedError(err)

if l.buf.Length() < l.m.flushThreshold {
return gather.Bytes{}, func() {}
}

return l.flushAndResetLocked()
}

// +checklocks:l.mu
func (l *internalLogger) ensureWriterInitializedLocked() io.Writer {
if l.gzw == nil {
l.buf = gather.NewWriteBuffer()
l.gzw = gzip.NewWriter(l.buf)
l.startTime = l.m.timeFunc().Unix()
}

return l.gzw
}

// +checklocks:l.mu
func (l *internalLogger) flushAndResetLocked() (payload gather.Bytes, closeFunc func()) {
if l.gzw == nil {
return gather.Bytes{}, func() {}
}

l.logUnexpectedError(l.gzw.Flush())
l.logUnexpectedError(l.gzw.Close())

closeBuf := l.buf.Close
res := l.buf.Bytes()

l.buf = nil
l.gzw = nil

return res, closeBuf
}

func (l *internalLogger) logUnexpectedError(err error) {
if err == nil {
return
}
}

func (l *internalLogger) Sync() error {
l.mu.Lock()
data, closeFunc := l.flushAndResetLocked()
l.mu.Unlock()

l.maybeEncryptAndWriteChunkUnlocked(data, closeFunc)

return nil
}

// newInternalLogManager creates a new blobLogManager that will emit logs as repository blobs with a given prefix.
func newInternalLogManager(ctx context.Context, st blob.Storage, bc crypter) *internalLogManager {
return &internalLogManager{
ctx: ctx,
st: st,
bc: bc,
flushThreshold: blobLoggerFlushThreshold,
timeFunc: clock.Now,
}
}
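Reading the two fmt.Sprintf calls in this deleted file together, each flushed chunk became a blob whose ID encoded the logging session and the chunk's time range, roughly as follows (illustrative layout, reconstructed from the format strings above):

	// per-logger prefix: "_log_" + local wall-clock time + random hex, e.g. "_log_20230101120000_ab12"
	// per-chunk blob ID: prefix + "_<startUnix>_<endUnix>_<chunkNumber>_" + a content-derived suffix from EncryptBLOB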
@@ -10,6 +10,7 @@

"github.com/pkg/errors"

"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
)
@@ -111,7 +112,7 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error {
var encrypted gather.WriteBuffer
defer encrypted.Close()

sessionBlobID, err := EncryptBLOB(bm.format, gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted)
sessionBlobID, err := blobcrypto.Encrypt(bm.format, gather.FromSlice(js), BlobIDPrefixSession, blob.ID(bm.currentSessionInfo.ID), &encrypted)
if err != nil {
return errors.Wrap(err, "unable to encrypt session marker")
}
@@ -178,7 +179,7 @@ func (bm *WriteManager) ListActiveSessions(ctx context.Context) (map[SessionID]*
return nil, errors.Wrapf(err, "error loading session: %v", b.BlobID)
}

err = DecryptBLOB(bm.format, payload.Bytes(), b.BlobID, &decrypted)
err = blobcrypto.Decrypt(bm.format, payload.Bytes(), b.BlobID, &decrypted)
if err != nil {
return nil, errors.Wrapf(err, "error decrypting session: %v", b.BlobID)
}

@@ -5,7 +5,7 @@
"time"

"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
)

// DropDeletedContents rewrites indexes while dropping deleted contents above certain age.
@@ -13,7 +13,7 @@ func DropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, d
log(ctx).Infof("Dropping contents deleted before %v", dropDeletedBefore)

//nolint:wrapcheck
return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{
return rep.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{
AllIndexes: true,
DropDeletedBefore: dropDeletedBefore,
DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety,

@@ -3,7 +3,7 @@
import (
"context"

"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
)

// runTaskIndexCompactionQuick rewrites index blobs to reduce their count but does not drop any contents.
@@ -14,7 +14,7 @@ func runTaskIndexCompactionQuick(ctx context.Context, runParams RunParameters, s
const maxSmallBlobsForIndexCompaction = 8

//nolint:wrapcheck
return runParams.rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{
return runParams.rep.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{
MaxSmallBlobs: maxSmallBlobsForIndexCompaction,
DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety,
})

@@ -25,6 +25,7 @@
"github.com/kopia/kopia/repo/blob/storagemetrics"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/manifest"
@@ -386,7 +387,7 @@ func wrapLockingStorage(st blob.Storage, r format.BlobStorageConfiguration) blob
prefixes = append(prefixes, string(prefix))
}

prefixes = append(prefixes, content.LegacyIndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID,
prefixes = append(prefixes, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID,
format.KopiaBlobCfgBlobID)

return beforeop.NewWrapper(st, nil, nil, nil, func(ctx context.Context, id blob.ID, opts *blob.PutOptions) error {

@@ -14,6 +14,7 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
@@ -64,7 +65,7 @@ type DirectRepository interface {
BlobReader() blob.Reader
BlobVolume() blob.Volume
ContentReader() content.Reader
IndexBlobs(ctx context.Context, includeInactive bool) ([]content.IndexBlobInfo, error)
IndexBlobs(ctx context.Context, includeInactive bool) ([]indexblob.Metadata, error)
NewDirectWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, DirectRepositoryWriter, error)
AlsoLogToContentLog(ctx context.Context) context.Context
UniqueID() []byte
@@ -344,7 +345,7 @@ func (r *directRepository) ContentReader() content.Reader {
}

// IndexBlobs returns the index blobs in use.
func (r *directRepository) IndexBlobs(ctx context.Context, includeInactive bool) ([]content.IndexBlobInfo, error) {
func (r *directRepository) IndexBlobs(ctx context.Context, includeInactive bool) ([]indexblob.Metadata, error) {
//nolint:wrapcheck
return r.cmgr.IndexBlobs(ctx, includeInactive)
}

@@ -26,6 +26,7 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/beforeop"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/object"
)
@@ -131,7 +132,7 @@ func (s *formatSpecificTestSuite) TestPackingSimple(t *testing.T) {
verify(ctx, t, env.RepositoryWriter, oid2a, []byte(content2), "packed-object-2")
verify(ctx, t, env.RepositoryWriter, oid3a, []byte(content3), "packed-object-3")

if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("optimize error: %v", err)
}

@@ -141,7 +142,7 @@ func (s *formatSpecificTestSuite) TestPackingSimple(t *testing.T) {
verify(ctx, t, env.RepositoryWriter, oid2a, []byte(content2), "packed-object-2")
verify(ctx, t, env.RepositoryWriter, oid3a, []byte(content3), "packed-object-3")

if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("optimize error: %v", err)
}

@@ -570,7 +571,7 @@ func TestObjectWritesWithRetention(t *testing.T) {
prefixesWithRetention = append(prefixesWithRetention, string(prefix))
}

prefixesWithRetention = append(prefixesWithRetention, content.LegacyIndexBlobPrefix, epoch.EpochManagerIndexUberPrefix,
prefixesWithRetention = append(prefixesWithRetention, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix,
format.KopiaRepositoryBlobID, format.KopiaBlobCfgBlobID)

// make sure that we cannot set mtime on the kopia objects created due to the

@@ -22,6 +22,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/tests/repository_stress_test/repomodel"
)
@@ -444,7 +445,7 @@ func compact(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.R
log.Debugf("compact()")

return errors.Wrapf(
r.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}),
r.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}),
"compact()")
}