logging: revamped logs from content manager to be machine parseable (#617)
* logging: revamped logs from content manager to be machine parseable

  Logs from the content manager (except reads) are sent to a separate log file that is always free of personally identifiable information (no file names, just content IDs and blob IDs).

  Also moved CLI logs to a subdirectory (cli-logs) and put content logs in a parallel directory (content-logs).

  The log file name now includes the type of the command that was invoked, e.g.:

      kopia-20200913-134157-16110-snapshot-create.log

  Fixes #588

* tests: moved all logs from tests to a separate directory
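As an illustration of the naming scheme, here is a minimal, self-contained Go sketch (not part of the commit) that derives the per-command log base name the same way the diff below does in setupLogFileBasedLogger. The "kopia-" prefix and ".log" suffix are assumed from the example file name above, since logFileNamePrefix and logFileNameSuffix are not shown in this diff.

    // Hypothetical sketch: reproduces the per-command log file naming pattern.
    // "kopia-" and ".log" are assumed values for logFileNamePrefix and
    // logFileNameSuffix, which this diff does not show.
    package main

    import (
        "fmt"
        "os"
        "strings"
        "time"
    )

    func logBaseName(now time.Time, fullCommand string) string {
        suffix := strings.ReplaceAll(fullCommand, " ", "-") // "snapshot create" -> "snapshot-create"
        return fmt.Sprintf("kopia-%v-%v-%v.log", now.Format("20060102-150405"), os.Getpid(), suffix)
    }

    func main() {
        // Prints e.g. kopia-20200913-134157-16110-snapshot-create.log
        fmt.Println(logBaseName(time.Now(), "snapshot create"))
    }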
@@ -20,9 +20,13 @@
 	"github.com/kopia/kopia/cli"
+	"github.com/kopia/kopia/internal/clock"
 	"github.com/kopia/kopia/internal/ospath"
+	"github.com/kopia/kopia/repo/content"
 	repologging "github.com/kopia/kopia/repo/logging"
 )
 
+var contentLogFormat = logging.MustStringFormatter(
+	`%{time:2006-01-02 15:04:05.0000000} %{message}`)
+
 var fileLogFormat = logging.MustStringFormatter(
 	`%{time:2006-01-02 15:04:05.000} %{level:.1s} [%{shortfile}] %{message}`)
 
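Because contentLogFormat emits a fixed-width timestamp followed by a single message, every content-log line can be split mechanically. A minimal parsing sketch, assuming only the format defined above (the timestamp layout is exactly 27 characters):

    // Sketch only; assumes the contentLogFormat defined in the hunk above.
    const contentLogTimeLayout = "2006-01-02 15:04:05.0000000" // 27 characters

    func parseContentLogLine(line string) (time.Time, string, error) {
        if len(line) < len(contentLogTimeLayout)+2 {
            return time.Time{}, "", fmt.Errorf("line too short: %q", line)
        }

        ts, err := time.Parse(contentLogTimeLayout, line[:len(contentLogTimeLayout)])
        if err != nil {
            return time.Time{}, "", err
        }

        // The remainder is the machine-parseable message, e.g. "write-content abc123 new".
        return ts, line[len(contentLogTimeLayout)+1:], nil
    }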
@@ -31,12 +35,16 @@
 var logLevels = []string{"debug", "info", "warning", "error"}
 
 var (
-	logFile = cli.App().Flag("log-file", "Log file name.").String()
-	logDir = cli.App().Flag("log-dir", "Directory where log files should be written.").Envar("KOPIA_LOG_DIR").Default(ospath.LogsDir()).String()
-	logDirMaxFiles = cli.App().Flag("log-dir-max-files", "Maximum number of log files to retain").Envar("KOPIA_LOG_DIR_MAX_FILES").Default("1000").Hidden().Int()
-	logDirMaxAge = cli.App().Flag("log-dir-max-age", "Maximum age of log files to retain").Envar("KOPIA_LOG_DIR_MAX_AGE").Hidden().Duration()
-	logLevel = cli.App().Flag("log-level", "Console log level").Default("info").Enum(logLevels...)
-	fileLogLevel = cli.App().Flag("file-log-level", "File log level").Default("debug").Enum(logLevels...)
+	logFile = cli.App().Flag("log-file", "Override log file.").String()
+	contentLogFile = cli.App().Flag("content-log-file", "Override content log file.").String()
+
+	logDir = cli.App().Flag("log-dir", "Directory where log files should be written.").Envar("KOPIA_LOG_DIR").Default(ospath.LogsDir()).String()
+	logDirMaxFiles = cli.App().Flag("log-dir-max-files", "Maximum number of log files to retain").Envar("KOPIA_LOG_DIR_MAX_FILES").Default("1000").Hidden().Int()
+	logDirMaxAge = cli.App().Flag("log-dir-max-age", "Maximum age of log files to retain").Envar("KOPIA_LOG_DIR_MAX_AGE").Hidden().Default("720h").Duration()
+
+	contentLogDirMaxFiles = cli.App().Flag("content-log-dir-max-files", "Maximum number of content log files to retain").Envar("KOPIA_CONTENT_LOG_DIR_MAX_FILES").Default("5000").Hidden().Int()
+	contentLogDirMaxAge = cli.App().Flag("content-log-dir-max-age", "Maximum age of content log files to retain").Envar("KOPIA_CONTENT_LOG_DIR_MAX_AGE").Default("720h").Hidden().Duration()
+
+	logLevel = cli.App().Flag("log-level", "Console log level").Default("info").Enum(logLevels...)
+	fileLogLevel = cli.App().Flag("file-log-level", "File log level").Default("debug").Enum(logLevels...)
 )
 
 var log = repologging.GetContextLoggerFunc("kopia")
@@ -48,67 +56,108 @@
 // Initialize is invoked as part of command execution to create log file just before it's needed.
 func Initialize(ctx *kingpin.ParseContext) error {
-	var logBackends []logging.Backend
+	now := clock.Now()
 
-	var logFileName, symlinkName string
-
-	if lfn := *logFile; lfn != "" {
-		var err error
-
-		logFileName, err = filepath.Abs(lfn)
-		if err != nil {
-			return err
-		}
+	suffix := "unknown"
+	if c := ctx.SelectedCommand; c != nil {
+		suffix = strings.ReplaceAll(c.FullCommand(), " ", "-")
 	}
 
-	var shouldSweepLogs bool
-
-	if logFileName == "" && *logDir != "" {
-		logBaseName := fmt.Sprintf("%v%v-%v%v", logFileNamePrefix, clock.Now().Format("20060102-150405"), os.Getpid(), logFileNameSuffix)
-		logFileName = filepath.Join(*logDir, logBaseName)
-		symlinkName = "kopia.latest.log"
-
-		if *logDirMaxAge > 0 || *logDirMaxFiles > 0 {
-			shouldSweepLogs = true
-		}
-	}
-
-	if logFileName != "" {
-		logFileDir := filepath.Dir(logFileName)
-		logFileBaseName := filepath.Base(logFileName)
-
-		if err := os.MkdirAll(logFileDir, 0o700); err != nil {
-			fmt.Fprintln(os.Stderr, "Unable to create logs directory:", err)
-		}
-
-		logBackends = append(
-			logBackends,
-			levelFilter(
-				*fileLogLevel,
-				logging.NewBackendFormatter(
-					&onDemandBackend{
-						logDir: logFileDir,
-						logFileBaseName: logFileBaseName,
-						symlinkName: symlinkName,
-					}, fileLogFormat)))
-	}
-
-	logBackends = append(logBackends,
-		levelFilter(
-			*logLevel,
-			logging.NewBackendFormatter(
-				logging.NewLogBackend(os.Stderr, "", 0),
-				consoleLogFormat)))
-
-	logging.SetBackend(logBackends...)
-
-	if shouldSweepLogs {
-		go sweepLogDir(context.TODO(), *logDir, *logDirMaxFiles, *logDirMaxAge)
-	}
+	// activate backends
+	logging.SetBackend(
+		setupConsoleBackend(),
+		setupLogFileBackend(now, suffix),
+		setupContentLogFileBackend(now, suffix),
+	)
 
 	return nil
 }
+
+func setupConsoleBackend() logging.Backend {
+	l := logging.AddModuleLevel(logging.NewBackendFormatter(
+		logging.NewLogBackend(os.Stderr, "", 0),
+		consoleLogFormat))
+
+	// do not output content logs to the console
+	l.SetLevel(logging.CRITICAL, content.FormatLogModule)
+
+	// log everything else at a level specified using --log-level
+	l.SetLevel(logLevelFromFlag(*logLevel), "")
+
+	return l
+}
+
+func setupLogFileBasedLogger(now time.Time, subdir, suffix, logFileOverride string, maxFiles int, maxAge time.Duration) logging.Backend {
+	var logFileName, symlinkName string
+
+	if logFileOverride != "" {
+		var err error
+
+		logFileName, err = filepath.Abs(logFileOverride)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "Unable to resolve logs path", err)
+		}
+	}
+
+	if logFileName == "" {
+		logBaseName := fmt.Sprintf("%v%v-%v-%v%v", logFileNamePrefix, now.Format("20060102-150405"), os.Getpid(), suffix, logFileNameSuffix)
+		logFileName = filepath.Join(*logDir, subdir, logBaseName)
+		symlinkName = "latest.log"
+	}
+
+	logDir := filepath.Dir(logFileName)
+	logFileBaseName := filepath.Base(logFileName)
+
+	if err := os.MkdirAll(logDir, 0o700); err != nil {
+		fmt.Fprintln(os.Stderr, "Unable to create logs directory:", err)
+	}
+
+	// do not scrub directory if custom log file has been provided.
+	if logFileOverride == "" && shouldSweepLog(maxFiles, maxAge) {
+		go sweepLogDir(context.TODO(), logDir, maxFiles, maxAge)
+	}
+
+	return &onDemandBackend{
+		logDir: logDir,
+		logFileBaseName: logFileBaseName,
+		symlinkName: symlinkName,
+	}
+}
+
+func setupLogFileBackend(now time.Time, suffix string) logging.Backend {
+	l := logging.AddModuleLevel(
+		logging.NewBackendFormatter(
+			setupLogFileBasedLogger(now, "cli-logs", suffix, *logFile, *logDirMaxFiles, *logDirMaxAge),
+			fileLogFormat))
+
+	// do not output content logs to the regular log file
+	l.SetLevel(logging.CRITICAL, content.FormatLogModule)
+
+	// log everything else at a level specified using --file-log-level
+	l.SetLevel(logLevelFromFlag(*fileLogLevel), "")
+
+	return l
+}
+
+func setupContentLogFileBackend(now time.Time, suffix string) logging.Backend {
+	l := logging.AddModuleLevel(
+		logging.NewBackendFormatter(
+			setupLogFileBasedLogger(now, "content-logs", suffix, *contentLogFile, *contentLogDirMaxFiles, *contentLogDirMaxAge),
+			contentLogFormat))
+
+	// only log content entries
+	l.SetLevel(logging.DEBUG, content.FormatLogModule)
+
+	// do not log anything else
+	l.SetLevel(logging.CRITICAL, "")
+
+	return l
+}
+
+func shouldSweepLog(maxFiles int, maxAge time.Duration) bool {
+	return maxFiles > 0 || maxAge > 0
+}
 
 func sweepLogDir(ctx context.Context, dirname string, maxCount int, maxAge time.Duration) {
 	var timeCutoff time.Time
 	if maxAge > 0 {
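The three backends above all see the same stream of records; routing is done purely with go-logging's per-module levels (AddModuleLevel/SetLevel), which the diff itself uses. A standalone sketch of that routing idea, with contentLogWriter standing in as a hypothetical io.Writer for the content log:

    // Sketch of the routing idea using the github.com/op/go-logging API
    // seen above; contentLogWriter is a hypothetical io.Writer.
    console := logging.AddModuleLevel(logging.NewBackendFormatter(
        logging.NewLogBackend(os.Stderr, "", 0), consoleLogFormat))
    console.SetLevel(logging.CRITICAL, content.FormatLogModule) // hide content entries
    console.SetLevel(logging.INFO, "")                          // everything else

    contentLog := logging.AddModuleLevel(logging.NewBackendFormatter(
        logging.NewLogBackend(contentLogWriter, "", 0), contentLogFormat))
    contentLog.SetLevel(logging.DEBUG, content.FormatLogModule) // only content entries
    contentLog.SetLevel(logging.CRITICAL, "")                   // suppress the rest

    // Every record visits every backend; module levels decide what is written.
    logging.SetBackend(console, contentLog)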
@@ -150,23 +199,19 @@ func sweepLogDir(ctx context.Context, dirname string, maxCount int, maxAge time.
 	}
 }
 
-func levelFilter(level string, writer logging.Backend) logging.Backend {
-	l := logging.AddModuleLevel(writer)
-
-	switch level {
+func logLevelFromFlag(levelString string) logging.Level {
+	switch levelString {
 	case "debug":
-		l.SetLevel(logging.DEBUG, "")
+		return logging.DEBUG
 	case "info":
-		l.SetLevel(logging.INFO, "")
+		return logging.INFO
 	case "warning":
-		l.SetLevel(logging.WARNING, "")
+		return logging.WARNING
 	case "error":
-		l.SetLevel(logging.ERROR, "")
+		return logging.ERROR
 	default:
-		l.SetLevel(logging.CRITICAL, "")
+		return logging.CRITICAL
 	}
-
-	return l
 }
 
 type onDemandBackend struct {
@@ -101,8 +101,6 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
 		return false, nil
 	}
 
-	log(ctx).Debugf("set of index files has changed (had %v, now %v)", len(b.inUse), len(packFiles))
-
 	var newMerged mergedIndex
 
 	newInUse := map[blob.ID]packIndex{}
@@ -24,7 +24,7 @@
 
 var (
 	log = logging.GetContextLoggerFunc("kopia/content")
-	formatLog = logging.GetContextLoggerFunc("kopia/content/format")
+	formatLog = logging.GetContextLoggerFunc(FormatLogModule)
 )
 
 // Prefixes for pack blobs.
@@ -32,6 +32,8 @@
 	PackBlobIDPrefixRegular blob.ID = "p"
 	PackBlobIDPrefixSpecial blob.ID = "q"
 
+	FormatLogModule = "kopia/format"
+
 	maxHashSize = 64
 	defaultEncryptionBufferPoolSegmentSize = 8 << 20 // 8 MB
 )
@@ -104,7 +106,7 @@ func (bm *Manager) DeleteContent(ctx context.Context, contentID ID) error {
 	bm.lock()
 	defer bm.unlock()
 
-	log(ctx).Debugf("DeleteContent(%q)", contentID)
+	formatLog(ctx).Debugf("delete-content %v", contentID)
 
 	// remove from all pending packs
 	for _, pp := range bm.pendingPacks {
@@ -161,7 +163,7 @@ func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []b
 
 	// do not start new uploads while flushing
 	for bm.flushing {
-		formatLog(ctx).Debugf("waiting before flush completes")
+		formatLog(ctx).Debugf("wait-before-flush")
 		bm.cond.Wait()
 	}
 
@@ -378,10 +380,11 @@ func (bm *Manager) prepareAndWritePackInternal(ctx context.Context, pp *pendingP
 
 	if pp.currentPackData.Length() > 0 {
 		if err := bm.writePackFileNotLocked(ctx, pp.packBlobID, pp.currentPackData.Bytes); err != nil {
+			formatLog(ctx).Debugf("failed-pack %v %v", pp.packBlobID, err)
 			return nil, errors.Wrap(err, "can't save pack data content")
 		}
 
-		formatLog(ctx).Debugf("wrote pack file: %v (%v bytes)", pp.packBlobID, pp.currentPackData.Length())
+		formatLog(ctx).Debugf("wrote-pack %v %v", pp.packBlobID, pp.currentPackData.Length())
 	}
 
 	return packFileIndex, nil
@@ -423,6 +426,8 @@ func (bm *Manager) Flush(ctx context.Context) error {
 	bm.lock()
 	defer bm.unlock()
 
+	formatLog(ctx).Debugf("flush")
+
 	bm.flushing = true
 
 	defer func() {
@@ -453,7 +458,7 @@ func (bm *Manager) Flush(ctx context.Context) error {
 
 // RewriteContent reads and re-writes a given content using the most recent format.
 func (bm *Manager) RewriteContent(ctx context.Context, contentID ID) error {
-	log(ctx).Debugf("RewriteContent(%q)", contentID)
+	formatLog(ctx).Debugf("rewrite-content %v", contentID)
 
 	pp, bi, err := bm.getContentInfo(contentID)
 	if err != nil {
@@ -543,8 +548,13 @@ func (bm *Manager) WriteContent(ctx context.Context, data []byte, prefix ID) (ID
 	// content already tracked
 	if _, bi, err := bm.getContentInfo(contentID); err == nil {
 		if !bi.Deleted {
+			formatLog(ctx).Debugf("write-content %v already-exists", contentID)
 			return contentID, nil
 		}
+
+		formatLog(ctx).Debugf("write-content %v previously-deleted", contentID)
+	} else {
+		formatLog(ctx).Debugf("write-content %v new", contentID)
 	}
 
 	err := bm.addToPackUnlocked(ctx, contentID, data, false)
@@ -33,7 +33,6 @@ func (bm *Manager) CompactIndexes(ctx context.Context, opt CompactOptions) error
 	}
 
 	blobsToCompact := bm.getBlobsToCompact(ctx, indexBlobs, opt)
-	log(ctx).Debugf("blobs to compact: %v", blobsToCompact)
 
 	if err := bm.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
 		return errors.Wrap(err, "error performing compaction")
@@ -68,16 +67,16 @@ func (bm *Manager) getBlobsToCompact(ctx context.Context, indexBlobs []IndexBlob
 
 	if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
 		// current count is below min allowed - nothing to do
-		formatLog(ctx).Debugf("no small contents to compact")
+		log(ctx).Debugf("no small contents to compact")
 		return nil
 	}
 
 	if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs {
-		formatLog(ctx).Debugf("compacting %v very small contents", len(verySmallBlobs))
+		log(ctx).Debugf("compacting %v very small contents", len(verySmallBlobs))
 		return verySmallBlobs
 	}
 
-	formatLog(ctx).Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
+	log(ctx).Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
 
 	return nonCompactedBlobs
 }
@@ -87,14 +86,12 @@ func (bm *Manager) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlob
 		return nil
 	}
 
-	formatLog(ctx).Debugf("compacting %v index blobs", len(indexBlobs))
-
 	bld := make(packIndexBuilder)
 
 	var inputs, outputs []blob.Metadata
 
-	for _, indexBlob := range indexBlobs {
-		formatLog(ctx).Debugf("adding index entries %v to compacted index", indexBlob)
+	for i, indexBlob := range indexBlobs {
+		formatLog(ctx).Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)
 
 		if err := bm.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil {
 			return errors.Wrap(err, "error adding index to builder")
@@ -121,7 +118,7 @@ func (bm *Manager) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlob
 	// it must be a no-op.
 	for _, indexBlob := range indexBlobs {
 		if indexBlob.BlobID == compactedIndexBlob.BlobID {
-			formatLog(ctx).Debugf("compaction was a no-op")
+			formatLog(ctx).Debugf("compaction-noop")
 			return nil
 		}
 	}
@@ -138,18 +135,22 @@
 func dropContentsFromBuilder(ctx context.Context, bld packIndexBuilder, opt CompactOptions) {
 	for _, dc := range opt.DropContents {
 		if _, ok := bld[dc]; ok {
-			log(ctx).Debugf("dropping content %v", dc)
+			formatLog(ctx).Debugf("manual-drop-from-index %v", dc)
 			delete(bld, dc)
 		}
 	}
 
 	if !opt.DropDeletedBefore.IsZero() {
+		formatLog(ctx).Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
+
 		for _, i := range bld {
 			if i.Deleted && i.Timestamp().Before(opt.DropDeletedBefore) {
-				log(ctx).Debugf("dropping content %v deleted at %v", i.ID, i.Timestamp())
+				formatLog(ctx).Debugf("drop-from-index-old-deleted %v %v", i.ID, i.Timestamp())
				delete(bld, i.ID)
 			}
 		}
+
+		formatLog(ctx).Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore)
 	}
 }
 
@@ -197,7 +197,7 @@ func (bm *lockFreeManager) unprocessedIndexBlobsUnlocked(ctx context.Context, co
 	}
 
 	if has {
-		log(ctx).Debugf("index blob %q already in cache, skipping", c.BlobID)
+		formatLog(ctx).Debugf("index-already-cached %v", c.BlobID)
 		continue
 	}
 
@@ -280,8 +280,6 @@ func (bm *lockFreeManager) decryptAndVerify(encrypted, iv []byte) ([]byte, error
 }
 
 func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendingPackInfo) (packIndexBuilder, error) {
-	formatLog(ctx).Debugf("preparing content data with %v items (contents %v)", len(pp.currentPackItems), pp.currentPackData.Length())
-
 	packFileIndex := packIndexBuilder{}
 	haveContent := false

@@ -290,6 +288,8 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi
 			haveContent = true
 		}
 
+		formatLog(ctx).Debugf("add-to-pack %v %v p:%v %v d:%v", pp.packBlobID, info.ID, info.PackBlobID, info.Length, info.Deleted)
+
 		packFileIndex.Add(info)
 	}
 
@@ -47,7 +47,7 @@ type memoryOwnWritesCache struct {
 }
 
 func (n *memoryOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error {
-	log(ctx).Debugf("adding %v to own-writes cache", mb.BlobID)
+	formatLog(ctx).Debugf("own-writes-cache-add %v", mb)
 	n.entries.Store(mb.BlobID, mb)
 
 	return nil
@@ -73,7 +73,7 @@ func (n *memoryOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source
 		if age := n.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention {
 			result = append(result, md)
 		} else {
-			log(ctx).Debugf("deleting stale own writes cache entry: %v (%v)", key, age)
+			formatLog(ctx).Debugf("own-writes-cache-expired %v %v", key, age)
 
 			n.entries.Delete(key)
 		}
@@ -81,7 +81,7 @@ func (n *memoryOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source
 		return true
 	})
 
-	return mergeOwnWrites(ctx, source, result), nil
+	return mergeOwnWrites(source, result), nil
 }
 
 // persistentOwnWritesCache is an implementation of ownWritesCache that caches entries to strongly consistent blob storage.
@@ -123,7 +123,7 @@ func (d *persistentOwnWritesCache) merge(ctx context.Context, prefix blob.ID, so
 		if age := d.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention {
 			myWrites = append(myWrites, originalMD)
 		} else {
-			log(ctx).Debugf("deleting blob %v from own-write cache because it's too old: %v (%v)", md.BlobID, age, originalMD.Timestamp)
+			log(ctx).Debugf("own-writes-cache-expired: %v (%v)", md, age)
 
 			if err := d.st.DeleteBlob(ctx, md.BlobID); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
 				return errors.Wrap(err, "error deleting stale blob")
@@ -133,7 +133,7 @@ func (d *persistentOwnWritesCache) merge(ctx context.Context, prefix blob.ID, so
 		return nil
 	})
 
-	return mergeOwnWrites(ctx, source, myWrites), err
+	return mergeOwnWrites(source, myWrites), err
 }
 
 func (d *persistentOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error {
@@ -144,7 +144,7 @@ func (d *persistentOwnWritesCache) delete(ctx context.Context, blobID blob.ID) e
 	})
 }
 
-func mergeOwnWrites(ctx context.Context, source, own []blob.Metadata) []blob.Metadata {
+func mergeOwnWrites(source, own []blob.Metadata) []blob.Metadata {
 	m := map[blob.ID]blob.Metadata{}
 
 	for _, v := range source {
@@ -165,8 +165,6 @@ func mergeOwnWrites(ctx context.Context, source, own []blob.Metadata) []blob.Met
 		s = append(s, v)
 	}
 
-	log(ctx).Debugf("merged %v backend blobs and %v local blobs into %v", source, own, s)
-
 	return s
 }
 
@@ -90,6 +90,14 @@ func (m *indexBlobManagerImpl) listIndexBlobs(ctx context.Context, includeInacti
 		return nil, errors.Wrap(err, "error merging local writes for index blobs")
 	}
 
+	for i, sib := range storageIndexBlobs {
+		formatLog(ctx).Debugf("found-index-blobs[%v] = %v", i, sib)
+	}
+
+	for i, clm := range compactionLogMetadata {
+		formatLog(ctx).Debugf("found-compaction-blobs[%v] %v", i, clm)
+	}
+
 	indexMap := map[blob.ID]*IndexBlobInfo{}
 	addBlobsToIndex(indexMap, storageIndexBlobs)
 
@@ -106,6 +114,10 @@ func (m *indexBlobManagerImpl) listIndexBlobs(ctx context.Context, includeInacti
 		results = append(results, *v)
 	}
 
+	for i, res := range results {
+		formatLog(ctx).Debugf("active-index-blobs[%v] = %v", i, res)
+	}
+
 	return results, nil
 }
 
@@ -128,7 +140,15 @@ func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, o
 		return errors.Wrap(err, "unable to write compaction log")
 	}
 
-	formatLog(ctx).Debugf("compacted indexes %v into %v and wrote log %v", inputs, outputs, compactionLogBlobMetadata)
+	for i, input := range inputs {
+		formatLog(ctx).Debugf("compacted-input[%v/%v] %v", i, len(inputs), input)
+	}
+
+	for i, output := range outputs {
+		formatLog(ctx).Debugf("compacted-output[%v/%v] %v", i, len(outputs), output)
+	}
+
+	formatLog(ctx).Debugf("compaction-log %v %v", compactionLogBlobMetadata.BlobID, compactionLogBlobMetadata.Timestamp)
 
 	if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil {
 		return errors.Wrap(err, "error deleting old index blobs")
@@ -204,16 +224,20 @@ func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []b
 
 	err = m.st.PutBlob(ctx, blobID, gather.FromSlice(data2))
 	if err != nil {
+		formatLog(ctx).Debugf("write-index-blob %v failed %v", blobID, err)
 		return blob.Metadata{}, err
 	}
 
 	bm, err := m.st.GetMetadata(ctx, blobID)
 	if err != nil {
+		formatLog(ctx).Debugf("write-index-blob-get-metadata %v failed %v", blobID, err)
 		return blob.Metadata{}, errors.Wrap(err, "unable to get blob metadata")
 	}
 
+	formatLog(ctx).Debugf("write-index-blob %v %v %v", blobID, bm.Length, bm.Timestamp)
+
 	if err := m.ownWritesCache.add(ctx, bm); err != nil {
-		log(ctx).Warningf("unable to cache own write: %v", err)
+		formatLog(ctx).Warningf("own-writes-cache failure: %v", err)
 	}
 
 	return bm, nil
@@ -381,9 +405,12 @@ func (m *indexBlobManagerImpl) delayCleanupBlobs(ctx context.Context, blobIDs []
 func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
 	for _, blobID := range blobIDs {
 		if err := m.st.DeleteBlob(ctx, blobID); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
+			formatLog(ctx).Debugf("delete-blob failed %v %v", blobID, err)
 			return errors.Wrapf(err, "unable to delete blob %v", blobID)
 		}
 
+		formatLog(ctx).Debugf("delete-blob succeeded %v", blobID)
+
 		if err := m.ownWritesCache.delete(ctx, blobID); err != nil {
 			return errors.Wrapf(err, "unable to delete blob %v from own-writes cache", blobID)
 		}
@@ -475,7 +502,7 @@ func removeCompactedIndexes(ctx context.Context, m map[blob.ID]*IndexBlobInfo, c
 	for _, cl := range validCompactionLogs {
 		for _, ib := range cl.InputMetadata {
 			if md := m[ib.BlobID]; md != nil && md.Superseded == nil {
-				log(ctx).Debugf("ignoring index blob %v (%v) because it's been compacted to %v", ib, md.Timestamp, cl.OutputMetadata)
+				formatLog(ctx).Debugf("ignore-index-blob %v compacted to %v", ib, cl.OutputMetadata)
 
 				if markAsSuperseded {
 					md.Superseded = cl.OutputMetadata
@@ -30,7 +30,7 @@ func (c *listCache) listBlobs(ctx context.Context, prefix blob.ID) ([]blob.Metad
 	if err == nil {
 		expirationTime := ci.Timestamp.Add(c.listCacheDuration)
 		if clock.Now().Before(expirationTime) {
-			log(ctx).Debugf("retrieved list of %v '%v' index blobs from cache", len(ci.Blobs), prefix)
+			formatLog(ctx).Debugf("list-from-cache '%v' found %v", prefix, len(ci.Blobs))
 			return ci.Blobs, nil
 		}
 	} else if !errors.Is(err, blob.ErrBlobNotFound) {
@@ -41,6 +41,8 @@ type CLITest struct {
 	Environment []string
 
 	PassthroughStderr bool
+
+	LogsDir string
 }
 
 // SourceInfo represents a single source (user@host:/path) with its snapshots.
@@ -76,9 +78,15 @@ func NewCLITest(t *testing.T) *CLITest {
 		t.Fatalf("can't create temp directory: %v", err)
 	}
 
+	logsDir, err := ioutil.TempDir("", "kopia-logs")
+	if err != nil {
+		t.Fatalf("can't create temp directory: %v", err)
+	}
+
 	fixedArgs := []string{
 		// use per-test config file, to avoid clobbering current user's setup.
 		"--config-file", filepath.Join(ConfigDir, ".kopia.config"),
+		"--log-dir", logsDir,
 	}
 
 	// disable the use of keyring
@@ -97,6 +105,7 @@ func NewCLITest(t *testing.T) *CLITest {
 		ConfigDir: ConfigDir,
 		Exe: filepath.FromSlash(exe),
 		fixedArgs: fixedArgs,
+		LogsDir: logsDir,
 		Environment: []string{
 			"KOPIA_PASSWORD=" + repoPassword,
 			"KOPIA_ADVANCED_COMMANDS=enabled",
@@ -107,7 +116,7 @@
 // Cleanup cleans up the test Environment unless the test has failed.
 func (e *CLITest) Cleanup(t *testing.T) {
 	if t.Failed() {
-		t.Logf("skipped cleanup for failed test, examine repository: %v", e.RepoDir)
+		t.Logf("skipped cleanup for failed test, examine repository: %v and logs %v", e.RepoDir, e.LogsDir)
 		return
 	}
 
@@ -118,6 +127,10 @@ func (e *CLITest) Cleanup(t *testing.T) {
 	if e.ConfigDir != "" {
 		os.RemoveAll(e.ConfigDir)
 	}
+
+	if e.LogsDir != "" {
+		os.RemoveAll(e.LogsDir)
+	}
 }
 
 // RunAndExpectSuccess runs the given command, expects it to succeed and returns its output lines.