Files
kopia/cli/observability_flags.go
Julio López 39fb62970f refactor(cli): refactor diagnosis flags (#5026)
Refactor `--profile-*` flags:
- Multiple profile types can be enabled at once; previously, only
  a single profile type could be collected during a process execution.
- The new `--profile-store-on-exit` flag enables all available profile
  types, except for CPU profiling, which must be enabled explicitly.
- Profiling parameters can now be set via new flags. This allows setting
  the profile parameters for the pprof endpoint, as well as when saving
  profiles to files on exit.
- Group profiling flags with other observability flags
- Adds a `--diagnostics-output-directory` flag that unifies and
  supersedes the `--profile-dir` and `--metrics-directory` flags

Enhancements and behavior changes:
- Profile flags now take effect for all kopia commands, including
  `server start`. Previously, these flags had no effect
  in a few commands.
- Multiple profile types can be enabled at once; previously, only
  a single profile type could be collected during a process execution.
- The new `--profile-store-on-exit` flag enables all available profile
  types, except for CPU profiling, which must be enabled explicitly.
- Profiling parameters can now be set via new flags. This allows setting
  the profile parameters for the pprof endpoint, as well as when saving
  profiles to files on exit.

The following flags have been removed:
- `--profile-dir`: superseded by the `--diagnostics-output-directory` flag
- `--profile-blocking`: the `--profile-store-on-exit` flag enables blocking
  profiling. Use `--profile-blocking-rate=0` to explicitly disable it.
- `--profile-memory`: the `--profile-store-on-exit` flag enables memory
  profiling. Use `--profile-memory-rate=0` to explicitly disable it.
- `--profile-mutex`: the `--profile-store-on-exit` flag enables mutex
  profiling. Use `--profile-mutex-fraction=0` to explicitly disable it.

Add CLI test for profile flags.
2025-11-26 09:40:01 -08:00

324 lines
10 KiB
Go

package cli
import (
"context"
"net/http"
"net/http/pprof"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/alecthomas/kingpin/v2"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/prometheus/common/expfmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
oteltrace "go.opentelemetry.io/otel/trace"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo"
)
// DirMode is the permission mode (owner-only read/write/execute) used when
// creating output directories, such as the diagnostics output directory.
const DirMode = 0o700

// metricsPushFormats maps each value accepted by --metrics-push-format to the
// Prometheus exposition format used when pushing to the push gateway.
//
//nolint:gochecknoglobals
var metricsPushFormats = map[string]expfmt.Format{
	"open-metrics":       expfmt.NewFormat(expfmt.TypeOpenMetrics),
	"open-metrics-0.0.1": "application/openmetrics-text; version=0.0.1; charset=utf-8",
	"proto-compact":      expfmt.NewFormat(expfmt.TypeProtoCompact),
	"proto-delim":        expfmt.NewFormat(expfmt.TypeProtoDelim),
	"proto-text":         expfmt.NewFormat(expfmt.TypeProtoText),
	"text":               expfmt.NewFormat(expfmt.TypeTextPlain),
}
// observabilityFlags holds the command-line settings and runtime state for
// kopia's observability facilities: allocator stats, Prometheus metrics
// (HTTP listener, push gateway, file output on exit), the pprof endpoint,
// OTLP tracing and profiling.
type observabilityFlags struct {
	// outputDirectory is the base directory for diagnostics output
	// (--diagnostics-output-directory).
	outputDirectory string

	// dumpAllocatorStats makes stop() dump allocator stats
	// (--dump-allocator-stats).
	dumpAllocatorStats bool

	// enablePProfEndpoint registers the pprof handlers on the metrics
	// listener (--enable-pprof).
	enablePProfEndpoint bool

	// metricsListenAddr is the host:port on which metrics are served; empty
	// disables the listener (--metrics-listen-addr).
	metricsListenAddr string

	// metricsPushAddr is the push gateway address; empty disables pushing
	// (--metrics-push-addr).
	metricsPushAddr string

	// metricsJob is the push gateway job name (--metrics-push-job).
	metricsJob string

	// metricsPushInterval is the period between pushes
	// (--metrics-push-interval).
	metricsPushInterval time.Duration

	// metricsGroupings are "name:value" push gateway groupings
	// (--metrics-push-grouping).
	metricsGroupings []string

	// Basic-auth credentials for the push gateway; used only when the
	// username is non-empty (--metrics-push-username/--metrics-push-password).
	metricsPushUsername string
	metricsPushPassword string

	// metricsPushFormat is a key of metricsPushFormats; empty keeps the
	// pusher's default format (--metrics-push-format).
	metricsPushFormat string

	// otlpTrace enables the OTLP gRPC trace exporter (--otlp-trace).
	otlpTrace bool

	// saveMetrics makes stop() write metrics to a file in the per-execution
	// output sub-directory (--metrics-store-on-exit).
	saveMetrics bool

	// pf holds the profiling flags, registered via pf.setup.
	pf profileFlags

	// outputSubdirectoryName is the per-execution sub-directory of
	// outputDirectory; it is set by initialize.
	outputSubdirectoryName string

	// stopPusher is closed by stop() to tell the pusher goroutine to do a
	// final push and exit; nil when no pusher was started.
	stopPusher chan struct{}

	// pusherWG tracks the pusher goroutine so that stop() can wait for it.
	pusherWG sync.WaitGroup

	// traceProvider is set when the OTLP trace exporter was started, and is
	// shut down by stop().
	traceProvider *trace.TracerProvider
}
// setup registers the observability flags on app. These cover allocator
// stats, the metrics listener and pprof endpoint, the metrics push gateway,
// OTLP tracing, diagnostics output and profiling. setup also arranges for
// initialize to run as a pre-action once the command line has been parsed.
func (c *observabilityFlags) setup(svc appServices, app *kingpin.Application) {
	app.Flag("dump-allocator-stats", "Dump allocator stats at the end of execution.").Hidden().Envar(svc.EnvName("KOPIA_DUMP_ALLOCATOR_STATS")).BoolVar(&c.dumpAllocatorStats)
	app.Flag("metrics-listen-addr", "Expose Prometheus metrics on a given host:port").Hidden().StringVar(&c.metricsListenAddr)
	app.Flag("enable-pprof", "Expose pprof handlers").Hidden().BoolVar(&c.enablePProfEndpoint)

	// push gateway parameters
	app.Flag("metrics-push-addr", "Address of push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_ADDR")).Hidden().StringVar(&c.metricsPushAddr)
	app.Flag("metrics-push-interval", "Frequency of metrics push").Envar(svc.EnvName("KOPIA_METRICS_PUSH_INTERVAL")).Hidden().Default("5s").DurationVar(&c.metricsPushInterval)
	app.Flag("metrics-push-job", "Job ID for push gateway").Envar(svc.EnvName("KOPIA_METRICS_JOB")).Hidden().Default("kopia").StringVar(&c.metricsJob)
	app.Flag("metrics-push-grouping", "Grouping for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_GROUPING")).Hidden().StringsVar(&c.metricsGroupings)
	app.Flag("metrics-push-username", "Username for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_USERNAME")).Hidden().StringVar(&c.metricsPushUsername)
	app.Flag("metrics-push-password", "Password for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_PASSWORD")).Hidden().StringVar(&c.metricsPushPassword)

	// tracing (OTLP) parameters
	app.Flag("otlp-trace", "Send OpenTelemetry traces to OTLP collector using gRPC").Hidden().Envar(svc.EnvName("KOPIA_ENABLE_OTLP_TRACE")).BoolVar(&c.otlpTrace)

	// Map iteration order is random, so sort the accepted push formats to
	// keep the enum's value list deterministic.
	formats := make([]string, 0, len(metricsPushFormats))
	for name := range metricsPushFormats {
		formats = append(formats, name)
	}

	sort.Strings(formats)

	app.Flag("metrics-push-format", "Format to use for push gateway").Envar(svc.EnvName("KOPIA_METRICS_FORMAT")).Hidden().EnumVar(&c.metricsPushFormat, formats...)

	//nolint:lll
	app.Flag("diagnostics-output-directory", "Directory where the diagnostics output should be saved when kopia exits. Diagnostics data includes among others: metrics, traces, profiles. The output files are stored in a sub-directory for each kopia (process) execution").Hidden().Default(filepath.Join(os.TempDir(), "kopia-diagnostics")).StringVar(&c.outputDirectory)
	app.Flag("metrics-store-on-exit", "Writes metrics to a file in a sub-directory of the directory specified with the --diagnostics-output-directory flag").Hidden().BoolVar(&c.saveMetrics)

	c.pf.setup(app)

	app.PreAction(c.initialize)
}
// initialize runs as a kingpin pre-action after the flags have been parsed.
// It derives this execution's diagnostics sub-directory name from the current
// time and the selected command. It returns an error when metrics or profile
// output is enabled but --diagnostics-output-directory is empty.
func (c *observabilityFlags) initialize(pc *kingpin.ParseContext) error {
	commandName := "unknown"
	if selected := pc.SelectedCommand; selected != nil {
		// e.g. "server start" becomes "server-start"
		commandName = strings.ReplaceAll(selected.FullCommand(), " ", "-")
	}

	// Give each command/process execution its own sub-directory so files from
	// earlier runs are not overwritten. NOTE: the timestamp has one-second
	// resolution, so identical commands started within the same second get
	// the same name.
	c.outputSubdirectoryName = clock.Now().Format("20060102-150405-") + commandName

	writesOutput := c.saveMetrics || c.pf.saveProfiles || c.pf.profileCPU
	if !writesOutput || c.outputDirectory != "" {
		return nil
	}

	return errors.New("writing diagnostics output requires a non-empty directory name (specified with the '--diagnostics-output-directory' flag)")
}
// run calls f with the observability facilities (see start) and profiling
// active, and tears them down again when f returns.
//
// When spanName is non-empty, f receives a context carrying a new client span
// with that name, which is ended when f returns. When spanName is empty, no
// span is started.
func (c *observabilityFlags) run(ctx context.Context, spanName string, f func(context.Context) error) error {
	if err := c.start(ctx); err != nil {
		return errors.Wrap(err, "unable to start observability facilities")
	}
	defer c.stop(ctx)

	profileDir := filepath.Join(c.outputDirectory, c.outputSubdirectoryName)
	if err := c.pf.start(ctx, profileDir); err != nil {
		return errors.Wrap(err, "failed to start profiling")
	}
	defer c.pf.stop(ctx)

	if spanName == "" {
		return f(ctx)
	}

	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindClient))
	defer span.End()

	return f(spanCtx)
}
// start brings up the observability facilities:
//   - the metrics/pprof HTTP listener,
//   - the push gateway pusher,
//   - the metrics output directory (when --metrics-store-on-exit is set),
//   - the OTLP trace exporter.
//
// If a step fails after the metrics pusher goroutine has been started, the
// pusher is stopped before returning the error. run() only defers stop() once
// start has succeeded, so without this the goroutine would be leaked.
func (c *observabilityFlags) start(ctx context.Context) error {
	c.maybeStartListener(ctx)

	if err := c.maybeStartMetricsPusher(ctx); err != nil {
		return err
	}

	if c.saveMetrics {
		// Ensure the metrics output directory can be created now, rather than
		// discovering a problem only when metrics are written on exit.
		if _, err := mkSubdirectories(c.outputDirectory, c.outputSubdirectoryName); err != nil {
			c.abortMetricsPusher()
			return err
		}
	}

	if err := c.maybeStartTraceExporter(ctx); err != nil {
		c.abortMetricsPusher()
		return err
	}

	return nil
}

