mirror of
https://github.com/kopia/kopia.git
synced 2025-12-23 22:57:50 -05:00
Refactor `--profile-*` flags: - Multiple profile types can be enabled at once, before only a single type profiling could be done during a process execution. - The new `--profiles-store-on-exit` enables all available profile types, except for CPU profiling which needs to be explicitly enabled. - Profiling parameters can now be set via new flags. This allows setting the profile parameters for the pprof endpoint, as well as when saving profiles to files on exit. - Group profiling flags with other observability flags - Adds a `--diagnostics-output-directory` flag that unifies and supersedes the `--profile-dir` and `--metrics-directory` flags Enhancements and behavior changes: - Profile flags now have effect for all kopia commands, including `server start`. Before these flags did not have any effect in a few commands. - Multiple profile types can be enabled at once, before only a single type profiling could be done during a process execution. - The new `--profiles-store-on-exit` enables all available profile types, except for CPU profiling which needs to be explicitly enabled. - Profiling parameters can now be set via new flags. This allows setting the profile parameters for the pprof endpoint, as well as when saving profiles to files on exit. The following flags have been removed: - `--profile-dir`: superseded by the `--diagnostics-output-directory` flag - `--profile-blocking`: the `--profile-store-on-exit` flag enables blocking profiling. Use `--profile-blocking-rate=0` to explicitly disable it. - `--profile-memory`: the `--profile-store-on-exit` flag enables memory profiling. Use `--profile-memory-rate=0` to explicitly disable it. - `--profile-mutex`: the `--profile-store-on-exit` flag enables mutex profiling. Use `--profile-mutex-fraction=0` to explicitly disable it. Add CLI test for profile flags.
324 lines
10 KiB
Go
324 lines
10 KiB
Go
package cli
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/alecthomas/kingpin/v2"
|
|
"github.com/gorilla/mux"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/push"
|
|
"github.com/prometheus/common/expfmt"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
"go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
|
oteltrace "go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/kopia/kopia/internal/clock"
|
|
"github.com/kopia/kopia/internal/gather"
|
|
"github.com/kopia/kopia/repo"
|
|
)
|
|
|
|
const (
	// DirMode is the permission mode applied to output directories:
	// read/write/execute for the owner only, no access for group or others.
	DirMode = 0o700
)
|
|
|
|
//nolint:gochecknoglobals
|
|
var metricsPushFormats = map[string]expfmt.Format{
|
|
"text": expfmt.NewFormat(expfmt.TypeTextPlain),
|
|
"proto-text": expfmt.NewFormat(expfmt.TypeProtoText),
|
|
"proto-delim": expfmt.NewFormat(expfmt.TypeProtoDelim),
|
|
"proto-compact": expfmt.NewFormat(expfmt.TypeProtoCompact),
|
|
"open-metrics": expfmt.NewFormat(expfmt.TypeOpenMetrics),
|
|
"open-metrics-0.0.1": "application/openmetrics-text; version=0.0.1; charset=utf-8",
|
|
}
|
|
|
|
type observabilityFlags struct {
|
|
outputDirectory string
|
|
|
|
dumpAllocatorStats bool
|
|
enablePProfEndpoint bool
|
|
metricsListenAddr string
|
|
metricsPushAddr string
|
|
metricsJob string
|
|
metricsPushInterval time.Duration
|
|
metricsGroupings []string
|
|
metricsPushUsername string
|
|
metricsPushPassword string
|
|
metricsPushFormat string
|
|
otlpTrace bool
|
|
saveMetrics bool
|
|
pf profileFlags
|
|
|
|
outputSubdirectoryName string
|
|
|
|
stopPusher chan struct{}
|
|
pusherWG sync.WaitGroup
|
|
|
|
traceProvider *trace.TracerProvider
|
|
}
|
|
|
|
func (c *observabilityFlags) setup(svc appServices, app *kingpin.Application) {
|
|
app.Flag("dump-allocator-stats", "Dump allocator stats at the end of execution.").Hidden().Envar(svc.EnvName("KOPIA_DUMP_ALLOCATOR_STATS")).BoolVar(&c.dumpAllocatorStats)
|
|
app.Flag("metrics-listen-addr", "Expose Prometheus metrics on a given host:port").Hidden().StringVar(&c.metricsListenAddr)
|
|
app.Flag("enable-pprof", "Expose pprof handlers").Hidden().BoolVar(&c.enablePProfEndpoint)
|
|
|
|
// push gateway parameters
|
|
app.Flag("metrics-push-addr", "Address of push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_ADDR")).Hidden().StringVar(&c.metricsPushAddr)
|
|
app.Flag("metrics-push-interval", "Frequency of metrics push").Envar(svc.EnvName("KOPIA_METRICS_PUSH_INTERVAL")).Hidden().Default("5s").DurationVar(&c.metricsPushInterval)
|
|
app.Flag("metrics-push-job", "Job ID for to push gateway").Envar(svc.EnvName("KOPIA_METRICS_JOB")).Hidden().Default("kopia").StringVar(&c.metricsJob)
|
|
app.Flag("metrics-push-grouping", "Grouping for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_GROUPING")).Hidden().StringsVar(&c.metricsGroupings)
|
|
app.Flag("metrics-push-username", "Username for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_USERNAME")).Hidden().StringVar(&c.metricsPushUsername)
|
|
app.Flag("metrics-push-password", "Password for push gateway").Envar(svc.EnvName("KOPIA_METRICS_PUSH_PASSWORD")).Hidden().StringVar(&c.metricsPushPassword)
|
|
|
|
// tracing (OTLP) parameters
|
|
app.Flag("otlp-trace", "Send OpenTelemetry traces to OTLP collector using gRPC").Hidden().Envar(svc.EnvName("KOPIA_ENABLE_OTLP_TRACE")).BoolVar(&c.otlpTrace)
|
|
|
|
var formats []string
|
|
|
|
for k := range metricsPushFormats {
|
|
formats = append(formats, k)
|
|
}
|
|
|
|
sort.Strings(formats)
|
|
|
|
app.Flag("metrics-push-format", "Format to use for push gateway").Envar(svc.EnvName("KOPIA_METRICS_FORMAT")).Hidden().EnumVar(&c.metricsPushFormat, formats...)
|
|
|
|
//nolint:lll
|
|
app.Flag("diagnostics-output-directory", "Directory where the diagnostics output should be stored saved when kopia exits. Diagnostics data includes among others: metrics, traces, profiles. The output files are stored in a sub-directory for each kopia (process) execution").Hidden().Default(filepath.Join(os.TempDir(), "kopia-diagnostics")).StringVar(&c.outputDirectory)
|
|
|
|
app.Flag("metrics-store-on-exit", "Writes metrics to a file in a sub-directory of the directory specified with the --diagnostics-output-directory").Hidden().BoolVar(&c.saveMetrics)
|
|
|
|
c.pf.setup(app)
|
|
|
|
app.PreAction(c.initialize)
|
|
}
|
|
|
|
// initialize runs as a kingpin pre-action. It sets outputSubdirectoryName to
// "<YYYYMMDD-HHMMSS>-<command-with-dashes>" ("unknown" when no command was
// selected). It returns an error if metrics or profiles are to be written to
// disk but the diagnostics output directory is empty.
func (c *observabilityFlags) initialize(ctx *kingpin.ParseContext) error {
	// A distinct name per command and process execution keeps output from
	// earlier runs from being clobbered.
	cmdName := "unknown"
	if sel := ctx.SelectedCommand; sel != nil {
		cmdName = strings.ReplaceAll(sel.FullCommand(), " ", "-")
	}

	timestamp := clock.Now().Format("20060102-150405-")
	c.outputSubdirectoryName = timestamp + cmdName

	writesToDisk := c.saveMetrics || c.pf.saveProfiles || c.pf.profileCPU
	if !writesToDisk || c.outputDirectory != "" {
		return nil
	}

	return errors.New("writing diagnostics output requires a non-empty directory name (specified with the '--diagnostics-output-directory' flag)")
}
|
|
|
|
// run invokes f with the observability facilities active. It starts them
// first and starts profiling into the per-execution output directory. When
// f returns, it stops profiling and then the facilities (deferred, in
// reverse order).
//
// When spanName is non-empty, f receives a context carrying a new client-kind
// span of that name, which ends before profiling and the facilities are
// stopped. When spanName is empty, no span is started.
func (c *observabilityFlags) run(ctx context.Context, spanName string, f func(context.Context) error) error {
	if err := c.start(ctx); err != nil {
		return errors.Wrap(err, "unable to start observability facilities")
	}
	defer c.stop(ctx)

	profileDir := filepath.Join(c.outputDirectory, c.outputSubdirectoryName)
	if err := c.pf.start(ctx, profileDir); err != nil {
		return errors.Wrap(err, "failed to start profiling")
	}
	defer c.pf.stop(ctx)

	if spanName == "" {
		return f(ctx)
	}

	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindClient))
	defer span.End()

	return f(spanCtx)
}
|
|
|
|
func (c *observabilityFlags) start(ctx context.Context) error {
|
|
c.maybeStartListener(ctx)
|
|
|
|
if err := c.maybeStartMetricsPusher(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if c.saveMetrics {
|
|
// ensure the metrics output dir can be created
|
|
if _, err := mkSubdirectories(c.outputDirectory, c.outputSubdirectoryName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return c.maybeStartTraceExporter(ctx)
|
|
}
|
|
|
|
// mkSubdirectories joins directoryNames into a single path and creates it,
// including any missing parents, with DirMode permissions.
//
// It returns the joined path on success. On failure it returns an empty name
// and an error that identifies the directory.
func mkSubdirectories(directoryNames ...string) (dirName string, err error) {
	dirName = filepath.Join(directoryNames...)

	if err = os.MkdirAll(dirName, DirMode); err != nil {
		// %q already quotes the path; wrapping it in '...' would double-quote it
		return "", errors.Wrapf(err, "could not create %q subdirectory to save diagnostics output", dirName)
	}

	return dirName, nil
}
|
|
|
|
// maybeStartListener starts an HTTP server on c.metricsListenAddr that serves
// the routes registered by initPrometheus. When c.enablePProfEndpoint is set,
// it also serves the net/http/pprof handlers under /debug/pprof/. It does
// nothing when no listen address is configured.
//
// The server runs in a background goroutine that nothing in this file shuts
// down. If it fails (e.g. the address is already in use), the error is logged
// instead of being silently discarded.
func (c *observabilityFlags) maybeStartListener(ctx context.Context) {
	if c.metricsListenAddr == "" {
		return
	}

	m := mux.NewRouter()
	initPrometheus(m)

	if c.enablePProfEndpoint {
		m.HandleFunc("/debug/pprof/", pprof.Index)
		m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
		m.HandleFunc("/debug/pprof/profile", pprof.Profile)
		m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		m.HandleFunc("/debug/pprof/trace", pprof.Trace)
		m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595
	}

	addr := c.metricsListenAddr

	log(ctx).Infof("starting prometheus metrics on %v", addr)

	go func() {
		if err := http.ListenAndServe(addr, m); err != nil { //nolint:gosec
			log(ctx).Warnf("metrics listener on %v terminated: %v", addr, err)
		}
	}()
}
|
|
|
|
// maybeStartMetricsPusher configures a Prometheus push gateway pusher for
// c.metricsPushAddr and job c.metricsJob, using the default gatherer. It
// applies optional groupings ("name:value"), basic auth and push format,
// performs an initial push, and then pushes every c.metricsPushInterval in a
// background goroutine until stop closes c.stopPusher.
//
// It does nothing when no push address is configured and returns an error
// when a grouping is malformed.
func (c *observabilityFlags) maybeStartMetricsPusher(ctx context.Context) error {
	if c.metricsPushAddr == "" {
		return nil
	}

	pusher := push.New(c.metricsPushAddr, c.metricsJob)
	pusher.Gatherer(prometheus.DefaultGatherer)

	for _, g := range c.metricsGroupings {
		const nParts = 2

		parts := strings.SplitN(g, ":", nParts)
		if len(parts) != nParts {
			return errors.New("grouping must be name:value")
		}

		pusher.Grouping(parts[0], parts[1])
	}

	if c.metricsPushUsername != "" {
		pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
	}

	if c.metricsPushFormat != "" {
		pusher.Format(metricsPushFormats[c.metricsPushFormat])
	}

	log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)
	c.pushOnce(ctx, "initial", pusher)

	// Register the goroutine only once it is certain to be launched. Doing it
	// before the validation above left a WaitGroup count with no goroutine to
	// call Done, so stop() would block forever in Wait.
	c.stopPusher = make(chan struct{})
	c.pusherWG.Add(1)

	go c.pushPeriodically(ctx, pusher)

	return nil
}
|
|
|
|
// maybeStartTraceExporter sets up OpenTelemetry tracing when c.otlpTrace is
// set. It does three things:
//   - starts an OTLP gRPC trace exporter;
//   - wraps it in a batching tracer provider tagged with the "kopia" service
//     name and repo.BuildVersion;
//   - installs that provider globally and records it in c.traceProvider so
//     stop can shut it down.
//
// It does nothing when tracing is disabled.
func (c *observabilityFlags) maybeStartTraceExporter(ctx context.Context) error {
	if !c.otlpTrace {
		return nil
	}

	// Start the OTLP exporter before building the tracer provider. WithBatcher
	// creates a batch span processor with its own goroutine, and that provider
	// would otherwise be abandoned without Shutdown if the exporter failed to
	// start.
	se := otlptracegrpc.NewUnstarted()
	if err := se.Start(ctx); err != nil {
		return errors.Wrap(err, "unable to start OTLP exporter")
	}

	r := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("kopia"),
		semconv.ServiceVersionKey.String(repo.BuildVersion),
	)

	tp := trace.NewTracerProvider(
		trace.WithBatcher(se),
		trace.WithResource(r),
	)

	otel.SetTracerProvider(tp)

	c.traceProvider = tp

	return nil
}
|
|
|
|
// stop tears down the observability facilities, in order:
//   - dumps allocator stats if requested;
//   - signals the metrics pusher to do its final push and waits for it;
//   - shuts down the trace provider;
//   - when --metrics-store-on-exit is set, writes the default Prometheus
//     gatherer's metrics to kopia-metrics.prom in the per-execution output
//     directory.
//
// Failures are logged as warnings; nothing is returned.
func (c *observabilityFlags) stop(ctx context.Context) {
	if c.dumpAllocatorStats {
		gather.DumpStats(ctx)
	}

	if c.stopPusher != nil {
		// closing the channel makes pushPeriodically do a final push and exit
		close(c.stopPusher)

		c.pusherWG.Wait()
	}

	if c.traceProvider != nil {
		if err := c.traceProvider.Shutdown(ctx); err != nil {
			log(ctx).Warnf("unable to shutdown trace provider: %v", err)
		}
	}

	if !c.saveMetrics {
		return
	}

	metricsDir, err := mkSubdirectories(c.outputDirectory, c.outputSubdirectoryName)
	if err != nil {
		// mkSubdirectories returns an empty name on failure, and its error
		// already identifies the directory, so log just the error.
		log(ctx).Warnf("unable to create metrics output directory: %v", err)

		return
	}

	if err = prometheus.WriteToTextfile(filepath.Join(metricsDir, "kopia-metrics.prom"), prometheus.DefaultGatherer); err != nil {
		log(ctx).Warnf("unable to write metrics to file: %v", err)
	}
}
|
|
|
|
// pushPeriodically runs as the pusher goroutine. It pushes metrics through p
// on every tick of c.metricsPushInterval. When c.stopPusher is closed, it
// stops the ticker, does one last "final" push, and returns, marking
// c.pusherWG done.
func (c *observabilityFlags) pushPeriodically(ctx context.Context, p *push.Pusher) {
	defer c.pusherWG.Done()

	tick := time.NewTicker(c.metricsPushInterval)

	for {
		select {
		case <-c.stopPusher:
			tick.Stop()
			c.pushOnce(ctx, "final", p)

			return

		case <-tick.C:
			c.pushOnce(ctx, "periodic", p)
		}
	}
}
|
|
|
|
// pushOnce performs a single push through p. kind labels the push ("initial",
// "periodic" or "final") in the debug logs. A push failure is logged at debug
// level and otherwise ignored.
func (c *observabilityFlags) pushOnce(ctx context.Context, kind string, p *push.Pusher) {
	log(ctx).Debugw("pushing prometheus metrics", "kind", kind)

	err := p.Push()
	if err == nil {
		return
	}

	log(ctx).Debugw("error pushing prometheus metrics", "kind", kind, "err", err)
}
|