mirror of
https://github.com/syncthing/syncthing.git
synced 2026-04-08 16:38:03 -04:00
Merge branch 'infrastructure'
* infrastructure: chore(stdiscosrv): smooth retry-after delays over a slightly larger normal distribution chore(stdiscosrv): optionally delay shutdown chore(stdiscosrv): adjust desired seen and unseen rate separately
This commit is contained in:
@@ -67,7 +67,7 @@ type contextKey int
|
||||
|
||||
const idKey contextKey = iota
|
||||
|
||||
func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool, desiredNotFoundRate float64) *apiSrv {
|
||||
func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool, desiredUnseenNotFoundRate, desiredSeenNotFoundRate float64) *apiSrv {
|
||||
return &apiSrv{
|
||||
addr: addr,
|
||||
cert: cert,
|
||||
@@ -78,13 +78,13 @@ func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator,
|
||||
seenTracker: &retryAfterTracker{
|
||||
name: "seenTracker",
|
||||
bucketStarts: time.Now(),
|
||||
desiredRate: desiredNotFoundRate / 2,
|
||||
desiredRate: desiredSeenNotFoundRate,
|
||||
currentDelay: notFoundRetryUnknownMinSeconds,
|
||||
},
|
||||
notSeenTracker: &retryAfterTracker{
|
||||
name: "notSeenTracker",
|
||||
bucketStarts: time.Now(),
|
||||
desiredRate: desiredNotFoundRate / 2,
|
||||
desiredRate: desiredUnseenNotFoundRate,
|
||||
currentDelay: notFoundRetryUnknownMaxSeconds / 2,
|
||||
},
|
||||
}
|
||||
@@ -564,5 +564,17 @@ func (t *retryAfterTracker) retryAfterS() int {
|
||||
}
|
||||
t.curCount++
|
||||
t.mut.Unlock()
|
||||
return t.currentDelay + rand.Intn(t.currentDelay/4)
|
||||
|
||||
// Skewed normal distribution with the mean at currentDelay and the
|
||||
// limits (50% and 150%) at 3 standard deviations
|
||||
nf := rand.NormFloat64()
|
||||
minD := max(notFoundRetryUnknownMinSeconds, t.currentDelay/2)
|
||||
maxD := min(notFoundRetryUnknownMaxSeconds, t.currentDelay*3/2)
|
||||
intv := float64(maxD - t.currentDelay)
|
||||
if nf < 0 {
|
||||
intv = float64(t.currentDelay - minD)
|
||||
}
|
||||
nf = min(max(nf*intv/3+float64(t.currentDelay), notFoundRetryUnknownMinSeconds), notFoundRetryUnknownMaxSeconds)
|
||||
|
||||
return int(nf)
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/tlsutil"
|
||||
@@ -106,12 +107,59 @@ func addr(host string, port int) *net.TCPAddr {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryAfterSHistogram(t *testing.T) {
|
||||
tracker := &retryAfterTracker{
|
||||
name: "test",
|
||||
bucketStarts: time.Now(),
|
||||
desiredRate: 100,
|
||||
currentDelay: 1800,
|
||||
}
|
||||
|
||||
const n = 1000
|
||||
bucketSize := 60 // seconds per histogram bucket
|
||||
numBuckets := (notFoundRetryUnknownMaxSeconds + bucketSize - 1) / bucketSize
|
||||
buckets := make([]int, numBuckets)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
v := tracker.retryAfterS()
|
||||
if v < notFoundRetryUnknownMinSeconds || v > notFoundRetryUnknownMaxSeconds {
|
||||
t.Fatalf("retryAfterS() = %d, out of range [%d, %d]", v, notFoundRetryUnknownMinSeconds, notFoundRetryUnknownMaxSeconds)
|
||||
}
|
||||
b := (v - 1) / bucketSize
|
||||
if b >= numBuckets {
|
||||
b = numBuckets - 1
|
||||
}
|
||||
buckets[b]++
|
||||
}
|
||||
|
||||
// Print a horizontal histogram
|
||||
maxCount := 0
|
||||
for _, c := range buckets {
|
||||
if c > maxCount {
|
||||
maxCount = c
|
||||
}
|
||||
}
|
||||
barWidth := 60
|
||||
for i, c := range buckets {
|
||||
lo := i*bucketSize + 1
|
||||
hi := (i + 1) * bucketSize
|
||||
if hi > notFoundRetryUnknownMaxSeconds {
|
||||
hi = notFoundRetryUnknownMaxSeconds
|
||||
}
|
||||
bar := ""
|
||||
if maxCount > 0 {
|
||||
bar = strings.Repeat("#", c*barWidth/maxCount)
|
||||
}
|
||||
t.Logf("%4d-%4ds | %-*s %d", lo, hi, barWidth, bar, c)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAPIRequests(b *testing.B) {
|
||||
db := newInMemoryStore(b.TempDir(), 0, nil)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go db.Serve(ctx)
|
||||
api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true, 1000)
|
||||
api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true, 1000, 1000)
|
||||
srv := httptest.NewServer(http.HandlerFunc(api.handler))
|
||||
|
||||
kf := b.TempDir() + "/cert"
|
||||
|
||||
@@ -57,13 +57,16 @@ const (
|
||||
var debug = false
|
||||
|
||||
type CLI struct {
|
||||
Cert string `group:"Listen" help:"Certificate file" default:"./cert.pem" env:"DISCOVERY_CERT_FILE"`
|
||||
Key string `group:"Listen" help:"Key file" default:"./key.pem" env:"DISCOVERY_KEY_FILE"`
|
||||
HTTP bool `group:"Listen" help:"Listen on HTTP (behind an HTTPS proxy)" env:"DISCOVERY_HTTP"`
|
||||
Compression bool `group:"Listen" help:"Enable GZIP compression of responses" env:"DISCOVERY_COMPRESSION"`
|
||||
Listen string `group:"Listen" help:"Listen address" default:":8443" env:"DISCOVERY_LISTEN"`
|
||||
MetricsListen string `group:"Listen" help:"Metrics listen address" env:"DISCOVERY_METRICS_LISTEN"`
|
||||
DesiredNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies (/s)" default:"1000"`
|
||||
Cert string `group:"Listen" help:"Certificate file" default:"./cert.pem" env:"DISCOVERY_CERT_FILE"`
|
||||
Key string `group:"Listen" help:"Key file" default:"./key.pem" env:"DISCOVERY_KEY_FILE"`
|
||||
HTTP bool `group:"Listen" help:"Listen on HTTP (behind an HTTPS proxy)" env:"DISCOVERY_HTTP"`
|
||||
Compression bool `group:"Listen" help:"Enable GZIP compression of responses" env:"DISCOVERY_COMPRESSION"`
|
||||
Listen string `group:"Listen" help:"Listen address" default:":8443" env:"DISCOVERY_LISTEN"`
|
||||
MetricsListen string `group:"Listen" help:"Metrics listen address" env:"DISCOVERY_METRICS_LISTEN"`
|
||||
DesiredUnseenNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies for never seen devices (/s)" default:"1000" env:"DISCOVERY_UNSEEN_RATE"`
|
||||
DesiredSeenNotFoundRate float64 `group:"Listen" help:"Desired maximum rate of not-found replies for previously seen devices (/s)" default:"1000" env:"DISCOVERY_SEEN_RATE"`
|
||||
|
||||
ShutdownDelay float64 `help:"Time to wait before shutdown after receiving a shutdown signal (s)" env:"DISCOVERY_SHUTDOWN_DELAY"`
|
||||
|
||||
DBDir string `group:"Database" help:"Database directory" default:"." env:"DISCOVERY_DB_DIR"`
|
||||
DBFlushInterval time.Duration `group:"Database" help:"Interval between database flushes" default:"5m" env:"DISCOVERY_DB_FLUSH_INTERVAL"`
|
||||
@@ -145,7 +148,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Start the main API server.
|
||||
qs := newAPISrv(cli.Listen, cert, db, repl, cli.HTTP, cli.Compression, cli.DesiredNotFoundRate)
|
||||
qs := newAPISrv(cli.Listen, cert, db, repl, cli.HTTP, cli.Compression, cli.DesiredUnseenNotFoundRate, cli.DesiredSeenNotFoundRate)
|
||||
main.Add(qs)
|
||||
|
||||
// If we have a metrics port configured, start a metrics handler.
|
||||
@@ -167,7 +170,8 @@ func main() {
|
||||
signal.Notify(signalChan, os.Interrupt)
|
||||
go func() {
|
||||
sig := <-signalChan
|
||||
slog.Info("Received signal; shutting down", "signal", sig)
|
||||
slog.Info("Received signal; shutting down", "signal", sig, "delay", cli.ShutdownDelay)
|
||||
time.Sleep(time.Duration(float64(time.Second) * cli.ShutdownDelay))
|
||||
cancel()
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user