// abortMetricsPusher stops the metrics pusher goroutine, if one is running,
// and waits for it to exit. It clears stopPusher so that a later stop() does
// not close the channel a second time.
func (c *observabilityFlags) abortMetricsPusher() {
	if c.stopPusher == nil {
		return
	}

	close(c.stopPusher)
	c.pusherWG.Wait()
	c.stopPusher = nil
}
// mkSubdirectories joins directoryNames into a single path and creates that
// directory, including any missing parents, with DirMode permissions. It
// returns the joined path, or "" and an error naming the path on failure.
func mkSubdirectories(directoryNames ...string) (string, error) {
	dirName := filepath.Join(directoryNames...)

	if err := os.MkdirAll(dirName, DirMode); err != nil {
		// %q already quotes the path; do not wrap it in extra quotes.
		return "", errors.Wrapf(err, "could not create %q subdirectory to save diagnostics output", dirName)
	}

	return dirName, nil
}
// maybeStartListener serves, in a background goroutine, the handlers that
// initPrometheus registers on --metrics-listen-addr. With --enable-pprof it
// also serves the net/http/pprof handlers. It does nothing when no listen
// address is set.
//
// NOTE(review): the server is never shut down and the ListenAndServe error is
// discarded, so e.g. an address already in use goes unnoticed.
func (c *observabilityFlags) maybeStartListener(ctx context.Context) {
	addr := c.metricsListenAddr
	if addr == "" {
		return
	}

	router := mux.NewRouter()
	initPrometheus(router)

	if c.enablePProfEndpoint {
		// The catch-all {cmd} route is registered last so that the specific
		// pprof routes before it are matched first.
		pprofRoutes := []struct {
			path    string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{"/debug/pprof/", pprof.Index},
			{"/debug/pprof/cmdline", pprof.Cmdline},
			{"/debug/pprof/profile", pprof.Profile},
			{"/debug/pprof/symbol", pprof.Symbol},
			{"/debug/pprof/trace", pprof.Trace},
			// special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595
			{"/debug/pprof/{cmd}", pprof.Index},
		}

		for _, route := range pprofRoutes {
			router.HandleFunc(route.path, route.handler)
		}
	}

	log(ctx).Infof("starting prometheus metrics on %v", addr)

	go http.ListenAndServe(addr, router) //nolint:errcheck,gosec
}
// maybeStartMetricsPusher pushes the metrics from prometheus.DefaultGatherer
// to the push gateway at --metrics-push-addr. It pushes once immediately, then
// every --metrics-push-interval from a background goroutine until stop()
// closes c.stopPusher. It is a no-op when no push address is set.
//
// All settings are validated before the goroutine bookkeeping (stopPusher,
// pusherWG) is touched. An error return therefore leaves no goroutine running
// and no pending WaitGroup count that stop() would wait on forever.
func (c *observabilityFlags) maybeStartMetricsPusher(ctx context.Context) error {
	if c.metricsPushAddr == "" {
		return nil
	}

	// time.NewTicker panics on a non-positive interval, which would crash the
	// process from the pusher goroutine; reject it up front instead.
	if c.metricsPushInterval <= 0 {
		return errors.Errorf("metrics push interval must be positive, got %v", c.metricsPushInterval)
	}

	pusher := push.New(c.metricsPushAddr, c.metricsJob)
	pusher.Gatherer(prometheus.DefaultGatherer)

	for _, g := range c.metricsGroupings {
		// Split at the first ':' only; the value may itself contain ':'.
		name, val, ok := strings.Cut(g, ":")
		if !ok {
			return errors.Errorf("grouping must be name:value, got %q", g)
		}

		pusher.Grouping(name, val)
	}

	if c.metricsPushUsername != "" {
		pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
	}

	if c.metricsPushFormat != "" {
		pusher.Format(metricsPushFormats[c.metricsPushFormat])
	}

	log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)

	c.pushOnce(ctx, "initial", pusher)

	// Record the goroutine only once it is certain to be started, so every
	// pusherWG.Add is matched by the goroutine's Done.
	c.stopPusher = make(chan struct{})
	c.pusherWG.Add(1)

	go c.pushPeriodically(ctx, pusher)

	return nil
}
// maybeStartTraceExporter runs only when --otlp-trace is set. It starts an
// OTLP gRPC trace exporter and installs a tracer provider that batches spans
// to it as the global OpenTelemetry tracer provider. The provider is kept in
// c.traceProvider so that stop() can shut it down.
func (c *observabilityFlags) maybeStartTraceExporter(ctx context.Context) error {
	if !c.otlpTrace {
		return nil
	}

	// Start the exporter before creating the provider. The batch span
	// processor installed by trace.WithBatcher runs in the background, and
	// creating it first would leave it running, never shut down, if the
	// exporter failed to start.
	se := otlptracegrpc.NewUnstarted()
	if err := se.Start(ctx); err != nil {
		return errors.Wrap(err, "unable to start OTLP exporter")
	}

	r := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("kopia"),
		semconv.ServiceVersionKey.String(repo.BuildVersion),
	)

	tp := trace.NewTracerProvider(
		trace.WithBatcher(se),
		trace.WithResource(r),
	)

	otel.SetTracerProvider(tp)
	c.traceProvider = tp

	return nil
}
// stop tears down what start set up, in this order:
//   - dump allocator stats (when --dump-allocator-stats is set);
//   - stop the metrics pusher, which does a final push;
//   - shut down the trace provider;
//   - write the current metrics to kopia-metrics.prom in this execution's
//     diagnostics sub-directory (when --metrics-store-on-exit is set).
//
// Failures are logged as warnings and are not returned.
func (c *observabilityFlags) stop(ctx context.Context) {
	if c.dumpAllocatorStats {
		gather.DumpStats(ctx)
	}

	if c.stopPusher != nil {
		close(c.stopPusher)
		c.pusherWG.Wait()
		// Prevent a double close if stop is ever invoked again.
		c.stopPusher = nil
	}

	if c.traceProvider != nil {
		if err := c.traceProvider.Shutdown(ctx); err != nil {
			log(ctx).Warnf("unable to shutdown trace provider: %v", err)
		}
	}

	if !c.saveMetrics {
		return
	}

	metricsDir, err := mkSubdirectories(c.outputDirectory, c.outputSubdirectoryName)
	if err != nil {
		// metricsDir is empty on failure; err already names the directory.
		log(ctx).Warnf("unable to create metrics output directory: %v", err)
		return
	}

	if err = prometheus.WriteToTextfile(filepath.Join(metricsDir, "kopia-metrics.prom"), prometheus.DefaultGatherer); err != nil {
		log(ctx).Warnf("unable to write metrics to file: %v", err)
	}
}
// pushPeriodically pushes metrics via p on every tick of
// c.metricsPushInterval. Once c.stopPusher is closed, it stops the ticker,
// performs one final push and returns. It signals c.pusherWG when it exits.
func (c *observabilityFlags) pushPeriodically(ctx context.Context, p *push.Pusher) {
	defer c.pusherWG.Done()

	tick := time.NewTicker(c.metricsPushInterval)

	for {
		select {
		case <-c.stopPusher:
			tick.Stop()
			c.pushOnce(ctx, "final", p)

			return

		case <-tick.C:
			c.pushOnce(ctx, "periodic", p)
		}
	}
}
// pushOnce performs a single push via p. The kind label (e.g. "initial",
// "periodic", "final") is attached to the log entries. Push errors are only
// logged at debug level and are not returned.
func (c *observabilityFlags) pushOnce(ctx context.Context, kind string, p *push.Pusher) {
	log(ctx).Debugw("pushing prometheus metrics", "kind", kind)

	err := p.Push()
	if err == nil {
		return
	}

	log(ctx).Debugw("error pushing prometheus metrics", "kind", kind, "err", err)
}