refactor(repository): refactor internal/repolog package to support writing metrics in the future (#3610)

* renamed internal/repolog to internal/repodiag

* refactored initialization

* additional tests

* linter fixes
Author: Jarek Kowalski
Date: 2024-02-02 22:19:24 -08:00
Committed by: GitHub
Parent commit: 37da48b641
Commit: c478141bbf
14 changed files with 323 additions and 93 deletions
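
For reviewers, a minimal sketch of the new initialization flow introduced by this commit. The helper function below is hypothetical; the calls and their parameter order mirror the NewSharedManager signature change and the repository-open wiring shown in the last two diffs of this commit.

// Sketch only; not part of the commit. Assumes the caller already has st, fmgr,
// cacheOpts, cmOpts and mr, as in the repository-open diff at the end of this commit.
// fmgr is a format.Provider, which also satisfies blobcrypto.Crypter.
import (
	"context"

	"github.com/pkg/errors"

	"github.com/kopia/kopia/internal/metrics"
	"github.com/kopia/kopia/internal/repodiag"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/content"
	"github.com/kopia/kopia/repo/format"
)

func openSharedManagerSketch(
	ctx context.Context,
	st blob.Storage,
	fmgr format.Provider,
	cacheOpts *content.CachingOptions,
	cmOpts *content.ManagerOptions,
	mr *metrics.Registry,
) (*content.SharedManager, *repodiag.Writer, error) {
	// The repodiag.Writer now owns encryption and asynchronous upload of diagnostic blobs.
	dw := repodiag.NewWriter(st, fmgr)

	// The LogManager no longer talks to blob storage directly; it emits through the Writer.
	logManager := repodiag.NewLogManager(ctx, dw)

	// NewSharedManager now receives the LogManager instead of constructing it internally.
	scm, err := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, logManager, mr)
	if err != nil {
		return nil, nil, errors.Wrap(err, "unable to create shared content manager")
	}

	// dw.Wait is then registered with the repository closer so that pending diagnostic
	// writes finish before metrics and storage are closed.
	return scm, dw, nil
}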

View File

@@ -5,7 +5,7 @@
"strings"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/repolog"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/indexblob"
@@ -66,7 +66,7 @@ func (c *commandBlobList) shouldInclude(b blob.Metadata) bool {
return false
}
if strings.HasPrefix(string(b.BlobID), repolog.BlobPrefix) {
if strings.HasPrefix(string(b.BlobID), repodiag.LogBlobPrefix) {
return false
}

View File

@@ -11,7 +11,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/repolog"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/repo/blob"
)
@@ -70,7 +70,7 @@ func getLogSessions(ctx context.Context, st blob.Reader) ([]*logSessionInfo, err
var allSessions []*logSessionInfo
if err := st.ListBlobs(ctx, repolog.BlobPrefix, func(bm blob.Metadata) error {
if err := st.ListBlobs(ctx, repodiag.LogBlobPrefix, func(bm blob.Metadata) error {
parts := strings.Split(string(bm.BlobID), "_")
//nolint:gomnd

View File

@@ -23,7 +23,7 @@ func TestBlobCrypto(t *testing.T) {
enc, err := encryption.CreateEncryptor(f)
require.NoError(t, err)
cr := staticCrypter{hf, enc}
cr := StaticCrypter{hf, enc}
var tmp, tmp2, tmp3 gather.WriteBuffer
defer tmp.Close()
@@ -70,7 +70,7 @@ func (badEncryptor) Decrypt(input gather.Bytes, contentID []byte, output *gather
func (badEncryptor) Overhead() int { return 0 }
func TestBlobCrypto_Invalid(t *testing.T) {
cr := staticCrypter{
cr := StaticCrypter{
func(output []byte, data gather.Bytes) []byte {
// invalid hash
return append(output, 9, 9, 9, 9)
@@ -95,21 +95,8 @@ func(output []byte, data gather.Bytes) []byte {
hf, err := hashing.CreateHashFunc(f)
require.NoError(t, err)
cr.h = hf
cr.Hash = hf
_, err = Encrypt(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp)
require.Error(t, err)
}
type staticCrypter struct {
h hashing.HashFunc
e encryption.Encryptor
}
func (p staticCrypter) Encryptor() encryption.Encryptor {
return p.e
}
func (p staticCrypter) HashFunc() hashing.HashFunc {
return p.h
}

View File

@@ -0,0 +1,24 @@
package blobcrypto
import (
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
)
// StaticCrypter implements Crypter interface with static hash and encryption functions.
type StaticCrypter struct {
Hash hashing.HashFunc
Encryption encryption.Encryptor
}
// Encryptor returns the encryption algorithm.
func (p StaticCrypter) Encryptor() encryption.Encryptor {
return p.Encryption
}
// HashFunc returns the hashing algorithm.
func (p StaticCrypter) HashFunc() hashing.HashFunc {
return p.Hash
}
var _ Crypter = (*StaticCrypter)(nil)

View File

@@ -0,0 +1,67 @@
package repodiag
import (
"context"
"sync"
"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.Module("repodiag")
// Writer manages encryption and asynchronous writing of diagnostic blobs to the repository.
type Writer struct {
st blob.Storage
bc blobcrypto.Crypter
wg sync.WaitGroup
}
// EncryptAndWriteBlobAsync encrypts the given content and writes it to the repository asynchronously,
// followed by a call to the provided closeFunc.
func (w *Writer) EncryptAndWriteBlobAsync(ctx context.Context, prefix blob.ID, data gather.Bytes, closeFunc func()) {
encrypted := gather.NewWriteBuffer()
// Close happens in a goroutine
blobID, err := blobcrypto.Encrypt(w.bc, data, prefix, "", encrypted)
if err != nil {
encrypted.Close()
// this should not happen; nothing can be done about it and we cannot return an error here, so just log it.
log(ctx).Warnf("unable to encrypt diagnostics blob: %v", err)
return
}
b := encrypted.Bytes()
w.wg.Add(1)
go func() {
defer w.wg.Done()
defer encrypted.Close()
defer closeFunc()
if err := w.st.PutBlob(ctx, blobID, b, blob.PutOptions{}); err != nil {
// nothing can be done about this and we cannot return an error here, so just log it.
log(ctx).Warnf("unable to write diagnostics blob: %v", err)
return
}
}()
}
// Wait waits for all the writes to complete.
func (w *Writer) Wait(ctx context.Context) error {
w.wg.Wait()
return nil
}
// NewWriter creates a new writer.
func NewWriter(
st blob.Storage,
bc blobcrypto.Crypter,
) *Writer {
return &Writer{st: st, bc: bc}
}
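
Since the stated goal of this refactoring is to support writing metrics in the future, here is a hypothetical sketch of how a metrics emitter could reuse the same Writer. The "_metrics_" prefix, the function name, and the serialized snapshot payload are illustrative assumptions and are not part of this commit.

// Hypothetical future use of the Writer for metrics; not introduced by this change.
import (
	"context"

	"github.com/kopia/kopia/internal/gather"
	"github.com/kopia/kopia/internal/repodiag"
)

// writeMetricsSnapshot pushes an already-serialized metrics payload through the shared Writer.
// Assumes w was created with repodiag.NewWriter.
func writeMetricsSnapshot(ctx context.Context, w *repodiag.Writer, snapshot []byte) {
	// "_metrics_" is an illustrative prefix; this commit only defines LogBlobPrefix ("_log_").
	w.EncryptAndWriteBlobAsync(ctx, "_metrics_", gather.FromSlice(snapshot), func() {
		// closeFunc runs once the upload goroutine finishes, whether or not the write succeeded.
	})

	// Callers flush pending writes with w.Wait(ctx) before closing the repository.
}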

View File

@@ -0,0 +1,94 @@
package repodiag_test
import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
)
func TestDiagWriter(t *testing.T) {
d := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(d, nil, nil)
fs := blobtesting.NewFaultyStorage(st)
w := repodiag.NewWriter(fs, newStaticCrypter(t))
ctx := testlogging.Context(t)
closeCalled1 := make(chan struct{})
closeCalled2 := make(chan struct{})
w.EncryptAndWriteBlobAsync(ctx, "prefix1_", gather.FromSlice([]byte{1, 2, 3}), func() {
close(closeCalled1)
})
w.EncryptAndWriteBlobAsync(ctx, "prefix2_", gather.FromSlice([]byte{2, 3, 4}), func() {
close(closeCalled2)
})
<-closeCalled1
<-closeCalled2
// simulate write failure
someErr := errors.Errorf("some error")
fs.AddFault(blobtesting.MethodPutBlob).ErrorInstead(someErr)
closeCalled3 := make(chan struct{})
w.EncryptAndWriteBlobAsync(ctx, "prefix3_", gather.FromSlice([]byte{1}), func() {
close(closeCalled3)
})
<-closeCalled3
// blob IDs are deterministic based on content
require.Len(t, d, 2)
require.Contains(t, d, blob.ID("prefix1_11c0e79b71c3976ccd0c02d1310e2516"))
require.Contains(t, d, blob.ID("prefix2_24ff687b6ca564bd005a99420c90a0db"))
t0 := clock.Now()
w.EncryptAndWriteBlobAsync(ctx, "prefix4_", gather.FromSlice([]byte{3, 4, 5}), func() {
time.Sleep(1100 * time.Millisecond)
})
// make sure Wait waits for all async writes to complete
w.Wait(ctx)
require.Greater(t, clock.Now().Sub(t0), time.Second)
}
func newStaticCrypter(t *testing.T) blobcrypto.Crypter {
t.Helper()
p := &format.ContentFormat{
Encryption: encryption.DefaultAlgorithm,
Hash: hashing.DefaultAlgorithm,
}
enc, err := encryption.CreateEncryptor(p)
if err != nil {
t.Fatalf("unable to create encryptor: %v", err)
}
hf, err := hashing.CreateHashFunc(p)
if err != nil {
t.Fatalf("unable to create hash: %v", err)
}
return blobcrypto.StaticCrypter{
Hash: hf,
Encryption: enc,
}
}

View File

@@ -1,4 +1,4 @@
package repolog
package repodiag
import (
"compress/gzip"
@@ -31,7 +31,10 @@ type internalLogger struct {
}
func (l *internalLogger) Write(b []byte) (int, error) {
l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b))
if l != nil {
l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b))
}
return len(b), nil
}

View File

@@ -1,18 +1,16 @@
// Package repolog manages logs in the repository.
package repolog
// Package repodiag manages logs and metrics in the repository.
package repodiag
import (
"context"
"crypto/rand"
"fmt"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/zaplogutil"
@@ -21,8 +19,8 @@
const blobLoggerFlushThreshold = 4 << 20
// BlobPrefix is a prefix given to text logs stored in repository.
const BlobPrefix = "_log_"
// LogBlobPrefix is a prefix given to text logs stored in repository.
const LogBlobPrefix = "_log_"
// LogManager manages writing encrypted log blobs to the repository.
type LogManager struct {
@@ -32,55 +30,29 @@ type LogManager struct {
// repository asynchronously when the context is not provided.
ctx context.Context //nolint:containedctx
st blob.Storage
bc blobcrypto.Crypter
wg sync.WaitGroup
writer *Writer
timeFunc func() time.Time
flushThreshold int
}
// Close closes the log manager.
func (m *LogManager) Close(ctx context.Context) {
m.wg.Wait()
}
func (m *LogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather.Bytes, closeFunc func()) {
encrypted := gather.NewWriteBuffer()
// Close happens in a goroutine
blobID, err := blobcrypto.Encrypt(m.bc, data, prefix, "", encrypted)
if err != nil {
encrypted.Close()
// this should not happen, also nothing can be done about this, we're not in a place where we can return error, log it.
return
}
b := encrypted.Bytes()
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer encrypted.Close()
defer closeFunc()
if err := m.st.PutBlob(m.ctx, blobID, b, blob.PutOptions{}); err != nil {
// nothing can be done about this, we're not in a place where we can return error, log it.
return
}
}()
m.writer.EncryptAndWriteBlobAsync(m.ctx, prefix, data, closeFunc)
}
// NewLogger creates new logger.
func (m *LogManager) NewLogger() *zap.SugaredLogger {
if m == nil {
return zap.NewNop().Sugar()
}
var rnd [2]byte
rand.Read(rnd[:]) //nolint:errcheck
w := &internalLogger{
m: m,
prefix: blob.ID(fmt.Sprintf("%v%v_%x", BlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)),
prefix: blob.ID(fmt.Sprintf("%v%v_%x", LogBlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)),
}
return zap.New(zapcore.NewCore(
@@ -101,11 +73,10 @@ func (m *LogManager) Enable() {
}
// NewLogManager creates a new LogManager that will emit logs as repository blobs.
func NewLogManager(ctx context.Context, st blob.Storage, bc blobcrypto.Crypter) *LogManager {
func NewLogManager(ctx context.Context, w *Writer) *LogManager {
return &LogManager{
ctx: ctx,
st: st,
bc: bc,
writer: w,
flushThreshold: blobLoggerFlushThreshold,
timeFunc: clock.Now,
}

View File

@@ -0,0 +1,95 @@
package repodiag_test
import (
"crypto/rand"
"encoding/hex"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/internal/testlogging"
)
func TestLogManager_Enabled(t *testing.T) {
d := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(d, nil, nil)
w := repodiag.NewWriter(st, newStaticCrypter(t))
ctx := testlogging.Context(t)
lm := repodiag.NewLogManager(ctx, w)
lm.Enable()
l := lm.NewLogger()
l.Infof("hello")
require.Len(t, d, 0)
l.Sync()
w.Wait(ctx)
// make sure log messages are written
require.Len(t, d, 1)
// make sure blob ID is prefixed
for k := range d {
require.True(t, strings.HasPrefix(string(k), repodiag.LogBlobPrefix))
}
}
func TestLogManager_AutoFlush(t *testing.T) {
d := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(d, nil, nil)
w := repodiag.NewWriter(st, newStaticCrypter(t))
ctx := testlogging.Context(t)
lm := repodiag.NewLogManager(ctx, w)
lm.Enable()
l := lm.NewLogger()
// flush happens after 4 << 20 bytes (4MB) after compression,
// write ~10MB of base16 data which compresses to ~5MB and writes 1 blob
for i := 0; i < 5000; i++ {
var b [1024]byte
rand.Read(b[:])
l.Infof(hex.EncodeToString(b[:]))
}
w.Wait(ctx)
require.Equal(t, 1, len(d))
l.Sync()
w.Wait(ctx)
require.Equal(t, 2, len(d))
}
func TestLogManager_NotEnabled(t *testing.T) {
d := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(d, nil, nil)
w := repodiag.NewWriter(st, newStaticCrypter(t))
ctx := testlogging.Context(t)
lm := repodiag.NewLogManager(ctx, w)
l := lm.NewLogger()
l.Infof("hello")
require.Len(t, d, 0)
l.Sync()
w.Wait(ctx)
// make sure log messages are not written
require.Len(t, d, 0)
}
func TestLogManager_Null(t *testing.T) {
var lm *repodiag.LogManager
lm.Enable()
l := lm.NewLogger()
l.Infof("hello")
l.Sync()
}

View File

@@ -18,7 +18,7 @@
"github.com/kopia/kopia/internal/listcache"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/ownwrites"
"github.com/kopia/kopia/internal/repolog"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
@@ -106,7 +106,7 @@ type SharedManager struct {
// logger associated with the context that opened the repository.
contextLogger logging.Logger
repoLogManager *repolog.LogManager
repoLogManager *repodiag.LogManager
internalLogger *zap.SugaredLogger // backing logger for 'sharedBaseLogger'
metricsStruct
@@ -577,14 +577,8 @@ func (sm *SharedManager) CloseShared(ctx context.Context) error {
sm.internalLogger.Sync() //nolint:errcheck
}
sm.repoLogManager.Close(ctx)
sm.indexBlobManagerV1.EpochManager().Flush()
if err := sm.st.Close(ctx); err != nil {
return errors.Wrap(err, "error closing storage")
}
return nil
}
@@ -612,7 +606,7 @@ func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context)
}
// NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository.
func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, mr *metrics.Registry) (*SharedManager, error) {
func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, repoLogManager *repodiag.LogManager, mr *metrics.Registry) (*SharedManager, error) {
opts = opts.CloneOrDefault()
if opts.TimeNow == nil {
opts.TimeNow = clock.Now
@@ -628,7 +622,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
repoLogManager: repolog.NewLogManager(ctx, st, prov),
repoLogManager: repoLogManager,
contextLogger: logging.Module(FormatLogModule)(ctx),
metricsStruct: initMetricsStruct(mr),

View File

@@ -965,7 +965,7 @@ func NewManagerForTesting(ctx context.Context, st blob.Storage, f format.Provide
options.TimeNow = clock.Now
}
sharedManager, err := NewSharedManager(ctx, st, f, caching, options, nil)
sharedManager, err := NewSharedManager(ctx, st, f, caching, options, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "error initializing read manager")
}

View File

@@ -6,6 +6,7 @@
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
@@ -40,7 +41,7 @@ func TestEncryptedBlobManager(t *testing.T) {
ebm := EncryptionManager{
st: fs,
crypter: staticCrypter{hf, enc},
crypter: blobcrypto.StaticCrypter{Hash: hf, Encryption: enc},
indexBlobCache: nil,
log: logging.NullLogger,
}
@@ -76,21 +77,8 @@ func TestEncryptedBlobManager(t *testing.T) {
someError2 := errors.Errorf("some error 2")
ebm.crypter = staticCrypter{hf, failingEncryptor{nil, someError2}}
ebm.crypter = blobcrypto.StaticCrypter{Hash: hf, Encryption: failingEncryptor{nil, someError2}}
_, err = ebm.EncryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
require.ErrorIs(t, err, someError2)
}
type staticCrypter struct {
h hashing.HashFunc
e encryption.Encryptor
}
func (p staticCrypter) Encryptor() encryption.Encryptor {
return p.e
}
func (p staticCrypter) HashFunc() hashing.HashFunc {
return p.h
}

View File

@@ -14,6 +14,7 @@
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/blobcrypto"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/faketime"
@@ -788,7 +789,7 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f
enc: &EncryptionManager{
st: st,
indexBlobCache: nil,
crypter: staticCrypter{hf, enc},
crypter: blobcrypto.StaticCrypter{Hash: hf, Encryption: enc},
log: log,
},
timeNow: localTimeNow,

View File

@@ -17,6 +17,7 @@
"github.com/kopia/kopia/internal/crypto"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/beforeop"
@@ -322,7 +323,10 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
}
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr)
dw := repodiag.NewWriter(st, fmgr)
logManager := repodiag.NewLogManager(ctx, dw)
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, logManager, mr)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to create shared content manager")
}
@@ -344,7 +348,9 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
closer := newRefCountedCloser(
scm.CloseShared,
dw.Wait,
mr.Close,
st.Close,
)
dr := &directRepository{