diff --git a/cli/command_blob_list.go b/cli/command_blob_list.go index 64f540093..f555976f4 100644 --- a/cli/command_blob_list.go +++ b/cli/command_blob_list.go @@ -5,7 +5,7 @@ "strings" "github.com/kopia/kopia/internal/epoch" - "github.com/kopia/kopia/internal/repolog" + "github.com/kopia/kopia/internal/repodiag" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/indexblob" @@ -66,7 +66,7 @@ func (c *commandBlobList) shouldInclude(b blob.Metadata) bool { return false } - if strings.HasPrefix(string(b.BlobID), repolog.BlobPrefix) { + if strings.HasPrefix(string(b.BlobID), repodiag.LogBlobPrefix) { return false } diff --git a/cli/command_logs_session.go b/cli/command_logs_session.go index ce32af2a2..0cccecd6c 100644 --- a/cli/command_logs_session.go +++ b/cli/command_logs_session.go @@ -11,7 +11,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/internal/repolog" + "github.com/kopia/kopia/internal/repodiag" "github.com/kopia/kopia/repo/blob" ) @@ -70,7 +70,7 @@ func getLogSessions(ctx context.Context, st blob.Reader) ([]*logSessionInfo, err var allSessions []*logSessionInfo - if err := st.ListBlobs(ctx, repolog.BlobPrefix, func(bm blob.Metadata) error { + if err := st.ListBlobs(ctx, repodiag.LogBlobPrefix, func(bm blob.Metadata) error { parts := strings.Split(string(bm.BlobID), "_") //nolint:gomnd diff --git a/internal/blobcrypto/blob_crypto_test.go b/internal/blobcrypto/blob_crypto_test.go index a6204c0c7..83d13ad3e 100644 --- a/internal/blobcrypto/blob_crypto_test.go +++ b/internal/blobcrypto/blob_crypto_test.go @@ -23,7 +23,7 @@ func TestBlobCrypto(t *testing.T) { enc, err := encryption.CreateEncryptor(f) require.NoError(t, err) - cr := staticCrypter{hf, enc} + cr := StaticCrypter{hf, enc} var tmp, tmp2, tmp3 gather.WriteBuffer defer tmp.Close() @@ -70,7 +70,7 @@ func (badEncryptor) Decrypt(input gather.Bytes, contentID []byte, output *gather func (badEncryptor) Overhead() int { return 0 } func TestBlobCrypto_Invalid(t *testing.T) { - cr := staticCrypter{ + cr := StaticCrypter{ func(output []byte, data gather.Bytes) []byte { // invalid hash return append(output, 9, 9, 9, 9) @@ -95,21 +95,8 @@ func(output []byte, data gather.Bytes) []byte { hf, err := hashing.CreateHashFunc(f) require.NoError(t, err) - cr.h = hf + cr.Hash = hf _, err = Encrypt(cr, gather.FromSlice([]byte{1, 2, 3}), "n", "mysessionid", &tmp) require.Error(t, err) } - -type staticCrypter struct { - h hashing.HashFunc - e encryption.Encryptor -} - -func (p staticCrypter) Encryptor() encryption.Encryptor { - return p.e -} - -func (p staticCrypter) HashFunc() hashing.HashFunc { - return p.h -} diff --git a/internal/blobcrypto/static_crypter.go b/internal/blobcrypto/static_crypter.go new file mode 100644 index 000000000..f66c47472 --- /dev/null +++ b/internal/blobcrypto/static_crypter.go @@ -0,0 +1,24 @@ +package blobcrypto + +import ( + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +// StaticCrypter implements Crypter interface with static hash and encryption functions. +type StaticCrypter struct { + Hash hashing.HashFunc + Encryption encryption.Encryptor +} + +// Encryptor returns the encryption algorithm. +func (p StaticCrypter) Encryptor() encryption.Encryptor { + return p.Encryption +} + +// HashFunc returns the hashing algorithm. +func (p StaticCrypter) HashFunc() hashing.HashFunc { + return p.Hash +} + +var _ Crypter = (*StaticCrypter)(nil) diff --git a/internal/repodiag/diag_writer.go b/internal/repodiag/diag_writer.go new file mode 100644 index 000000000..e81090abb --- /dev/null +++ b/internal/repodiag/diag_writer.go @@ -0,0 +1,67 @@ +package repodiag + +import ( + "context" + "sync" + + "github.com/kopia/kopia/internal/blobcrypto" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/logging" +) + +var log = logging.Module("repodiag") + +// Writer manages encryption and asynchronous writing of diagnostic blobs to the repository. +type Writer struct { + st blob.Storage + bc blobcrypto.Crypter + wg sync.WaitGroup +} + +// EncryptAndWriteBlobAsync encrypts given content and writes it to the repository asynchronously, +// folllowed by calling the provided closeFunc. +func (w *Writer) EncryptAndWriteBlobAsync(ctx context.Context, prefix blob.ID, data gather.Bytes, closeFunc func()) { + encrypted := gather.NewWriteBuffer() + // Close happens in a goroutine + + blobID, err := blobcrypto.Encrypt(w.bc, data, prefix, "", encrypted) + if err != nil { + encrypted.Close() + + // this should not happen, also nothing can be done about this, we're not in a place where we can return error, log it. + log(ctx).Warnf("unable to encrypt diagnostics blob: %v", err) + + return + } + + b := encrypted.Bytes() + + w.wg.Add(1) + + go func() { + defer w.wg.Done() + defer encrypted.Close() + defer closeFunc() + + if err := w.st.PutBlob(ctx, blobID, b, blob.PutOptions{}); err != nil { + // nothing can be done about this, we're not in a place where we can return error, log it. + log(ctx).Warnf("unable to write diagnostics blob: %v", err) + return + } + }() +} + +// Wait waits for all the writes to complete. +func (w *Writer) Wait(ctx context.Context) error { + w.wg.Wait() + return nil +} + +// NewWriter creates a new writer. +func NewWriter( + st blob.Storage, + bc blobcrypto.Crypter, +) *Writer { + return &Writer{st: st, bc: bc} +} diff --git a/internal/repodiag/diag_writer_test.go b/internal/repodiag/diag_writer_test.go new file mode 100644 index 000000000..aeeb5bc03 --- /dev/null +++ b/internal/repodiag/diag_writer_test.go @@ -0,0 +1,94 @@ +package repodiag_test + +import ( + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/blobcrypto" + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/repodiag" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/hashing" +) + +func TestDiagWriter(t *testing.T) { + d := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(d, nil, nil) + fs := blobtesting.NewFaultyStorage(st) + + w := repodiag.NewWriter(fs, newStaticCrypter(t)) + ctx := testlogging.Context(t) + closeCalled1 := make(chan struct{}) + closeCalled2 := make(chan struct{}) + + w.EncryptAndWriteBlobAsync(ctx, "prefix1_", gather.FromSlice([]byte{1, 2, 3}), func() { + close(closeCalled1) + }) + + w.EncryptAndWriteBlobAsync(ctx, "prefix2_", gather.FromSlice([]byte{2, 3, 4}), func() { + close(closeCalled2) + }) + + <-closeCalled1 + <-closeCalled2 + + // simulate write failure + someErr := errors.Errorf("some error") + fs.AddFault(blobtesting.MethodPutBlob).ErrorInstead(someErr) + + closeCalled3 := make(chan struct{}) + + w.EncryptAndWriteBlobAsync(ctx, "prefix3_", gather.FromSlice([]byte{1}), func() { + close(closeCalled3) + }) + + <-closeCalled3 + + // blob IDs are deterministic based on content + require.Len(t, d, 2) + require.Contains(t, d, blob.ID("prefix1_11c0e79b71c3976ccd0c02d1310e2516")) + require.Contains(t, d, blob.ID("prefix2_24ff687b6ca564bd005a99420c90a0db")) + + t0 := clock.Now() + + w.EncryptAndWriteBlobAsync(ctx, "prefix4_", gather.FromSlice([]byte{3, 4, 5}), func() { + time.Sleep(1100 * time.Millisecond) + }) + + // make sure close waits for all async writes to complete + w.Wait(ctx) + + require.Greater(t, clock.Now().Sub(t0), time.Second) +} + +func newStaticCrypter(t *testing.T) blobcrypto.Crypter { + t.Helper() + + p := &format.ContentFormat{ + Encryption: encryption.DefaultAlgorithm, + Hash: hashing.DefaultAlgorithm, + } + + enc, err := encryption.CreateEncryptor(p) + if err != nil { + t.Fatalf("unable to create encryptor: %v", err) + } + + hf, err := hashing.CreateHashFunc(p) + if err != nil { + t.Fatalf("unable to create hash: %v", err) + } + + return blobcrypto.StaticCrypter{ + Hash: hf, + Encryption: enc, + } +} diff --git a/internal/repolog/internal_logger.go b/internal/repodiag/internal_logger.go similarity index 95% rename from internal/repolog/internal_logger.go rename to internal/repodiag/internal_logger.go index f537385cf..0200ce252 100644 --- a/internal/repolog/internal_logger.go +++ b/internal/repodiag/internal_logger.go @@ -1,4 +1,4 @@ -package repolog +package repodiag import ( "compress/gzip" @@ -31,7 +31,10 @@ type internalLogger struct { } func (l *internalLogger) Write(b []byte) (int, error) { - l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b)) + if l != nil { + l.maybeEncryptAndWriteChunkUnlocked(l.addAndMaybeFlush(b)) + } + return len(b), nil } diff --git a/internal/repolog/log_manager.go b/internal/repodiag/log_manager.go similarity index 56% rename from internal/repolog/log_manager.go rename to internal/repodiag/log_manager.go index 8e2379f3c..51be3f2d0 100644 --- a/internal/repolog/log_manager.go +++ b/internal/repodiag/log_manager.go @@ -1,18 +1,16 @@ -// Package repolog manages logs in the repository. -package repolog +// Package repodiag manages logs and metrics in the repository. +package repodiag import ( "context" "crypto/rand" "fmt" - "sync" "sync/atomic" "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "github.com/kopia/kopia/internal/blobcrypto" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/zaplogutil" @@ -21,8 +19,8 @@ const blobLoggerFlushThreshold = 4 << 20 -// BlobPrefix is a prefix given to text logs stored in repository. -const BlobPrefix = "_log_" +// LogBlobPrefix is a prefix given to text logs stored in repository. +const LogBlobPrefix = "_log_" // LogManager manages writing encrypted log blobs to the repository. type LogManager struct { @@ -32,55 +30,29 @@ type LogManager struct { // repository asynchronously when the context is not provided. ctx context.Context //nolint:containedctx - st blob.Storage - bc blobcrypto.Crypter - wg sync.WaitGroup + writer *Writer + timeFunc func() time.Time flushThreshold int } -// Close closes the log manager. -func (m *LogManager) Close(ctx context.Context) { - m.wg.Wait() -} - func (m *LogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather.Bytes, closeFunc func()) { - encrypted := gather.NewWriteBuffer() - // Close happens in a goroutine - - blobID, err := blobcrypto.Encrypt(m.bc, data, prefix, "", encrypted) - if err != nil { - encrypted.Close() - - // this should not happen, also nothing can be done about this, we're not in a place where we can return error, log it. - return - } - - b := encrypted.Bytes() - - m.wg.Add(1) - - go func() { - defer m.wg.Done() - defer encrypted.Close() - defer closeFunc() - - if err := m.st.PutBlob(m.ctx, blobID, b, blob.PutOptions{}); err != nil { - // nothing can be done about this, we're not in a place where we can return error, log it. - return - } - }() + m.writer.EncryptAndWriteBlobAsync(m.ctx, prefix, data, closeFunc) } // NewLogger creates new logger. func (m *LogManager) NewLogger() *zap.SugaredLogger { + if m == nil { + return zap.NewNop().Sugar() + } + var rnd [2]byte rand.Read(rnd[:]) //nolint:errcheck w := &internalLogger{ m: m, - prefix: blob.ID(fmt.Sprintf("%v%v_%x", BlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)), + prefix: blob.ID(fmt.Sprintf("%v%v_%x", LogBlobPrefix, clock.Now().Local().Format("20060102150405"), rnd)), } return zap.New(zapcore.NewCore( @@ -101,11 +73,10 @@ func (m *LogManager) Enable() { } // NewLogManager creates a new LogManager that will emit logs as repository blobs. -func NewLogManager(ctx context.Context, st blob.Storage, bc blobcrypto.Crypter) *LogManager { +func NewLogManager(ctx context.Context, w *Writer) *LogManager { return &LogManager{ ctx: ctx, - st: st, - bc: bc, + writer: w, flushThreshold: blobLoggerFlushThreshold, timeFunc: clock.Now, } diff --git a/internal/repodiag/log_manager_test.go b/internal/repodiag/log_manager_test.go new file mode 100644 index 000000000..8f1f0810c --- /dev/null +++ b/internal/repodiag/log_manager_test.go @@ -0,0 +1,95 @@ +package repodiag_test + +import ( + "crypto/rand" + "encoding/hex" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/repodiag" + "github.com/kopia/kopia/internal/testlogging" +) + +func TestLogManager_Enabled(t *testing.T) { + d := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(d, nil, nil) + w := repodiag.NewWriter(st, newStaticCrypter(t)) + ctx := testlogging.Context(t) + lm := repodiag.NewLogManager(ctx, w) + + lm.Enable() + l := lm.NewLogger() + l.Infof("hello") + + require.Len(t, d, 0) + l.Sync() + w.Wait(ctx) + + // make sure log messages are written + require.Len(t, d, 1) + + // make sure blob ID is prefixed + for k := range d { + require.True(t, strings.HasPrefix(string(k), repodiag.LogBlobPrefix)) + } +} + +func TestLogManager_AutoFlush(t *testing.T) { + d := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(d, nil, nil) + w := repodiag.NewWriter(st, newStaticCrypter(t)) + ctx := testlogging.Context(t) + lm := repodiag.NewLogManager(ctx, w) + + lm.Enable() + l := lm.NewLogger() + + // flush happens after 4 << 20 bytes (4MB) after compression, + // write ~10MB of base16 data which compresses to ~5MB and writes 1 blob + for i := 0; i < 5000; i++ { + var b [1024]byte + + rand.Read(b[:]) + l.Infof(hex.EncodeToString(b[:])) + } + + w.Wait(ctx) + + require.Equal(t, 1, len(d)) + + l.Sync() + w.Wait(ctx) + + require.Equal(t, 2, len(d)) +} + +func TestLogManager_NotEnabled(t *testing.T) { + d := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(d, nil, nil) + w := repodiag.NewWriter(st, newStaticCrypter(t)) + ctx := testlogging.Context(t) + lm := repodiag.NewLogManager(ctx, w) + + l := lm.NewLogger() + l.Infof("hello") + + require.Len(t, d, 0) + l.Sync() + w.Wait(ctx) + + // make sure log messages are not written + require.Len(t, d, 0) +} + +func TestLogManager_Null(t *testing.T) { + var lm *repodiag.LogManager + + lm.Enable() + + l := lm.NewLogger() + l.Infof("hello") + l.Sync() +} diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 3ac6d67bd..bde6a84f9 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -18,7 +18,7 @@ "github.com/kopia/kopia/internal/listcache" "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/internal/ownwrites" - "github.com/kopia/kopia/internal/repolog" + "github.com/kopia/kopia/internal/repodiag" "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/filesystem" @@ -106,7 +106,7 @@ type SharedManager struct { // logger associated with the context that opened the repository. contextLogger logging.Logger - repoLogManager *repolog.LogManager + repoLogManager *repodiag.LogManager internalLogger *zap.SugaredLogger // backing logger for 'sharedBaseLogger' metricsStruct @@ -577,14 +577,8 @@ func (sm *SharedManager) CloseShared(ctx context.Context) error { sm.internalLogger.Sync() //nolint:errcheck } - sm.repoLogManager.Close(ctx) - sm.indexBlobManagerV1.EpochManager().Flush() - if err := sm.st.Close(ctx); err != nil { - return errors.Wrap(err, "error closing storage") - } - return nil } @@ -612,7 +606,7 @@ func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context) } // NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository. -func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, mr *metrics.Registry) (*SharedManager, error) { +func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, repoLogManager *repodiag.LogManager, mr *metrics.Registry) (*SharedManager, error) { opts = opts.CloneOrDefault() if opts.TimeNow == nil { opts.TimeNow = clock.Now @@ -628,7 +622,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", - repoLogManager: repolog.NewLogManager(ctx, st, prov), + repoLogManager: repoLogManager, contextLogger: logging.Module(FormatLogModule)(ctx), metricsStruct: initMetricsStruct(mr), diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 73f42a6e7..06a3431bc 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -965,7 +965,7 @@ func NewManagerForTesting(ctx context.Context, st blob.Storage, f format.Provide options.TimeNow = clock.Now } - sharedManager, err := NewSharedManager(ctx, st, f, caching, options, nil) + sharedManager, err := NewSharedManager(ctx, st, f, caching, options, nil, nil) if err != nil { return nil, errors.Wrap(err, "error initializing read manager") } diff --git a/repo/content/indexblob/index_blob_encryption_test.go b/repo/content/indexblob/index_blob_encryption_test.go index 47332aa2c..b57f673a5 100644 --- a/repo/content/indexblob/index_blob_encryption_test.go +++ b/repo/content/indexblob/index_blob_encryption_test.go @@ -6,6 +6,7 @@ "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/kopia/kopia/internal/blobcrypto" "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" @@ -40,7 +41,7 @@ func TestEncryptedBlobManager(t *testing.T) { ebm := EncryptionManager{ st: fs, - crypter: staticCrypter{hf, enc}, + crypter: blobcrypto.StaticCrypter{Hash: hf, Encryption: enc}, indexBlobCache: nil, log: logging.NullLogger, } @@ -76,21 +77,8 @@ func TestEncryptedBlobManager(t *testing.T) { someError2 := errors.Errorf("some error 2") - ebm.crypter = staticCrypter{hf, failingEncryptor{nil, someError2}} + ebm.crypter = blobcrypto.StaticCrypter{Hash: hf, Encryption: failingEncryptor{nil, someError2}} _, err = ebm.EncryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1") require.ErrorIs(t, err, someError2) } - -type staticCrypter struct { - h hashing.HashFunc - e encryption.Encryptor -} - -func (p staticCrypter) Encryptor() encryption.Encryptor { - return p.e -} - -func (p staticCrypter) HashFunc() hashing.HashFunc { - return p.h -} diff --git a/repo/content/indexblob/index_blob_manager_v0_test.go b/repo/content/indexblob/index_blob_manager_v0_test.go index 0ecebbb81..979d1a6dd 100644 --- a/repo/content/indexblob/index_blob_manager_v0_test.go +++ b/repo/content/indexblob/index_blob_manager_v0_test.go @@ -14,6 +14,7 @@ "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "github.com/kopia/kopia/internal/blobcrypto" "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/faketime" @@ -788,7 +789,7 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f enc: &EncryptionManager{ st: st, indexBlobCache: nil, - crypter: staticCrypter{hf, enc}, + crypter: blobcrypto.StaticCrypter{Hash: hf, Encryption: enc}, log: log, }, timeNow: localTimeNow, diff --git a/repo/open.go b/repo/open.go index 4ed528531..2f82c1c4f 100644 --- a/repo/open.go +++ b/repo/open.go @@ -17,6 +17,7 @@ "github.com/kopia/kopia/internal/crypto" "github.com/kopia/kopia/internal/feature" "github.com/kopia/kopia/internal/metrics" + "github.com/kopia/kopia/internal/repodiag" "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/beforeop" @@ -322,7 +323,10 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) } - scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr) + dw := repodiag.NewWriter(st, fmgr) + logManager := repodiag.NewLogManager(ctx, dw) + + scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, logManager, mr) if ferr != nil { return nil, errors.Wrap(ferr, "unable to create shared content manager") } @@ -344,7 +348,9 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, closer := newRefCountedCloser( scm.CloseShared, + dw.Wait, mr.Close, + st.Close, ) dr := &directRepository{