refactor(cli): break down observability.startMetrics() (#3196)

Motivation: reduce the function complexity for the linter, so additional
functionality can be added to `startMetrics()`

No functional changes.

Changes and transformations:
- Break down observability.startMetrics() into: maybeStartListener().
  maybeStartMetricsPusher() and maybeStartTraceExporter().
- Inline observabilityFlags.getExporter in maybeStartTraceExporter.
- Only create resource when exporter is non-nil.
- Simplify maybeStartListener: reduce indentation by returning early.
- Simplify maybeStartMetricsPusher: reduce indentation by returning early.
This commit is contained in:
Julio Lopez
2023-08-06 11:44:31 -07:00
committed by GitHub
parent 0d2a7c83a0
commit 9f33f70eaa

View File

@@ -79,72 +79,98 @@ func (c *observabilityFlags) setup(svc appServices, app *kingpin.Application) {
}
func (c *observabilityFlags) startMetrics(ctx context.Context) error {
if c.metricsListenAddr != "" {
m := mux.NewRouter()
initPrometheus(m)
c.maybeStartListener(ctx)
if c.enablePProf {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595
}
log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr)
go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec
}
if c.metricsPushAddr != "" {
c.stopPusher = make(chan struct{})
c.pusherWG.Add(1)
pusher := push.New(c.metricsPushAddr, c.metricsJob)
pusher.Gatherer(prometheus.DefaultGatherer)
for _, g := range c.metricsGroupings {
const nParts = 2
parts := strings.SplitN(g, ":", nParts)
if len(parts) != nParts {
return errors.Errorf("grouping must be name:value")
}
name := parts[0]
val := parts[1]
pusher.Grouping(name, val)
}
if c.metricsPushUsername != "" {
pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
}
if c.metricsPushFormat != "" {
pusher.Format(metricsPushFormats[c.metricsPushFormat])
}
log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)
c.pushOnce(ctx, "initial", pusher)
go c.pushPeriodically(ctx, pusher)
}
se, err := c.getSpanExporter()
if err != nil {
if err := c.maybeStartMetricsPusher(ctx); err != nil {
return err
}
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("kopia"),
semconv.ServiceVersionKey.String(repo.BuildVersion),
)
return c.maybeStartTraceExporter()
}
// Starts observability listener when a listener address is specified.
func (c *observabilityFlags) maybeStartListener(ctx context.Context) {
if c.metricsListenAddr == "" {
return
}
m := mux.NewRouter()
initPrometheus(m)
if c.enablePProf {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595
}
log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr)
go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec
}
func (c *observabilityFlags) maybeStartMetricsPusher(ctx context.Context) error {
if c.metricsPushAddr == "" {
return nil
}
c.stopPusher = make(chan struct{})
c.pusherWG.Add(1)
pusher := push.New(c.metricsPushAddr, c.metricsJob)
pusher.Gatherer(prometheus.DefaultGatherer)
for _, g := range c.metricsGroupings {
const nParts = 2
parts := strings.SplitN(g, ":", nParts)
if len(parts) != nParts {
return errors.Errorf("grouping must be name:value")
}
name := parts[0]
val := parts[1]
pusher.Grouping(name, val)
}
if c.metricsPushUsername != "" {
pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
}
if c.metricsPushFormat != "" {
pusher.Format(metricsPushFormats[c.metricsPushFormat])
}
log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)
c.pushOnce(ctx, "initial", pusher)
go c.pushPeriodically(ctx, pusher)
return nil
}
func (c *observabilityFlags) maybeStartTraceExporter() error {
if !c.enableJaeger {
return nil
}
// Create the Jaeger exporter
se, err := jaeger.New(jaeger.WithCollectorEndpoint())
if err != nil {
return errors.Wrap(err, "unable to create Jaeger exporter")
}
if se != nil {
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("kopia"),
semconv.ServiceVersionKey.String(repo.BuildVersion),
)
tp := trace.NewTracerProvider(
trace.WithBatcher(se),
trace.WithResource(r),
@@ -158,20 +184,6 @@ func (c *observabilityFlags) startMetrics(ctx context.Context) error {
return nil
}
func (c *observabilityFlags) getSpanExporter() (trace.SpanExporter, error) {
if c.enableJaeger {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
if err != nil {
return nil, errors.Wrap(err, "unable to create Jaeger exporter")
}
return exp, nil
}
return nil, nil
}
func (c *observabilityFlags) stopMetrics(ctx context.Context) {
if c.stopPusher != nil {
close(c.stopPusher)