chore(ursrv): separate calculation from serving metrics

Jakob Borg
2025-06-04 09:16:50 +02:00
parent 01257e838b
commit 655ef63c74
3 changed files with 91 additions and 27 deletions
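
In short: Collect previously rebuilt every gauge, gauge vector, and summary from all stored reports on each Prometheus scrape, holding an exclusive lock the whole time. That aggregation now lives in recalc, driven by a Serve loop on a five-minute wall-clock schedule, so Collect only streams precomputed values under a read lock. New recalcs_total, recalc_seconds_total, and recalc_seconds_last metrics track the background work, the metrics set runs as a suture service, and the metrics server's write timeout grows from 15 to 60 seconds.

A minimal sketch of the resulting pattern, assuming nothing beyond the Prometheus Go client; cachedSet and example_value are invented names, not ursrv code:

```go
// Sketch of "separate calculation from serving": a background loop
// recomputes under the write lock, scrapes read under the read lock.
package main

import (
	"context"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

type cachedSet struct {
	mut   sync.RWMutex
	gauge prometheus.Gauge
}

// Describe derives descriptors from Collect, suitable for a dynamic collector.
func (s *cachedSet) Describe(c chan<- *prometheus.Desc) {
	prometheus.DescribeByCollect(s, c)
}

// Collect only reads precomputed state; concurrent scrapes share the lock.
func (s *cachedSet) Collect(c chan<- prometheus.Metric) {
	s.mut.RLock()
	defer s.mut.RUnlock()
	c <- s.gauge
}

// Serve recomputes periodically; scrapes block only while recalc holds
// the write lock.
func (s *cachedSet) Serve(ctx context.Context) error {
	s.recalc()
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.recalc()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func (s *cachedSet) recalc() {
	s.mut.Lock()
	defer s.mut.Unlock()
	s.gauge.Set(42) // stand-in for re-aggregating all stored reports
}

func main() {
	s := &cachedSet{gauge: prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_value"})}
	reg := prometheus.NewRegistry()
	reg.MustRegister(s)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = s.Serve(ctx) // in ursrv this runs under a suture supervisor instead
}
```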

View File

@@ -32,6 +32,21 @@ var (
 		Subsystem: "ursrv_v2",
 		Name:      "collect_seconds_last",
 	})
+	metricsRecalcsTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: "syncthing",
+		Subsystem: "ursrv_v2",
+		Name:      "recalcs_total",
+	})
+	metricsRecalcSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: "syncthing",
+		Subsystem: "ursrv_v2",
+		Name:      "recalc_seconds_total",
+	})
+	metricsRecalcSecondsLast = promauto.NewGauge(prometheus.GaugeOpts{
+		Namespace: "syncthing",
+		Subsystem: "ursrv_v2",
+		Name:      "recalc_seconds_last",
+	})
 	metricsWriteSecondsLast = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: "syncthing",
 		Subsystem: "ursrv_v2",
View File

@@ -7,6 +7,8 @@
 package serve
 
 import (
+	"context"
+	"log/slog"
 	"reflect"
 	"slices"
 	"strconv"
@@ -28,7 +30,7 @@ type metricsSet struct {
 	gaugeVecLabels map[string][]string
 	summaries      map[string]*metricSummary

-	collectMut    sync.Mutex
+	collectMut    sync.RWMutex
 	collectCutoff time.Duration
 }
@@ -108,6 +110,60 @@ func nameConstLabels(name string) (string, prometheus.Labels) {
 	return name, m
 }

+func (s *metricsSet) Serve(ctx context.Context) error {
+	s.recalc()
+
+	const recalcInterval = 5 * time.Minute
+	next := time.Until(time.Now().Truncate(recalcInterval).Add(recalcInterval))
+	recalcTimer := time.NewTimer(next)
+	defer recalcTimer.Stop()
+
+	for {
+		select {
+		case <-recalcTimer.C:
+			s.recalc()
+			next := time.Until(time.Now().Truncate(recalcInterval).Add(recalcInterval))
+			recalcTimer.Reset(next)
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func (s *metricsSet) recalc() {
+	s.collectMut.Lock()
+	defer s.collectMut.Unlock()
+
+	t0 := time.Now()
+	defer func() {
+		dur := time.Since(t0)
+		slog.Info("Metrics recalculated", "d", dur.String())
+		metricsRecalcSecondsLast.Set(dur.Seconds())
+		metricsRecalcSecondsTotal.Add(dur.Seconds())
+		metricsRecalcsTotal.Inc()
+	}()
+
+	for _, g := range s.gauges {
+		g.Set(0)
+	}
+	for _, g := range s.gaugeVecs {
+		g.Reset()
+	}
+	for _, g := range s.summaries {
+		g.Reset()
+	}
+
+	cutoff := time.Now().Add(s.collectCutoff)
+	s.srv.reports.Range(func(key string, r *contract.Report) bool {
+		if s.collectCutoff < 0 && r.Received.Before(cutoff) {
+			s.srv.reports.Delete(key)
+			return true
+		}
+		s.addReport(r)
+		return true
+	})
+}
+
 func (s *metricsSet) addReport(r *contract.Report) {
 	gaugeVecs := make(map[string][]string)
 	s.addReportStruct(reflect.ValueOf(r).Elem(), gaugeVecs)
@@ -198,8 +254,8 @@ func (s *metricsSet) Describe(c chan<- *prometheus.Desc) {
 }

 func (s *metricsSet) Collect(c chan<- prometheus.Metric) {
-	s.collectMut.Lock()
-	defer s.collectMut.Unlock()
+	s.collectMut.RLock()
+	defer s.collectMut.RUnlock()

 	t0 := time.Now()
 	defer func() {
@@ -209,26 +265,6 @@ func (s *metricsSet) Collect(c chan<- prometheus.Metric) {
 		metricsCollectsTotal.Inc()
 	}()

-	for _, g := range s.gauges {
-		g.Set(0)
-	}
-	for _, g := range s.gaugeVecs {
-		g.Reset()
-	}
-	for _, g := range s.summaries {
-		g.Reset()
-	}
-
-	cutoff := time.Now().Add(s.collectCutoff)
-	s.srv.reports.Range(func(key string, r *contract.Report) bool {
-		if s.collectCutoff < 0 && r.Received.Before(cutoff) {
-			s.srv.reports.Delete(key)
-			return true
-		}
-		s.addReport(r)
-		return true
-	})
-
 	for _, g := range s.gauges {
 		c <- g
 	}
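
The time.Until(time.Now().Truncate(recalcInterval).Add(recalcInterval)) expression in Serve above anchors recalculations to round wall-clock times rather than to process start: Truncate rounds down to the previous five-minute boundary and Add steps to the next one. A standalone illustration with a hypothetical timestamp:

```go
// Illustration of the wall-clock alignment used by the Serve loop:
// Truncate rounds down to the previous interval boundary, Add moves to
// the next one, and the difference is the remaining wait.
package main

import (
	"fmt"
	"time"
)

func main() {
	const recalcInterval = 5 * time.Minute
	now := time.Date(2025, 6, 4, 9, 16, 50, 0, time.UTC)
	next := now.Truncate(recalcInterval).Add(recalcInterval)
	fmt.Println(next)          // 2025-06-04 09:20:00 +0000 UTC
	fmt.Println(next.Sub(now)) // 3m10s
}
```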

View File

@@ -33,6 +33,7 @@ import (
 	"github.com/syncthing/syncthing/lib/build"
 	"github.com/syncthing/syncthing/lib/geoip"
 	"github.com/syncthing/syncthing/lib/ur/contract"
+	"github.com/thejerf/suture/v4"
 )

 type CLI struct {
@@ -187,7 +188,12 @@ func (cli *CLI) Run() error {
 	// New external metrics endpoint accepts reports from clients and serves
 	// aggregated usage reporting metrics.

+	main := suture.NewSimple("main")
+	main.ServeBackground(context.Background())
+
 	ms := newMetricsSet(srv)
+	main.Add(ms)
+
 	reg := prometheus.NewRegistry()
 	reg.MustRegister(ms)
@@ -198,7 +204,7 @@ func (cli *CLI) Run() error {
 	metricsSrv := http.Server{
 		ReadTimeout:  5 * time.Second,
-		WriteTimeout: 15 * time.Second,
+		WriteTimeout: 60 * time.Second,
 		Handler:      mux,
 	}
@@ -227,6 +233,11 @@ func (cli *CLI) downloadDumpFile(blobs blob.Store) error {
 }

 func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
+	t0 := time.Now()
+	defer func() {
+		metricsWriteSecondsLast.Set(time.Since(t0).Seconds())
+	}()
+
 	fd, err := os.Create(cli.DumpFile + ".tmp")
 	if err != nil {
 		return fmt.Errorf("creating dump file: %w", err)
@@ -245,9 +256,10 @@ func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
 	if err := os.Rename(cli.DumpFile+".tmp", cli.DumpFile); err != nil {
 		return fmt.Errorf("renaming dump file: %w", err)
 	}
-	slog.Info("Dump file saved")
+	slog.Info("Dump file saved", "d", time.Since(t0).String())

 	if blobs != nil {
+		t1 := time.Now()
 		key := fmt.Sprintf("reports-%s.jsons.gz", time.Now().UTC().Format("2006-01-02"))
 		fd, err := os.Open(cli.DumpFile)
 		if err != nil {
@@ -257,7 +269,7 @@ func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
 			return fmt.Errorf("uploading dump file: %w", err)
 		}
 		_ = fd.Close()
-		slog.Info("Dump file uploaded")
+		slog.Info("Dump file uploaded", "d", time.Since(t1).String())
 	}

 	return nil
@@ -388,6 +400,7 @@ func (s *server) save(w io.Writer) error {
 }

 func (s *server) load(r io.Reader) {
+	t0 := time.Now()
 	dec := json.NewDecoder(r)
 	s.reports.Clear()
 	for {
@@ -400,7 +413,7 @@ func (s *server) load(r io.Reader) {
 		}
 		s.addReport(&rep)
 	}
-	slog.Info("Loaded reports", "count", s.reports.Size())
+	slog.Info("Loaded reports", "count", s.reports.Size(), "d", time.Since(t0).String())
 }

 var (
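
Adding Serve(ctx context.Context) error is what lets Run hand the metrics set to the supervisor created above: any value with that method satisfies suture/v4's Service interface. A minimal sketch of the same wiring with a stand-in service; tickService and the timings are invented for illustration:

```go
// Sketch of the suture/v4 pattern used in Run: Add places a Service
// under the supervisor, and ServeBackground starts the supervision
// tree without blocking the caller.
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/thejerf/suture/v4"
)

type tickService struct{}

// Serve runs until the supervisor's context is cancelled, making
// tickService a suture Service.
func (tickService) Serve(ctx context.Context) error {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			slog.Info("tick")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	sup := suture.NewSimple("main")
	sup.Add(tickService{})

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	errs := sup.ServeBackground(ctx)
	<-errs // the channel closes once the supervisor has shut down
}
```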