diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go index 36047a233..3a94dd3e9 100644 --- a/cmd/stdiscosrv/apisrv.go +++ b/cmd/stdiscosrv/apisrv.go @@ -67,7 +67,7 @@ type contextKey int const idKey contextKey = iota -func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool, desiredNotFoundRate float64) *apiSrv { +func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool, desiredUnseenNotFoundRate, desiredSeenNotFoundRate float64) *apiSrv { return &apiSrv{ addr: addr, cert: cert, @@ -78,13 +78,13 @@ func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, seenTracker: &retryAfterTracker{ name: "seenTracker", bucketStarts: time.Now(), - desiredRate: desiredNotFoundRate / 2, + desiredRate: desiredSeenNotFoundRate, currentDelay: notFoundRetryUnknownMinSeconds, }, notSeenTracker: &retryAfterTracker{ name: "notSeenTracker", bucketStarts: time.Now(), - desiredRate: desiredNotFoundRate / 2, + desiredRate: desiredUnseenNotFoundRate, currentDelay: notFoundRetryUnknownMaxSeconds / 2, }, } @@ -564,5 +564,17 @@ func (t *retryAfterTracker) retryAfterS() int { } t.curCount++ t.mut.Unlock() - return t.currentDelay + rand.Intn(t.currentDelay/4) + + // Skewed normal distribution with the mean at currentDelay and the + // limits (50% and 150%) at 3 standard deviations + nf := rand.NormFloat64() + minD := max(notFoundRetryUnknownMinSeconds, t.currentDelay/2) + maxD := min(notFoundRetryUnknownMaxSeconds, t.currentDelay*3/2) + intv := float64(maxD - t.currentDelay) + if nf < 0 { + intv = float64(t.currentDelay - minD) + } + nf = min(max(nf*intv/3+float64(t.currentDelay), notFoundRetryUnknownMinSeconds), notFoundRetryUnknownMaxSeconds) + + return int(nf) } diff --git a/cmd/stdiscosrv/apisrv_test.go b/cmd/stdiscosrv/apisrv_test.go index 99f788763..54ea74ecd 100644 --- a/cmd/stdiscosrv/apisrv_test.go +++ b/cmd/stdiscosrv/apisrv_test.go @@ -18,6 +18,7 @@ import ( "regexp" "strings" "testing" + "time" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/tlsutil" @@ -106,12 +107,59 @@ func addr(host string, port int) *net.TCPAddr { } } +func TestRetryAfterSHistogram(t *testing.T) { + tracker := &retryAfterTracker{ + name: "test", + bucketStarts: time.Now(), + desiredRate: 100, + currentDelay: 1800, + } + + const n = 1000 + bucketSize := 60 // seconds per histogram bucket + numBuckets := (notFoundRetryUnknownMaxSeconds + bucketSize - 1) / bucketSize + buckets := make([]int, numBuckets) + + for i := 0; i < n; i++ { + v := tracker.retryAfterS() + if v < notFoundRetryUnknownMinSeconds || v > notFoundRetryUnknownMaxSeconds { + t.Fatalf("retryAfterS() = %d, out of range [%d, %d]", v, notFoundRetryUnknownMinSeconds, notFoundRetryUnknownMaxSeconds) + } + b := (v - 1) / bucketSize + if b >= numBuckets { + b = numBuckets - 1 + } + buckets[b]++ + } + + // Print a horizontal histogram + maxCount := 0 + for _, c := range buckets { + if c > maxCount { + maxCount = c + } + } + barWidth := 60 + for i, c := range buckets { + lo := i*bucketSize + 1 + hi := (i + 1) * bucketSize + if hi > notFoundRetryUnknownMaxSeconds { + hi = notFoundRetryUnknownMaxSeconds + } + bar := "" + if maxCount > 0 { + bar = strings.Repeat("#", c*barWidth/maxCount) + } + t.Logf("%4d-%4ds | %-*s %d", lo, hi, barWidth, bar, c) + } +} + func BenchmarkAPIRequests(b *testing.B) { db := newInMemoryStore(b.TempDir(), 0, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go db.Serve(ctx) - api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true, 1000) + api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true, 1000, 1000) srv := httptest.NewServer(http.HandlerFunc(api.handler)) kf := b.TempDir() + "/cert" diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go index e6812cf43..2534c9be3 100644 --- a/cmd/stdiscosrv/main.go +++ b/cmd/stdiscosrv/main.go @@ -57,13 +57,16 @@ const ( var debug = false type CLI struct { - Cert string `group:"Listen" help:"Certificate file" default:"./cert.pem" env:"DISCOVERY_CERT_FILE"` - Key string `group:"Listen" help:"Key file" default:"./key.pem" env:"DISCOVERY_KEY_FILE"` - HTTP bool `group:"Listen" help:"Listen on HTTP (behind an HTTPS proxy)" env:"DISCOVERY_HTTP"` - Compression bool `group:"Listen" help:"Enable GZIP compression of responses" env:"DISCOVERY_COMPRESSION"` - Listen string `group:"Listen" help:"Listen address" default:":8443" env:"DISCOVERY_LISTEN"` - MetricsListen string `group:"Listen" help:"Metrics listen address" env:"DISCOVERY_METRICS_LISTEN"` - DesiredNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies (/s)" default:"1000"` + Cert string `group:"Listen" help:"Certificate file" default:"./cert.pem" env:"DISCOVERY_CERT_FILE"` + Key string `group:"Listen" help:"Key file" default:"./key.pem" env:"DISCOVERY_KEY_FILE"` + HTTP bool `group:"Listen" help:"Listen on HTTP (behind an HTTPS proxy)" env:"DISCOVERY_HTTP"` + Compression bool `group:"Listen" help:"Enable GZIP compression of responses" env:"DISCOVERY_COMPRESSION"` + Listen string `group:"Listen" help:"Listen address" default:":8443" env:"DISCOVERY_LISTEN"` + MetricsListen string `group:"Listen" help:"Metrics listen address" env:"DISCOVERY_METRICS_LISTEN"` + DesiredUnseenNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies for never seen devices (/s)" default:"1000" env:"DISCOVERY_UNSEEN_RATE"` + DesiredSeenNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies for previously seen devices (/s)" default:"1000" env:"DISCOVERY_SEEN_RATE"` + + ShutdownDelay float64 `help:"Time to wait before shutdown after receiving a shutdown signal (s)" env:"DISCOVERY_SHUTDOWN_DELAY"` DBDir string `group:"Database" help:"Database directory" default:"." env:"DISCOVERY_DB_DIR"` DBFlushInterval time.Duration `group:"Database" help:"Interval between database flushes" default:"5m" env:"DISCOVERY_DB_FLUSH_INTERVAL"` @@ -145,7 +148,7 @@ func main() { } // Start the main API server. - qs := newAPISrv(cli.Listen, cert, db, repl, cli.HTTP, cli.Compression, cli.DesiredNotFoundRate) + qs := newAPISrv(cli.Listen, cert, db, repl, cli.HTTP, cli.Compression, cli.DesiredUnseenNotFoundRate, cli.DesiredSeenNotFoundRate) main.Add(qs) // If we have a metrics port configured, start a metrics handler. @@ -167,7 +170,8 @@ func main() { signal.Notify(signalChan, os.Interrupt) go func() { sig := <-signalChan - slog.Info("Received signal; shutting down", "signal", sig) + slog.Info("Received signal; shutting down", "signal", sig, "delay", cli.ShutdownDelay) + time.Sleep(time.Duration(float64(time.Second) * cli.ShutdownDelay)) cancel() }()