diff --git a/cmd/infra/ursrv/serve/metrics.go b/cmd/infra/ursrv/serve/metrics.go index 4fa8cb455..22734010b 100644 --- a/cmd/infra/ursrv/serve/metrics.go +++ b/cmd/infra/ursrv/serve/metrics.go @@ -32,6 +32,21 @@ var ( Subsystem: "ursrv_v2", Name: "collect_seconds_last", }) + metricsRecalcsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "syncthing", + Subsystem: "ursrv_v2", + Name: "recalcs_total", + }) + metricsRecalcSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "syncthing", + Subsystem: "ursrv_v2", + Name: "recalc_seconds_total", + }) + metricsRecalcSecondsLast = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "syncthing", + Subsystem: "ursrv_v2", + Name: "recalc_seconds_last", + }) metricsWriteSecondsLast = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "syncthing", Subsystem: "ursrv_v2", diff --git a/cmd/infra/ursrv/serve/prometheus.go b/cmd/infra/ursrv/serve/prometheus.go index 23014557f..203af47f7 100644 --- a/cmd/infra/ursrv/serve/prometheus.go +++ b/cmd/infra/ursrv/serve/prometheus.go @@ -7,6 +7,8 @@ package serve import ( + "context" + "log/slog" "reflect" "slices" "strconv" @@ -28,7 +30,7 @@ type metricsSet struct { gaugeVecLabels map[string][]string summaries map[string]*metricSummary - collectMut sync.Mutex + collectMut sync.RWMutex collectCutoff time.Duration } @@ -108,6 +110,60 @@ func nameConstLabels(name string) (string, prometheus.Labels) { return name, m } +func (s *metricsSet) Serve(ctx context.Context) error { + s.recalc() + + const recalcInterval = 5 * time.Minute + next := time.Until(time.Now().Truncate(recalcInterval).Add(recalcInterval)) + recalcTimer := time.NewTimer(next) + defer recalcTimer.Stop() + + for { + select { + case <-recalcTimer.C: + s.recalc() + next := time.Until(time.Now().Truncate(recalcInterval).Add(recalcInterval)) + recalcTimer.Reset(next) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (s *metricsSet) recalc() { + s.collectMut.Lock() + defer s.collectMut.Unlock() + + t0 := time.Now() + defer func() { + dur := time.Since(t0) + slog.Info("Metrics recalculated", "d", dur.String()) + metricsRecalcSecondsLast.Set(dur.Seconds()) + metricsRecalcSecondsTotal.Add(dur.Seconds()) + metricsRecalcsTotal.Inc() + }() + + for _, g := range s.gauges { + g.Set(0) + } + for _, g := range s.gaugeVecs { + g.Reset() + } + for _, g := range s.summaries { + g.Reset() + } + + cutoff := time.Now().Add(s.collectCutoff) + s.srv.reports.Range(func(key string, r *contract.Report) bool { + if s.collectCutoff < 0 && r.Received.Before(cutoff) { + s.srv.reports.Delete(key) + return true + } + s.addReport(r) + return true + }) +} + func (s *metricsSet) addReport(r *contract.Report) { gaugeVecs := make(map[string][]string) s.addReportStruct(reflect.ValueOf(r).Elem(), gaugeVecs) @@ -198,8 +254,8 @@ func (s *metricsSet) Describe(c chan<- *prometheus.Desc) { } func (s *metricsSet) Collect(c chan<- prometheus.Metric) { - s.collectMut.Lock() - defer s.collectMut.Unlock() + s.collectMut.RLock() + defer s.collectMut.RUnlock() t0 := time.Now() defer func() { @@ -209,26 +265,6 @@ func (s *metricsSet) Collect(c chan<- prometheus.Metric) { metricsCollectsTotal.Inc() }() - for _, g := range s.gauges { - g.Set(0) - } - for _, g := range s.gaugeVecs { - g.Reset() - } - for _, g := range s.summaries { - g.Reset() - } - - cutoff := time.Now().Add(s.collectCutoff) - s.srv.reports.Range(func(key string, r *contract.Report) bool { - if s.collectCutoff < 0 && r.Received.Before(cutoff) { - s.srv.reports.Delete(key) - return true - } - s.addReport(r) - return true - }) - for _, g := range s.gauges { c <- g } diff --git a/cmd/infra/ursrv/serve/serve.go b/cmd/infra/ursrv/serve/serve.go index 3626ef0f2..41c606f28 100644 --- a/cmd/infra/ursrv/serve/serve.go +++ b/cmd/infra/ursrv/serve/serve.go @@ -33,6 +33,7 @@ import ( "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/geoip" "github.com/syncthing/syncthing/lib/ur/contract" + "github.com/thejerf/suture/v4" ) type CLI struct { @@ -187,7 +188,12 @@ func (cli *CLI) Run() error { // New external metrics endpoint accepts reports from clients and serves // aggregated usage reporting metrics. + main := suture.NewSimple("main") + main.ServeBackground(context.Background()) + ms := newMetricsSet(srv) + main.Add(ms) + reg := prometheus.NewRegistry() reg.MustRegister(ms) @@ -198,7 +204,7 @@ func (cli *CLI) Run() error { metricsSrv := http.Server{ ReadTimeout: 5 * time.Second, - WriteTimeout: 15 * time.Second, + WriteTimeout: 60 * time.Second, Handler: mux, } @@ -227,6 +233,11 @@ func (cli *CLI) downloadDumpFile(blobs blob.Store) error { } func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error { + t0 := time.Now() + defer func() { + metricsWriteSecondsLast.Set(float64(time.Since(t0))) + }() + fd, err := os.Create(cli.DumpFile + ".tmp") if err != nil { return fmt.Errorf("creating dump file: %w", err) @@ -245,9 +256,10 @@ func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error { if err := os.Rename(cli.DumpFile+".tmp", cli.DumpFile); err != nil { return fmt.Errorf("renaming dump file: %w", err) } - slog.Info("Dump file saved") + slog.Info("Dump file saved", "d", time.Since(t0).String()) if blobs != nil { + t1 := time.Now() key := fmt.Sprintf("reports-%s.jsons.gz", time.Now().UTC().Format("2006-01-02")) fd, err := os.Open(cli.DumpFile) if err != nil { @@ -257,7 +269,7 @@ func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error { return fmt.Errorf("uploading dump file: %w", err) } _ = fd.Close() - slog.Info("Dump file uploaded") + slog.Info("Dump file uploaded", "d", time.Since(t1).String()) } return nil @@ -388,6 +400,7 @@ func (s *server) save(w io.Writer) error { } func (s *server) load(r io.Reader) { + t0 := time.Now() dec := json.NewDecoder(r) s.reports.Clear() for { @@ -400,7 +413,7 @@ func (s *server) load(r io.Reader) { } s.addReport(&rep) } - slog.Info("Loaded reports", "count", s.reports.Size()) + slog.Info("Loaded reports", "count", s.reports.Size(), "d", time.Since(t0).String()) } var (