From 9f33f70eaa8687d22aa39e86ccaf0e292f2147c8 Mon Sep 17 00:00:00 2001 From: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> Date: Sun, 6 Aug 2023 11:44:31 -0700 Subject: [PATCH] refactor(cli): break down observability.startMetrics() (#3196) Motivation: reduce the function complexity for the linter, so additional functionality can be added to `startMetrics()` No functional changes. Changes and transformations: - Break down observability.startMetrics() into: maybeStartListener(). maybeStartMetricsPusher() and maybeStartTraceExporter(). - Inline observabilityFlags.getExporter in maybeStartTraceExporter. - Only create resource when exporter is non-nil. - Simplify maybeStartListener: reduce indentation by returning early. - Simplify maybeStartMetricsPusher: reduce indentation by returning early. --- cli/observability_flags.go | 160 ++++++++++++++++++++----------------- 1 file changed, 86 insertions(+), 74 deletions(-) diff --git a/cli/observability_flags.go b/cli/observability_flags.go index 404ad32d6..5149ce0a5 100644 --- a/cli/observability_flags.go +++ b/cli/observability_flags.go @@ -79,72 +79,98 @@ func (c *observabilityFlags) setup(svc appServices, app *kingpin.Application) { } func (c *observabilityFlags) startMetrics(ctx context.Context) error { - if c.metricsListenAddr != "" { - m := mux.NewRouter() - initPrometheus(m) + c.maybeStartListener(ctx) - if c.enablePProf { - m.HandleFunc("/debug/pprof/", pprof.Index) - m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - m.HandleFunc("/debug/pprof/profile", pprof.Profile) - m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - m.HandleFunc("/debug/pprof/trace", pprof.Trace) - m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595 - } - - log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr) - - go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec - } - - if c.metricsPushAddr != "" { - c.stopPusher = make(chan struct{}) - c.pusherWG.Add(1) - - pusher := push.New(c.metricsPushAddr, c.metricsJob) - - pusher.Gatherer(prometheus.DefaultGatherer) - - for _, g := range c.metricsGroupings { - const nParts = 2 - - parts := strings.SplitN(g, ":", nParts) - if len(parts) != nParts { - return errors.Errorf("grouping must be name:value") - } - - name := parts[0] - val := parts[1] - - pusher.Grouping(name, val) - } - - if c.metricsPushUsername != "" { - pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword) - } - - if c.metricsPushFormat != "" { - pusher.Format(metricsPushFormats[c.metricsPushFormat]) - } - - log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval) - c.pushOnce(ctx, "initial", pusher) - - go c.pushPeriodically(ctx, pusher) - } - - se, err := c.getSpanExporter() - if err != nil { + if err := c.maybeStartMetricsPusher(ctx); err != nil { return err } - r := resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String("kopia"), - semconv.ServiceVersionKey.String(repo.BuildVersion), - ) + return c.maybeStartTraceExporter() +} + +// Starts observability listener when a listener address is specified. +func (c *observabilityFlags) maybeStartListener(ctx context.Context) { + if c.metricsListenAddr == "" { + return + } + + m := mux.NewRouter() + initPrometheus(m) + + if c.enablePProf { + m.HandleFunc("/debug/pprof/", pprof.Index) + m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + m.HandleFunc("/debug/pprof/profile", pprof.Profile) + m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + m.HandleFunc("/debug/pprof/trace", pprof.Trace) + m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595 + } + + log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr) + + go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec +} + +func (c *observabilityFlags) maybeStartMetricsPusher(ctx context.Context) error { + if c.metricsPushAddr == "" { + return nil + } + + c.stopPusher = make(chan struct{}) + c.pusherWG.Add(1) + + pusher := push.New(c.metricsPushAddr, c.metricsJob) + + pusher.Gatherer(prometheus.DefaultGatherer) + + for _, g := range c.metricsGroupings { + const nParts = 2 + + parts := strings.SplitN(g, ":", nParts) + if len(parts) != nParts { + return errors.Errorf("grouping must be name:value") + } + + name := parts[0] + val := parts[1] + + pusher.Grouping(name, val) + } + + if c.metricsPushUsername != "" { + pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword) + } + + if c.metricsPushFormat != "" { + pusher.Format(metricsPushFormats[c.metricsPushFormat]) + } + + log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval) + c.pushOnce(ctx, "initial", pusher) + + go c.pushPeriodically(ctx, pusher) + + return nil +} + +func (c *observabilityFlags) maybeStartTraceExporter() error { + if !c.enableJaeger { + return nil + } + + // Create the Jaeger exporter + se, err := jaeger.New(jaeger.WithCollectorEndpoint()) + if err != nil { + return errors.Wrap(err, "unable to create Jaeger exporter") + } if se != nil { + r := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("kopia"), + semconv.ServiceVersionKey.String(repo.BuildVersion), + ) + tp := trace.NewTracerProvider( trace.WithBatcher(se), trace.WithResource(r), @@ -158,20 +184,6 @@ func (c *observabilityFlags) startMetrics(ctx context.Context) error { return nil } -func (c *observabilityFlags) getSpanExporter() (trace.SpanExporter, error) { - if c.enableJaeger { - // Create the Jaeger exporter - exp, err := jaeger.New(jaeger.WithCollectorEndpoint()) - if err != nil { - return nil, errors.Wrap(err, "unable to create Jaeger exporter") - } - - return exp, nil - } - - return nil, nil -} - func (c *observabilityFlags) stopMetrics(ctx context.Context) { if c.stopPusher != nil { close(c.stopPusher)