Files
kopia/internal/server/api_estimate.go
Jarek Kowalski 0f7253eb66 feat(general): rewrote content logs to always be JSON-based and reorganized log structure (#4822)
This is a breaking change to users who might be using Kopia as a library.

### Log Format

```json
{"t":"<timestamp-rfc-3339-microseconds>", "span:T1":"V1", "span:T2":"V2", "n":"<source>", "m":"<message>", /*parameters*/}
```

Where each record is associated with one or more spans that describe its scope:

* `"span:client": "<hash-of-username@hostname>"`
* `"span:repo": "<random>"` - random identifier of a repository connection (from `repo.Open`)
* `"span:maintenance": "<random>"` - random identifier of a maintenance session
* `"span:upload": "<hash-of-username@host:/path>"` - uniquely identifies upload session of a given directory
* `"span:checkpoint": "<random>"` - encapsulates each checkpoint operation during Upload
* `"span:server-session": "<random>"` - single client connection to the server
* `"span:flush": "<random>"` - encapsulates each Flush session
* `"span:maintenance": "<random>"` - encapsulates each maintenance operation
* `"span:loadIndex" : "<random>"` - encapsulates index loading operation
* `"span:emr" : "<random>"` - encapsulates epoch manager refresh
* `"span:writePack": "<pack-blob-ID>"` - encapsulates pack blob preparation and writing

(plus additional minor spans for various phases of the maintenance).

Notable points:

- Uses an internal zero-allocation JSON writer for reduced memory usage.
- renamed `--disable-internal-log` to `--disable-repository-log` (controls saving blobs to repository)
- added `--disable-content-log` (controls writing of `content-log` files)
- all storage operations are also logged in a structural way and associated with the corresponding spans.
- all content IDs are logged in a truncated format (only the first N bytes, which are usually enough to be unique) to improve compressibility of logs (blob IDs are frequently repeated but content IDs usually appear just once).

This format should make it possible to recreate the journey of any single content throughout pack blobs, indexes and compaction events.
2025-09-27 17:11:13 -07:00

155 lines
4.5 KiB
Go

package server
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sync/atomic"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/internal/ospath"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/internal/uitask"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/upload"
)
// estimateTaskProgress reports snapshot-size-estimation progress to a
// uitask.Controller so it can be observed in the Tasks UI.
type estimateTaskProgress struct {
	ctrl uitask.Controller
}
// Processing reports the directory currently being scanned as the task's
// progress information.
func (p estimateTaskProgress) Processing(_ context.Context, dirname string) {
	p.ctrl.ReportProgressInfo(dirname)
}
// Error logs an estimation error for the given directory, marking it as
// ignored when the policy tree caused it to be skipped.
func (p estimateTaskProgress) Error(ctx context.Context, dirname string, err error, isIgnored bool) {
	// Pick the message format up front so there is a single log call site.
	format := "error in %v: %v"
	if isIgnored {
		format = "ignored error in %v: %v"
	}

	userLog(ctx).Errorf(format, dirname, err)
}
// Stats publishes the current estimation counters to the task UI and, on the
// final invocation, logs a per-size-bucket breakdown of included and excluded
// files.
func (p estimateTaskProgress) Stats(ctx context.Context, st *snapshot.Stats, included, excluded upload.SampleBuckets, excludedDirs []string, final bool) {
	// Excluded directory names are not surfaced in the task UI.
	_ = excludedDirs

	// Counters are updated concurrently by the estimator, so load atomically.
	p.ctrl.ReportCounters(map[string]uitask.CounterValue{
		"Bytes":                uitask.BytesCounter(atomic.LoadInt64(&st.TotalFileSize)),
		"Excluded Bytes":       uitask.BytesCounter(atomic.LoadInt64(&st.ExcludedTotalFileSize)),
		"Files":                uitask.SimpleCounter(int64(atomic.LoadInt32(&st.TotalFileCount))),
		"Directories":          uitask.SimpleCounter(int64(atomic.LoadInt32(&st.TotalDirectoryCount))),
		"Excluded Files":       uitask.SimpleCounter(int64(atomic.LoadInt32(&st.ExcludedFileCount))),
		"Excluded Directories": uitask.SimpleCounter(int64(atomic.LoadInt32(&st.ExcludedDirCount))),
		"Errors":               uitask.ErrorCounter(int64(atomic.LoadInt32(&st.ErrorCount))),
		"Ignored Errors":       uitask.ErrorCounter(int64(atomic.LoadInt32(&st.IgnoredErrorCount))),
	})

	if final {
		logBucketSamples(ctx, included, "Included", false)
		logBucketSamples(ctx, excluded, "Excluded", true)
	}
}
// logBucketSamples writes a human-readable summary line for each non-empty
// size bucket, optionally followed by example file names, and a "None" line
// when no bucket contained any files.
func logBucketSamples(ctx context.Context, buckets upload.SampleBuckets, prefix string, showExamples bool) {
	reported := false

	for idx, b := range buckets {
		if b.Count == 0 {
			continue
		}

		reported = true

		// Bucket 0 is open-ended at the low end; later buckets span from their
		// own minimum up to the previous bucket's minimum.
		var rangeLabel string
		if idx == 0 {
			rangeLabel = fmt.Sprintf("< %-6v", units.BytesString(b.MinSize))
		} else {
			rangeLabel = fmt.Sprintf("%-6v...%6v",
				units.BytesString(b.MinSize),
				units.BytesString(buckets[idx-1].MinSize))
		}

		userLog(ctx).Infof("%v files %v: %7v files, total size %v\n",
			prefix, rangeLabel, b.Count, units.BytesString(b.TotalSize))

		if showExamples && len(b.Examples) > 0 {
			userLog(ctx).Info("Examples:")

			for _, example := range b.Examples {
				userLog(ctx).Infof(" - %v\n", example)
			}
		}
	}

	if !reported {
		userLog(ctx).Infof("%v files: None", prefix)
	}
}
var _ upload.EstimateProgress = estimateTaskProgress{}
// handleEstimate starts a snapshot-size estimation for the directory given in
// the request body. The estimation runs as a background task; the handler
// returns the task descriptor so the client can poll its progress.
func handleEstimate(ctx context.Context, rc requestContext) (any, *apiError) {
	var req serverapi.EstimateRequest

	if err := json.Unmarshal(rc.body, &req); err != nil {
		return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
	}

	resolvedRoot := filepath.Clean(ospath.ResolveUserFriendlyPath(req.Root, true))

	e, err := localfs.NewEntry(resolvedRoot)
	if err != nil {
		return nil, internalServerError(errors.Wrap(err, "can't get local fs entry"))
	}

	dir, ok := e.(fs.Directory)
	if !ok {
		// BUG FIX: err is nil here (NewEntry succeeded), and errors.Wrap(nil, ...)
		// returns nil, which previously produced an apiError with no message.
		return nil, internalServerError(errors.New("estimation is only supported on directories"))
	}

	taskIDChan := make(chan string)

	policyTree, err := policy.TreeForSourceWithOverride(ctx, rc.rep, snapshot.SourceInfo{
		Host:     rc.rep.ClientOptions().Hostname,
		UserName: rc.rep.ClientOptions().Username,
		Path:     resolvedRoot,
	}, req.PolicyOverride)
	if err != nil {
		return nil, internalServerError(errors.Wrap(err, "unable to get policy tree"))
	}

	// launch a goroutine that will continue the estimate and can be observed in the Tasks UI.
	//nolint:errcheck
	go rc.srv.taskManager().Run(ctx, "Estimate", resolvedRoot, func(ctx context.Context, ctrl uitask.Controller) error {
		// Publish the task ID before starting work so the handler can look it up.
		taskIDChan <- ctrl.CurrentTaskID()

		estimatectx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Allow the Tasks UI to cancel the estimation.
		ctrl.OnCancel(cancel)

		return upload.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket)
	})

	taskID := <-taskIDChan

	task, ok := rc.srv.taskManager().GetTask(taskID)
	if !ok {
		return nil, internalServerError(errors.New("task not found"))
	}

	return task, nil
}