Files
kopia/repo/content/committed_read_manager.go
Jarek Kowalski 0f7253eb66 feat(general): rewrote content logs to always be JSON-based and reorganized log structure (#4822)
This is a breaking change to users who might be using Kopia as a library.

### Log Format

```json
{"t":"<timestamp-rfc-3389-microseconds>", "span:T1":"V1", "span:T2":"V2", "n":"<source>", "m":"<message>", /*parameters*/}
```

Where each record is associated with one or more spans that describe its scope:

* `"span:client": "<hash-of-username@hostname>"`
* `"span:repo": "<random>"` - random identifier of a repository connection (from `repo.Open`)
* `"span:maintenance": "<random>"` - random identifier of a maintenance session
* `"span:upload": "<hash-of-username@host:/path>"` - uniquely identifies upload session of a given directory
* `"span:checkpoint": "<random>"` - encapsulates each checkpoint operation during Upload
* `"span:server-session": "<random>"` -single client connection to the server
* `"span:flush": "<random>"` - encapsulates each Flush session
* `"span:maintenance": "<random>"` - encapsulates each maintenance operation
* `"span:loadIndex" : "<random>"` - encapsulates index loading operation
* `"span:emr" : "<random>"` - encapsulates epoch manager refresh
* `"span:writePack": "<pack-blob-ID>"` - encapsulates pack blob preparation and writing

(plus additional minor spans for various phases of the maintenance).

Notable points:

- Used internal zero allocation JSON writer for reduced memory usage.
- renamed `--disable-internal-log` to `--disable-repository-log` (controls saving blobs to repository)
- added `--disable-content-log` (controls writing of `content-log` files)
- all storage operations are also logged in a structural way and associated with the corresponding spans.
- all content IDs are logged in a truncated format (since first N bytes that are usually enough to be unique) to improve compressibility of logs (blob IDs are frequently repeated but content IDs usually appear just once).

This format should make it possible to recreate the journey of any single content throughout pack blobs, indexes and compaction events.
2025-09-27 17:11:13 -07:00

646 lines
19 KiB
Go

package content
import (
"context"
"os"
"path/filepath"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/blobparam"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/cacheprot"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/contentlog"
"github.com/kopia/kopia/internal/contentlog/logparam"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/listcache"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/ownwrites"
"github.com/kopia/kopia/internal/repodiag"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/repo/blob/sharded"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/logging"
)
// number of bytes to read from each pack index when recovering the index.
// per-pack indexes are usually short (<100-200 contents).
const indexRecoverPostambleSize = 8192
const indexRefreshFrequency = 15 * time.Minute
const ownWritesCacheDuration = 15 * time.Minute
var log = logging.Module("kopia/content") // +checklocksignore
// constants below specify how long to prevent cache entries from expiring.
const (
DefaultMetadataCacheSweepAge = 24 * time.Hour
DefaultDataCacheSweepAge = 10 * time.Minute
DefaultIndexCacheSweepAge = 1 * time.Hour
)
//nolint:gochecknoglobals
var cachedIndexBlobPrefixes = []blob.ID{
indexblob.V0IndexBlobPrefix,
indexblob.V0CompactionLogBlobPrefix,
indexblob.V0CleanupBlobPrefix,
epoch.UncompactedIndexBlobPrefix,
epoch.EpochMarkerIndexBlobPrefix,
epoch.SingleEpochCompactionBlobPrefix,
epoch.RangeCheckpointIndexBlobPrefix,
}
//nolint:gochecknoglobals
var allIndexBlobPrefixes = []blob.ID{
indexblob.V0IndexBlobPrefix,
epoch.UncompactedIndexBlobPrefix,
epoch.SingleEpochCompactionBlobPrefix,
epoch.RangeCheckpointIndexBlobPrefix,
}
// IndexBlobReader provides an API for reading index blobs.
type IndexBlobReader interface {
ListIndexBlobInfos(ctx context.Context) ([]indexblob.Metadata, error)
}
// SharedManager is responsible for read-only access to committed data.
type SharedManager struct {
Stats *Stats
st blob.Storage
indexBlobManagerV0 *indexblob.ManagerV0
indexBlobManagerV1 *indexblob.ManagerV1
contentCache cache.ContentCache
metadataCache cache.ContentCache
indexBlobCache *cache.PersistentCache
committedContents *committedContentIndex
timeNow func() time.Time
// lock to protect the set of committed indexes
// shared lock will be acquired when writing new content to allow it to happen in parallel
// exclusive lock will be acquired during compaction or refresh.
indexesLock sync.RWMutex
permissiveCacheLoading bool
// maybeRefreshIndexes() will call Refresh() after this point in ime.
// +checklocks:indexesLock
refreshIndexesAfter time.Time
format format.Provider
checkInvariantsOnUnlock bool
minPreambleLength int
maxPreambleLength int
paddingUnit int
// logger where logs should be written
log *contentlog.Logger
// logger associated with the context that opened the repository.
repoLogManager *repodiag.LogManager
metricsStruct
}
// IsReadOnly returns whether this instance of the SharedManager only supports
// reads or if it also supports mutations to the index.
func (sm *SharedManager) IsReadOnly() bool {
return sm.st.IsReadOnly()
}
// LoadIndexBlob return index information loaded from the specified blob.
func (sm *SharedManager) LoadIndexBlob(ctx context.Context, ibid blob.ID, d *gather.WriteBuffer) ([]Info, error) {
err := sm.st.GetBlob(ctx, ibid, 0, -1, d)
if err != nil {
return nil, errors.Wrapf(err, "could not find index blob %q", ibid)
}
return ParseIndexBlob(ibid, d.Bytes(), sm.format)
}
// IndexReaderV0 return an index reader for reading V0 indexes.
func (sm *SharedManager) IndexReaderV0() IndexBlobReader {
return sm.indexBlobManagerV0
}
// IndexReaderV1 return an index reader for reading V1 indexes.
func (sm *SharedManager) IndexReaderV1() IndexBlobReader {
return sm.indexBlobManagerV1
}
func (sm *SharedManager) readPackFileLocalIndex(ctx context.Context, packFile blob.ID, packFileLength int64, output *gather.WriteBuffer) error {
var err error
if packFileLength >= indexRecoverPostambleSize {
if err = sm.attemptReadPackFileLocalIndex(ctx, packFile, packFileLength-indexRecoverPostambleSize, indexRecoverPostambleSize, output); err == nil {
contentlog.Log2(ctx, sm.log, "recovered index bytes from blob using optimized method", logparam.Int("length", output.Length()), blobparam.BlobID("packFile", packFile))
return nil
}
contentlog.Log1(ctx, sm.log,
"unable to recover using optimized method",
logparam.Error("err", err))
}
if err = sm.attemptReadPackFileLocalIndex(ctx, packFile, 0, -1, output); err == nil {
contentlog.Log2(ctx, sm.log,
"recovered index bytes from blob using full blob read",
logparam.Int("length", output.Length()),
blobparam.BlobID("packFile", packFile))
return nil
}
return err
}
func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, packFile blob.ID, offset, length int64, output *gather.WriteBuffer) error {
var payload gather.WriteBuffer
defer payload.Close()
output.Reset()
err := sm.st.GetBlob(ctx, packFile, offset, length, &payload)
if err != nil {
return errors.Wrapf(err, "error getting blob %v", packFile)
}
postamble := findPostamble(payload.Bytes().ToByteSlice())
if postamble == nil {
return errors.Errorf("unable to find valid postamble in file %v", packFile)
}
if uint32(offset) > postamble.localIndexOffset { //nolint:gosec
return errors.Errorf("not enough data read during optimized attempt %v", packFile)
}
postamble.localIndexOffset -= uint32(offset) //nolint:gosec
//nolint:gosec
if uint64(postamble.localIndexOffset+postamble.localIndexLength) > uint64(payload.Length()) {
// invalid offset/length
return errors.Errorf("unable to find valid local index in file %v - invalid offset/length", packFile)
}
var encryptedLocalIndexBytes gather.WriteBuffer
defer encryptedLocalIndexBytes.Close()
if err := payload.AppendSectionTo(&encryptedLocalIndexBytes, int(postamble.localIndexOffset), int(postamble.localIndexLength)); err != nil {
// should never happen
return errors.Wrap(err, "error appending to local index bytes")
}
return errors.Wrap(
sm.decryptAndVerify(encryptedLocalIndexBytes.Bytes(), postamble.localIndexIV, output),
"unable to decrypt local index")
}
// +checklocks:sm.indexesLock
func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
ctx0 := contentlog.WithParams(ctx,
logparam.String("span:loadindex", contentlog.RandomSpanID()))
nextSleepTime := 100 * time.Millisecond //nolint:mnd
for i := range indexLoadAttempts {
ctx := contentlog.WithParams(ctx0,
logparam.Int("loadAttempt", i))
ibm, err0 := sm.indexBlobManager(ctx)
if err0 != nil {
return err0
}
if err := ctx.Err(); err != nil {
//nolint:wrapcheck
return err
}
if i > 0 {
// invalidate any list caches.
flushTimer := timetrack.StartTimer()
flushErr := sm.st.FlushCaches(ctx)
contentlog.Log2(ctx, sm.log, "flushCaches",
logparam.Duration("latency", flushTimer.Elapsed()),
logparam.Error("error", flushErr))
time.Sleep(nextSleepTime)
nextSleepTime *= 2
}
indexBlobs, ignoreDeletedBefore, err := ibm.ListActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing index blobs")
}
var indexBlobIDs []blob.ID
for _, b := range indexBlobs {
indexBlobIDs = append(indexBlobIDs, b.BlobID)
}
err = sm.committedContents.fetchIndexBlobs(ctx, sm.permissiveCacheLoading, indexBlobIDs)
if err == nil {
err = sm.committedContents.use(ctx, indexBlobIDs, ignoreDeletedBefore)
if err != nil {
return err
}
if len(indexBlobs) > indexBlobCompactionWarningThreshold {
log(ctx).Errorf("Found too many index blobs (%v), this may result in degraded performance.\n\nPlease ensure periodic repository maintenance is enabled or run 'kopia maintenance'.", len(indexBlobs))
contentlog.Log1(ctx, sm.log, "Found too many index blobs", logparam.Int("len", len(indexBlobs)))
}
sm.refreshIndexesAfter = sm.timeNow().Add(indexRefreshFrequency)
return nil
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return err
}
}
return errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
}
func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
if id.HasPrefix() {
return sm.metadataCache
}
return sm.contentCache
}
// indexBlobManager return the index manager for content.
func (sm *SharedManager) indexBlobManager(ctx context.Context) (indexblob.Manager, error) {
mp, mperr := sm.format.GetMutableParameters(ctx)
if mperr != nil {
return nil, errors.Wrap(mperr, "mutable parameters")
}
var q indexblob.Manager = sm.indexBlobManagerV0
if mp.EpochParameters.Enabled {
q = sm.indexBlobManagerV1
}
return q, nil
}
func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error {
sm.Stats.readContent(payload.Length())
var hashBuf [hashing.MaxHashSize]byte
iv := getPackedContentIV(hashBuf[:0], bi.ContentID)
// reserved for future use
if k := bi.EncryptionKeyID; k != 0 {
return errors.Errorf("unsupported encryption key ID: %v", k)
}
h := bi.CompressionHeaderID
if h == 0 {
return errors.Wrapf(
sm.decryptAndVerify(payload, iv, output),
"invalid checksum at %v offset %v length %v/%v", bi.PackBlobID, bi.PackOffset, bi.PackedLength, payload.Length())
}
var tmp gather.WriteBuffer
defer tmp.Close()
if err := sm.decryptAndVerify(payload, iv, &tmp); err != nil {
return errors.Wrapf(err, "invalid checksum at %v offset %v length %v/%v", bi.PackBlobID, bi.PackOffset, bi.PackedLength, payload.Length())
}
c := compression.ByHeaderID[h]
if c == nil {
return errors.Errorf("unsupported compressor %x", h)
}
t0 := timetrack.StartTimer()
if err := c.Decompress(output, tmp.Bytes().Reader(), true); err != nil {
return errors.Wrap(err, "error decompressing")
}
sm.decompressedBytes.Observe(int64(tmp.Length()), t0.Elapsed())
return nil
}
func (sm *SharedManager) decryptAndVerify(encrypted gather.Bytes, iv []byte, output *gather.WriteBuffer) error {
t0 := timetrack.StartTimer()
if err := sm.format.Encryptor().Decrypt(encrypted, iv, output); err != nil {
sm.Stats.foundInvalidContent()
return errors.Wrap(err, "decrypt")
}
sm.decryptedBytes.Observe(int64(encrypted.Length()), t0.Elapsed())
sm.Stats.foundValidContent()
sm.Stats.decrypted(output.Length())
// already verified
return nil
}
// IndexBlobs returns the list of active index blobs.
func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]indexblob.Metadata, error) {
if includeInactive {
var result []indexblob.Metadata
for _, prefix := range allIndexBlobPrefixes {
blobs, err := blob.ListAllBlobs(ctx, sm.st, prefix)
if err != nil {
return nil, errors.Wrapf(err, "error listing %v blogs", prefix)
}
for _, bm := range blobs {
result = append(result, indexblob.Metadata{Metadata: bm})
}
}
return result, nil
}
ibm, err0 := sm.indexBlobManager(ctx)
if err0 != nil {
return nil, err0
}
blobs, _, err := ibm.ListActiveIndexBlobs(ctx)
//nolint:wrapcheck
return blobs, err
}
func newOwnWritesCache(ctx context.Context, st blob.Storage, caching *CachingOptions) (blob.Storage, error) {
cacheSt, err := newCacheBackingStorage(ctx, caching, "own-writes")
if err != nil {
return nil, errors.Wrap(err, "unable to get list cache backing storage")
}
return ownwrites.NewWrapper(st, cacheSt, cachedIndexBlobPrefixes, ownWritesCacheDuration), nil
}
func newListCache(ctx context.Context, st blob.Storage, caching *CachingOptions) (blob.Storage, error) {
cacheSt, err := newCacheBackingStorage(ctx, caching, "blob-list")
if err != nil {
return nil, errors.Wrap(err, "unable to get list cache backing storage")
}
return listcache.NewWrapper(st, cacheSt, cachedIndexBlobPrefixes, caching.HMACSecret, caching.MaxListCacheDuration.DurationOrDefault(0)), nil
}
func newCacheBackingStorage(ctx context.Context, caching *CachingOptions, subdir string) (blob.Storage, error) {
if caching.CacheDirectory == "" {
return nil, nil
}
blobListCacheDir := filepath.Join(caching.CacheDirectory, subdir)
if _, err := os.Stat(blobListCacheDir); os.IsNotExist(err) {
if err := os.MkdirAll(blobListCacheDir, cache.DirMode); err != nil {
return nil, errors.Wrap(err, "error creating list cache directory")
}
}
//nolint:wrapcheck
return filesystem.New(ctx, &filesystem.Options{
Path: blobListCacheDir,
Options: sharded.Options{
DirectoryShards: []int{},
},
}, false)
}
func (sm *SharedManager) namedLogger(n string) *contentlog.Logger {
return sm.repoLogManager.NewLogger(n)
}
func contentCacheSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.ContentCacheSizeBytes,
LimitBytes: caching.ContentCacheSizeLimitBytes,
MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge),
}
}
func metadataCacheSizeSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(),
LimitBytes: caching.MetadataCacheSizeLimitBytes,
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}
}
func indexBlobCacheSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(),
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}
}
func (sm *SharedManager) setupCachesAndIndexManagers(ctx context.Context, caching *CachingOptions, mr *metrics.Registry) error {
dataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{
BaseCacheDirectory: caching.CacheDirectory,
CacheSubDir: "contents",
HMACSecret: caching.HMACSecret,
Sweep: contentCacheSweepSettings(caching),
}, mr)
if err != nil {
return errors.Wrap(err, "unable to initialize content cache")
}
metadataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{
BaseCacheDirectory: caching.CacheDirectory,
CacheSubDir: "metadata",
HMACSecret: caching.HMACSecret,
FetchFullBlobs: true,
Sweep: metadataCacheSizeSweepSettings(caching),
}, mr)
if err != nil {
return errors.Wrap(err, "unable to initialize metadata cache")
}
indexBlobStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, caching.EffectiveMetadataCacheSizeBytes(), "index-blobs")
if err != nil {
return errors.Wrap(err, "unable to initialize index blob cache storage")
}
indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs",
indexBlobStorage,
cacheprot.ChecksumProtection(caching.HMACSecret),
indexBlobCacheSweepSettings(caching),
mr, sm.timeNow)
if err != nil {
return errors.Wrap(err, "unable to create index blob cache")
}
ownWritesCachingSt, err := newOwnWritesCache(ctx, sm.st, caching)
if err != nil {
return errors.Wrap(err, "unable to initialize own writes cache")
}
cachedSt, err := newListCache(ctx, ownWritesCachingSt, caching)
if err != nil {
return errors.Wrap(err, "unable to initialize list cache")
}
enc := indexblob.NewEncryptionManager(
cachedSt,
sm.format,
indexBlobCache,
sm.namedLogger("encrypted-blob-manager"))
// set up legacy index blob manager
sm.indexBlobManagerV0 = indexblob.NewManagerV0(
cachedSt,
enc,
sm.timeNow,
sm.format,
sm.namedLogger("index-blob-manager"),
)
// set up new index blob manager
sm.indexBlobManagerV1 = indexblob.NewManagerV1(
cachedSt,
enc,
epoch.NewManager(cachedSt,
epochParameters{sm.format},
func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
return errors.Wrap(sm.indexBlobManagerV1.CompactEpoch(ctx, blobIDs, outputPrefix), "CompactEpoch")
},
sm.namedLogger("epoch-manager"),
sm.timeNow),
sm.timeNow,
sm.format,
sm.namedLogger("index-blob-manager"),
)
// once everything is ready, set it up
sm.contentCache = dataCache
sm.metadataCache = metadataCache
sm.indexBlobCache = indexBlobCache
sm.committedContents = newCommittedContentIndex(caching,
sm.format.Encryptor().Overhead,
sm.format,
sm.permissiveCacheLoading,
enc.GetEncryptedBlob,
sm.namedLogger("committed-content-index"),
caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
return nil
}
type epochParameters struct {
prov format.Provider
}
func (p epochParameters) GetParameters(ctx context.Context) (*epoch.Parameters, error) {
mp, mperr := p.prov.GetMutableParameters(ctx)
if mperr != nil {
return nil, errors.Wrap(mperr, "mutable parameters")
}
return &mp.EpochParameters, nil
}
// EpochManager returns the epoch manager.
func (sm *SharedManager) EpochManager(ctx context.Context) (*epoch.Manager, bool, error) {
ibm, err := sm.indexBlobManager(ctx)
if err != nil {
return nil, false, err
}
ibm1, ok := ibm.(*indexblob.ManagerV1)
if !ok {
return nil, false, nil
}
return ibm1.EpochManager(), true, nil
}
// CloseShared releases all resources in a shared manager.
func (sm *SharedManager) CloseShared(ctx context.Context) error {
if err := sm.committedContents.close(); err != nil {
return errors.Wrap(err, "error closing committed content index")
}
sm.contentCache.Close(ctx)
sm.metadataCache.Close(ctx)
sm.indexBlobCache.Close(ctx)
sm.indexBlobManagerV1.EpochManager().Flush()
sm.repoLogManager.Sync()
return nil
}
func (sm *SharedManager) shouldRefreshIndexes() bool {
sm.indexesLock.RLock()
defer sm.indexesLock.RUnlock()
return sm.timeNow().After(sm.refreshIndexesAfter)
}
// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context) error {
//nolint:wrapcheck
return sm.indexBlobManagerV1.PrepareUpgradeToIndexBlobManagerV1(ctx, sm.indexBlobManagerV0)
}
// NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository.
func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider, caching *CachingOptions, opts *ManagerOptions, repoLogManager *repodiag.LogManager, mr *metrics.Registry) (*SharedManager, error) {
opts = opts.CloneOrDefault()
if opts.TimeNow == nil {
opts.TimeNow = clock.Now
}
sm := &SharedManager{
st: st,
Stats: new(Stats),
timeNow: opts.TimeNow,
format: prov,
permissiveCacheLoading: opts.PermissiveCacheLoading,
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
repoLogManager: repoLogManager,
metricsStruct: initMetricsStruct(mr),
}
sm.log = sm.repoLogManager.NewLogger("shared-manager")
caching = caching.CloneOrDefault()
if err := sm.setupCachesAndIndexManagers(ctx, caching, mr); err != nil {
return nil, errors.Wrap(err, "error setting up read manager caches")
}
sm.indexesLock.Lock()
defer sm.indexesLock.Unlock()
if err := sm.loadPackIndexesLocked(ctx); err != nil {
return nil, errors.Wrap(err, "error loading indexes")
}
return sm, nil
}