feat(cli): added CLI for controlling throttler (#1956)

Supported are:

```
$ kopia throttle set \
    --download-bytes-per-second=N | unlimited
    --upload-bytes-per-second=N | unlimited
    --read-requests-per-second=N | unlimited
    --write-requests-per-second=N | unlimited
    --list-requests-per-second=N | unlimited
    --concurrent-reads=N | unlimited
    --concurrent-writes=N | unlimited
```

To change parameters of a running server use:

```
$ kopia server throttle set \
    --address=<server-url> \
    --server-control-password=<password> \
    --download-bytes-per-second=N | unlimited
    --upload-bytes-per-second=N | unlimited
    --read-requests-per-second=N | unlimited
    --write-requests-per-second=N | unlimited
    --list-requests-per-second=N | unlimited
    --concurrent-reads=N | unlimited
    --concurrent-writes=N | unlimited
```
This commit is contained in:
Jarek Kowalski
2022-05-18 01:27:06 -07:00
committed by GitHub
parent ebd8f113c6
commit 99eeb3c063
22 changed files with 737 additions and 22 deletions

View File

@@ -0,0 +1,81 @@
package cli_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/tests/testenv"
)
func TestRepoThrottle(t *testing.T) {
t.Parallel()
env := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t))
env.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", env.RepoDir)
defer env.RunAndExpectSuccess(t, "repo", "disconnect")
require.Equal(t, []string{
"Max Download Speed: (unlimited)",
"Max Upload Speed: (unlimited)",
"Max Read Requests Per Second: (unlimited)",
"Max Write Requests Per Second: (unlimited)",
"Max List Requests Per Second: (unlimited)",
"Max Concurrent Reads: (unlimited)",
"Max Concurrent Writes: (unlimited)",
}, env.RunAndExpectSuccess(t, "repo", "throttle", "get"))
env.RunAndExpectSuccess(t, "repo", "throttle", "set",
"--download-bytes-per-second=1000000000",
"--upload-bytes-per-second=2000000000",
"--read-requests-per-second=300",
"--write-requests-per-second=400",
"--list-requests-per-second=500",
"--concurrent-reads=300",
"--concurrent-writes=400",
)
env.RunAndExpectFailure(t, "repo", "throttle", "set", "--download-bytes-per-second=-30")
env.RunAndExpectFailure(t, "repo", "throttle", "set", "--concurrent-reads=-3")
require.Equal(t, []string{
"Max Download Speed: 1 GB/s",
"Max Upload Speed: 2 GB/s",
"Max Read Requests Per Second: 300",
"Max Write Requests Per Second: 400",
"Max List Requests Per Second: 500",
"Max Concurrent Reads: 300",
"Max Concurrent Writes: 400",
}, env.RunAndExpectSuccess(t, "repo", "throttle", "get"))
env.RunAndExpectSuccess(t, "repo", "throttle", "set",
"--upload-bytes-per-second=unlimited",
"--write-requests-per-second=unlimited",
)
require.Equal(t, []string{
"Max Download Speed: 1 GB/s",
"Max Upload Speed: (unlimited)",
"Max Read Requests Per Second: 300",
"Max Write Requests Per Second: (unlimited)",
"Max List Requests Per Second: 500",
"Max Concurrent Reads: 300",
"Max Concurrent Writes: 400",
}, env.RunAndExpectSuccess(t, "repo", "throttle", "get"))
var limits throttling.Limits
testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "repo", "throttle", "get", "--json"), &limits)
require.Equal(t, throttling.Limits{
ReadsPerSecond: 300,
WritesPerSecond: 0,
ListsPerSecond: 500,
UploadBytesPerSecond: 0,
DownloadBytesPerSecond: 1e+09,
ConcurrentReads: 300,
ConcurrentWrites: 400,
}, limits)
}

View File

@@ -10,6 +10,7 @@ type commandRepository struct {
changePassword commandRepositoryChangePassword
status commandRepositoryStatus
syncTo commandRepositorySyncTo
throttle commandRepositoryThrottle
validateProvider commandRepositoryValidateProvider
}
@@ -24,6 +25,7 @@ func (c *commandRepository) setup(svc advancedAppServices, parent commandParent)
c.setParameters.setup(svc, cmd)
c.status.setup(svc, cmd)
c.syncTo.setup(svc, cmd) // nolint:contextcheck
c.throttle.setup(svc, cmd)
c.changePassword.setup(svc, cmd)
c.validateProvider.setup(svc, cmd)
}

View File

@@ -0,0 +1,13 @@
package cli
type commandRepositoryThrottle struct {
get commandRepositoryThrottleGet
set commandRepositoryThrottleSet
}
func (c *commandRepositoryThrottle) setup(svc appServices, parent commandParent) {
cmd := parent.Command("throttle", "Commands to manipulate throttle configuration")
c.get.setup(svc, cmd)
c.set.setup(svc, cmd)
}

View File

@@ -0,0 +1,28 @@
package cli
import (
"context"
"github.com/kopia/kopia/repo"
)
type commandRepositoryThrottleGet struct {
ctg commonThrottleGet
}
func (c *commandRepositoryThrottleGet) setup(svc appServices, parent commandParent) {
cmd := parent.Command("get", "Get throttling parameters for a repository")
c.ctg.setup(svc, cmd)
cmd.Action(svc.directRepositoryReadAction(c.run))
}
func (c *commandRepositoryThrottleGet) run(ctx context.Context, rep repo.DirectRepository) error {
limits := rep.Throttler().Limits()
if err := c.ctg.output(&limits); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,38 @@
package cli
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
)
type commandRepositoryThrottleSet struct {
cts commonThrottleSet
}
func (c *commandRepositoryThrottleSet) setup(svc appServices, parent commandParent) {
cmd := parent.Command("set", "Set throttling parameters for a repository")
c.cts.setup(cmd)
cmd.Action(svc.directRepositoryWriteAction(c.run))
}
func (c *commandRepositoryThrottleSet) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
thr := rep.Throttler()
limits := thr.Limits()
var changeCount int
if err := c.cts.apply(ctx, &limits, &changeCount); err != nil {
return err
}
if changeCount == 0 {
log(ctx).Infof("No changes made.")
return nil
}
return errors.Wrap(rep.Throttler().SetLimits(limits), "error setting limits")
}

View File

@@ -17,6 +17,7 @@ type commandServer struct {
resume commandServerResume
start commandServerStart
status commandServerStatus
throttle commandServerThrottle
upload commandServerUpload
shutdown commandServerShutdown
}
@@ -70,6 +71,7 @@ func (c *commandServer) setup(svc advancedAppServices, parent commandParent) {
c.cancel.setup(svc, cmd)
c.pause.setup(svc, cmd)
c.resume.setup(svc, cmd)
c.throttle.setup(svc, cmd)
}
func (c *serverClientFlags) serverAPIClientOptions() (apiclient.Options, error) {

View File

@@ -7,6 +7,7 @@
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/tests/testenv"
)
@@ -92,6 +93,58 @@ func TestServerControl(t *testing.T) {
env.RunAndExpectSuccess(t, "server", "pause", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1)
env.RunAndExpectSuccess(t, "server", "resume", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1)
env.RunAndExpectSuccess(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword,
"--download-bytes-per-second=1000000000",
"--upload-bytes-per-second=2000000000",
"--read-requests-per-second=300",
"--write-requests-per-second=400",
"--list-requests-per-second=500",
"--concurrent-reads=300",
"--concurrent-writes=400",
)
require.Equal(t, []string{
"Max Download Speed: 1 GB/s",
"Max Upload Speed: 2 GB/s",
"Max Read Requests Per Second: 300",
"Max Write Requests Per Second: 400",
"Max List Requests Per Second: 500",
"Max Concurrent Reads: 300",
"Max Concurrent Writes: 400",
}, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword))
env.RunAndExpectSuccess(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword,
"--upload-bytes-per-second=unlimited",
"--write-requests-per-second=unlimited",
)
env.RunAndExpectFailure(t, "server", "throttle", "set", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword,
"--upload-bytes-per-second=-10",
)
require.Equal(t, []string{
"Max Download Speed: 1 GB/s",
"Max Upload Speed: (unlimited)",
"Max Read Requests Per Second: 300",
"Max Write Requests Per Second: (unlimited)",
"Max List Requests Per Second: 500",
"Max Concurrent Reads: 300",
"Max Concurrent Writes: 400",
}, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword))
var limits throttling.Limits
testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "server", "throttle", "get", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, "--json"), &limits)
require.Equal(t, throttling.Limits{
ReadsPerSecond: 300,
WritesPerSecond: 0,
ListsPerSecond: 500,
UploadBytesPerSecond: 0,
DownloadBytesPerSecond: 1e+09,
ConcurrentReads: 300,
ConcurrentWrites: 400,
}, limits)
env.RunAndExpectSuccess(t, "server", "shutdown", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword)
select {

View File

@@ -0,0 +1,12 @@
package cli
type commandServerThrottle struct {
get commandServerThrottleGet
set commandServerThrottleSet
}
func (c *commandServerThrottle) setup(svc appServices, parent commandParent) {
cmd := parent.Command("throttle", "Control throttling parameters for a running server")
c.get.setup(svc, cmd)
c.set.setup(svc, cmd)
}

View File

@@ -0,0 +1,33 @@
package cli
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/repo/blob/throttling"
)
type commandServerThrottleGet struct {
sf serverClientFlags
ctg commonThrottleGet
}
func (c *commandServerThrottleGet) setup(svc appServices, parent commandParent) {
cmd := parent.Command("get", "Get throttling parameters for a running server")
c.sf.setup(cmd)
c.ctg.setup(svc, cmd)
cmd.Action(svc.serverAction(&c.sf, c.run))
}
func (c *commandServerThrottleGet) run(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
var limits throttling.Limits
if err := cli.Get(ctx, "control/throttle", nil, &limits); err != nil {
return errors.Wrap(err, "unable to get current throttle")
}
return c.ctg.output(&limits)
}

View File

@@ -0,0 +1,50 @@
package cli
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo/blob/throttling"
)
type commandServerThrottleSet struct {
sf serverClientFlags
cts commonThrottleSet
}
func (c *commandServerThrottleSet) setup(svc appServices, parent commandParent) {
cmd := parent.Command("set", "Set throttling parameters for a running server")
c.sf.setup(cmd)
c.cts.setup(cmd)
cmd.Action(svc.serverAction(&c.sf, c.run))
}
func (c *commandServerThrottleSet) run(ctx context.Context, cli *apiclient.KopiaAPIClient) error {
var limits throttling.Limits
if err := cli.Get(ctx, "control/throttle", nil, &limits); err != nil {
return errors.Wrap(err, "unable to get current throttle")
}
var changeCount int
if err := c.cts.apply(ctx, &limits, &changeCount); err != nil {
return err
}
if changeCount == 0 {
log(ctx).Infof("No changes made.")
return nil
}
if err := cli.Put(ctx, "control/throttle", &limits, &serverapi.Empty{}); err != nil {
return errors.Wrap(err, "unable to change throttle")
}
return nil
}

49
cli/throttle_get.go Normal file
View File

@@ -0,0 +1,49 @@
package cli
import (
"strconv"
"github.com/alecthomas/kingpin"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo/blob/throttling"
)
type commonThrottleGet struct {
out textOutput
jo jsonOutput
}
func (c *commonThrottleGet) setup(svc appServices, cmd *kingpin.CmdClause) {
c.out.setup(svc)
c.jo.setup(svc, cmd)
}
func (c *commonThrottleGet) output(limits *throttling.Limits) error {
if c.jo.jsonOutput {
c.out.printStdout("%s\n", c.jo.jsonBytes(limits))
return nil
}
c.printValueOrUnlimited("Max Download Speed:", limits.DownloadBytesPerSecond, units.BytesPerSecondsString)
c.printValueOrUnlimited("Max Upload Speed:", limits.UploadBytesPerSecond, units.BytesPerSecondsString)
c.printValueOrUnlimited("Max Read Requests Per Second:", limits.ReadsPerSecond, c.floatToString)
c.printValueOrUnlimited("Max Write Requests Per Second:", limits.WritesPerSecond, c.floatToString)
c.printValueOrUnlimited("Max List Requests Per Second:", limits.ListsPerSecond, c.floatToString)
c.printValueOrUnlimited("Max Concurrent Reads:", float64(limits.ConcurrentReads), c.floatToString)
c.printValueOrUnlimited("Max Concurrent Writes:", float64(limits.ConcurrentWrites), c.floatToString)
return nil
}
func (c *commonThrottleGet) printValueOrUnlimited(label string, v float64, convert func(v float64) string) {
if v != 0 {
c.out.printStdout("%-30v %v\n", label, convert(v))
} else {
c.out.printStdout("%-30v (unlimited)\n", label)
}
}
func (c *commonThrottleGet) floatToString(v float64) string {
return strconv.FormatFloat(v, 'f', 0, 64) // nolint:gomnd
}

130
cli/throttle_set.go Normal file
View File

@@ -0,0 +1,130 @@
package cli
import (
"context"
"strconv"
"github.com/alecthomas/kingpin"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo/blob/throttling"
)
type commonThrottleSet struct {
setDownloadBytesPerSecond string
setUploadBytesPerSecond string
setReadsPerSecond string
setWritesPerSecond string
setListsPerSecond string
setConcurrentReads string
setConcurrentWrites string
}
func (c *commonThrottleSet) setup(cmd *kingpin.CmdClause) {
cmd.Flag("download-bytes-per-second", "Set the download bytes per second").StringVar(&c.setDownloadBytesPerSecond)
cmd.Flag("upload-bytes-per-second", "Set the upload bytes per second").StringVar(&c.setUploadBytesPerSecond)
cmd.Flag("read-requests-per-second", "Set max reads per second").StringVar(&c.setReadsPerSecond)
cmd.Flag("write-requests-per-second", "Set max writes per second").StringVar(&c.setWritesPerSecond)
cmd.Flag("list-requests-per-second", "Set max lists per second").StringVar(&c.setListsPerSecond)
cmd.Flag("concurrent-reads", "Set max concurrent reads").StringVar(&c.setConcurrentReads)
cmd.Flag("concurrent-writes", "Set max concurrent writes").StringVar(&c.setConcurrentWrites)
}
func (c *commonThrottleSet) apply(ctx context.Context, limits *throttling.Limits, changeCount *int) error {
if err := c.setThrottleFloat64(ctx, "max download speed", true, &limits.DownloadBytesPerSecond, c.setDownloadBytesPerSecond, changeCount); err != nil {
return err
}
if err := c.setThrottleFloat64(ctx, "max upload speed", true, &limits.UploadBytesPerSecond, c.setUploadBytesPerSecond, changeCount); err != nil {
return err
}
if err := c.setThrottleFloat64(ctx, "reads per second", false, &limits.ReadsPerSecond, c.setReadsPerSecond, changeCount); err != nil {
return err
}
if err := c.setThrottleFloat64(ctx, "writes per second", false, &limits.WritesPerSecond, c.setWritesPerSecond, changeCount); err != nil {
return err
}
if err := c.setThrottleFloat64(ctx, "lists per second", false, &limits.ListsPerSecond, c.setListsPerSecond, changeCount); err != nil {
return err
}
if err := c.setThrottleInt(ctx, "concurrent reads", &limits.ConcurrentReads, c.setConcurrentReads, changeCount); err != nil {
return err
}
if err := c.setThrottleInt(ctx, "concurrent writes", &limits.ConcurrentWrites, c.setConcurrentWrites, changeCount); err != nil {
return err
}
return nil
}
func (c *commonThrottleSet) setThrottleFloat64(ctx context.Context, desc string, bps bool, val *float64, str string, changeCount *int) error {
if str == "" {
// not changed
return nil
}
if str == "unlimited" || str == "-" {
*changeCount++
log(ctx).Infof("Setting %v to a unlimited.", desc)
*val = 0
return nil
}
// nolint:gomnd
v, err := strconv.ParseFloat(str, 64)
if err != nil {
return errors.Wrapf(err, "can't parse the %v %q", desc, str)
}
*changeCount++
if bps {
log(ctx).Infof("Setting %v to %v.", desc, units.BytesPerSecondsString(v))
} else {
log(ctx).Infof("Setting %v to %v.", desc, v)
}
*val = v
return nil
}
func (c *commonThrottleSet) setThrottleInt(ctx context.Context, desc string, val *int, str string, changeCount *int) error {
if str == "" {
// not changed
return nil
}
if str == "unlimited" || str == "-" {
*changeCount++
log(ctx).Infof("Setting %v to a unlimited.", desc)
*val = 0
return nil
}
// nolint:gomnd
v, err := strconv.ParseInt(str, 10, 64)
if err != nil {
return errors.Wrapf(err, "can't parse the %v %q", desc, str)
}
*changeCount++
log(ctx).Infof("Setting %v to %v.", desc, v)
*val = int(v)
return nil
}

View File

@@ -177,6 +177,8 @@ func (s *Server) SetupControlAPIHandlers(m *mux.Router) {
m.HandleFunc("/api/v1/control/cancel-snapshot", s.handleServerControlAPI(handleCancel)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/control/pause-source", s.handleServerControlAPI(handlePause)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/control/resume-source", s.handleServerControlAPI(handleResume)).Methods(http.MethodPost)
m.HandleFunc("/api/v1/control/throttle", s.handleServerControlAPI(handleRepoGetThrottle)).Methods(http.MethodGet)
m.HandleFunc("/api/v1/control/throttle", s.handleServerControlAPI(handleRepoSetThrottle)).Methods(http.MethodPut)
}
func isAuthenticated(rc requestContext) bool {

View File

@@ -108,10 +108,12 @@ func MyTestMain(m *testing.M, cleanups ...func()) {
v := m.Run()
if err := releasable.Verify(); err != nil {
log.Printf("found leaks: %v", err)
if v == 0 {
if err := releasable.Verify(); err != nil {
log.Printf("found leaks: %v", err)
v = 1
v = 1
}
}
for _, c := range cleanups {

View File

@@ -14,8 +14,12 @@ type SettableThrottler interface {
Limits() Limits
SetLimits(limits Limits) error
OnUpdate(handler UpdatedHandler)
}
// UpdatedHandler is invoked as part of SetLimits() after limits are updated.
type UpdatedHandler func(l Limits) error
type tokenBucketBasedThrottler struct {
mu sync.Mutex
// +checklocks:mu
@@ -26,7 +30,13 @@ type tokenBucketBasedThrottler struct {
listOps *tokenBucket
upload *tokenBucket
download *tokenBucket
window time.Duration // +checklocksignore
concurrentReads *semaphore
concurrentWrites *semaphore
window time.Duration // +checklocksignore
onUpdate []UpdatedHandler
}
func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op string) {
@@ -35,8 +45,20 @@ func (t *tokenBucketBasedThrottler) BeforeOperation(ctx context.Context, op stri
t.listOps.Take(ctx, 1)
case operationGetBlob, operationGetMetadata:
t.readOps.Take(ctx, 1)
t.concurrentReads.Acquire()
case operationPutBlob, operationDeleteBlob:
t.writeOps.Take(ctx, 1)
t.concurrentWrites.Acquire()
}
}
func (t *tokenBucketBasedThrottler) AfterOperation(ctx context.Context, op string) {
switch op {
case operationListBlobs:
case operationGetBlob, operationGetMetadata:
t.concurrentReads.Release()
case operationPutBlob, operationDeleteBlob:
t.concurrentWrites.Release()
}
}
@@ -64,8 +86,23 @@ func (t *tokenBucketBasedThrottler) SetLimits(limits Limits) error {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.setLimits(limits); err != nil {
_ = t.setLimits(t.limits)
return err
}
t.limits = limits
for _, h := range t.onUpdate {
if err := h(limits); err != nil {
return err
}
}
return nil
}
func (t *tokenBucketBasedThrottler) setLimits(limits Limits) error {
if err := t.readOps.SetLimit(limits.ReadsPerSecond * t.window.Seconds()); err != nil {
return errors.Wrap(err, "ReadsPerSecond")
}
@@ -86,9 +123,21 @@ func (t *tokenBucketBasedThrottler) SetLimits(limits Limits) error {
return errors.Wrap(err, "DownloadBytesPerSecond")
}
if err := t.concurrentReads.SetLimit(limits.ConcurrentReads); err != nil {
return errors.Wrap(err, "ConcurrentReads")
}
if err := t.concurrentWrites.SetLimit(limits.ConcurrentWrites); err != nil {
return errors.Wrap(err, "ConcurrentWrites")
}
return nil
}
func (t *tokenBucketBasedThrottler) OnUpdate(handler UpdatedHandler) {
t.onUpdate = append(t.onUpdate, handler)
}
// Limits encapsulates all limits for a Throttler.
type Limits struct {
ReadsPerSecond float64 `json:"readsPerSecond,omitempty"`
@@ -96,6 +145,8 @@ type Limits struct {
ListsPerSecond float64 `json:"listsPerSecond,omitempty"`
UploadBytesPerSecond float64 `json:"maxUploadSpeedBytesPerSecond,omitempty"`
DownloadBytesPerSecond float64 `json:"maxDownloadSpeedBytesPerSecond,omitempty"`
ConcurrentReads int `json:"concurrentReads,omitempty"`
ConcurrentWrites int `json:"concurrentWrites,omitempty"`
}
var _ Throttler = (*tokenBucketBasedThrottler)(nil)
@@ -103,12 +154,14 @@ type Limits struct {
// NewThrottler returns a Throttler with provided limits.
func NewThrottler(limits Limits, window time.Duration, initialFillRatio float64) (SettableThrottler, error) {
t := &tokenBucketBasedThrottler{
readOps: newTokenBucket("read-ops", initialFillRatio*limits.ReadsPerSecond*window.Seconds(), 0, window),
writeOps: newTokenBucket("write-ops", initialFillRatio*limits.WritesPerSecond*window.Seconds(), 0, window),
listOps: newTokenBucket("list-ops", initialFillRatio*limits.ListsPerSecond*window.Seconds(), 0, window),
upload: newTokenBucket("upload-bytes", initialFillRatio*limits.UploadBytesPerSecond*window.Seconds(), 0, window),
download: newTokenBucket("download-bytes", initialFillRatio*limits.DownloadBytesPerSecond*window.Seconds(), 0, window),
window: window,
readOps: newTokenBucket("read-ops", initialFillRatio*limits.ReadsPerSecond*window.Seconds(), 0, window),
writeOps: newTokenBucket("write-ops", initialFillRatio*limits.WritesPerSecond*window.Seconds(), 0, window),
listOps: newTokenBucket("list-ops", initialFillRatio*limits.ListsPerSecond*window.Seconds(), 0, window),
upload: newTokenBucket("upload-bytes", initialFillRatio*limits.UploadBytesPerSecond*window.Seconds(), 0, window),
download: newTokenBucket("download-bytes", initialFillRatio*limits.DownloadBytesPerSecond*window.Seconds(), 0, window),
concurrentReads: newSemaphore(),
concurrentWrites: newSemaphore(),
window: window,
}
if err := t.SetLimits(limits); err != nil {

View File

@@ -0,0 +1,63 @@
package throttling
import (
"sync"
"github.com/pkg/errors"
)
type semaphore struct {
mu sync.Mutex
// +checklocks:mu
sem chan struct{}
}
func (s *semaphore) getChan() chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
return s.sem
}
func (s *semaphore) Acquire() {
ch := s.getChan()
if ch == nil {
return
}
// push to channel, may block
ch <- struct{}{}
}
func (s *semaphore) Release() {
ch := s.getChan()
if ch == nil {
return
}
select {
case <-ch: // removed one entry from channel
default: // this can happen when we reset a channel to a lower value
}
}
func (s *semaphore) SetLimit(limit int) error {
s.mu.Lock()
defer s.mu.Unlock()
if limit < 0 {
return errors.Errorf("invalid limit")
}
if limit > 0 {
s.sem = make(chan struct{}, limit)
} else {
s.sem = nil
}
return nil
}
func newSemaphore() *semaphore {
return &semaphore{}
}

View File

@@ -0,0 +1,64 @@
package throttling
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestThrottlingSemaphore(t *testing.T) {
s := newSemaphore()
// default is unlimited
s.Acquire()
s.Release()
require.Error(t, s.SetLimit(-1))
for _, lim := range []int{3, 5, 7} {
require.NoError(t, s.SetLimit(lim))
var (
wg sync.WaitGroup
mu sync.Mutex
concurrency int
maxConcurrency int
)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
s.Acquire()
mu.Lock()
concurrency++
if concurrency > maxConcurrency {
maxConcurrency = concurrency
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
mu.Lock()
concurrency--
mu.Unlock()
s.Release()
}
}()
}
wg.Wait()
// Equal() would probably work here due to Sleep(), but not risking a flake.
require.LessOrEqual(t, maxConcurrency, lim)
require.Greater(t, maxConcurrency, 0)
}
}

View File

@@ -25,6 +25,7 @@
// attempted to ensure we don't exceed the desired rate of operations/bytes uploaded/downloaded.
type Throttler interface {
BeforeOperation(ctx context.Context, op string)
AfterOperation(ctx context.Context, op string)
// BeforeDownload acquires the specified number of downloaded bytes
// possibly blocking until enough are available.
@@ -51,6 +52,8 @@ func (s *throttlingStorage) GetBlob(ctx context.Context, id blob.ID, offset, len
}
s.throttler.BeforeOperation(ctx, operationGetBlob)
defer s.throttler.AfterOperation(ctx, operationGetBlob)
s.throttler.BeforeDownload(ctx, acquired)
output.Reset()
@@ -73,17 +76,22 @@ func (s *throttlingStorage) GetBlob(ctx context.Context, id blob.ID, offset, len
func (s *throttlingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata, error) {
s.throttler.BeforeOperation(ctx, operationGetMetadata)
defer s.throttler.AfterOperation(ctx, operationGetMetadata)
return s.Storage.GetMetadata(ctx, id) // nolint:wrapcheck
}
func (s *throttlingStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
s.throttler.BeforeOperation(ctx, operationListBlobs)
defer s.throttler.AfterOperation(ctx, operationListBlobs)
return s.Storage.ListBlobs(ctx, blobIDPrefix, cb) // nolint:wrapcheck
}
func (s *throttlingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
s.throttler.BeforeOperation(ctx, operationPutBlob)
defer s.throttler.AfterOperation(ctx, operationPutBlob)
s.throttler.BeforeUpload(ctx, int64(data.Length()))
return s.Storage.PutBlob(ctx, id, data, opts) // nolint:wrapcheck
@@ -91,6 +99,8 @@ func (s *throttlingStorage) PutBlob(ctx context.Context, id blob.ID, data blob.B
func (s *throttlingStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
s.throttler.BeforeOperation(ctx, operationDeleteBlob)
defer s.throttler.AfterOperation(ctx, operationDeleteBlob)
return s.Storage.DeleteBlob(ctx, id) // nolint:wrapcheck
}

View File

@@ -28,6 +28,10 @@ func (m *mockThrottler) BeforeOperation(ctx context.Context, op string) {
m.activity = append(m.activity, fmt.Sprintf("BeforeOperation(%v)", op))
}
func (m *mockThrottler) AfterOperation(ctx context.Context, op string) {
m.activity = append(m.activity, fmt.Sprintf("AfterOperation(%v)", op))
}
func (m *mockThrottler) BeforeDownload(ctx context.Context, numBytes int64) {
m.activity = append(m.activity, fmt.Sprintf("BeforeDownload(%v)", numBytes))
}
@@ -66,6 +70,7 @@ func TestThrottling(t *testing.T) {
"inner.concurrency level reached",
"inner.GetBlob",
"ReturnUnusedDownloadBytes(20000000)",
"AfterOperation(GetBlob)",
}, m.activity)
// upload blob of 7 bytes
@@ -75,6 +80,7 @@ func TestThrottling(t *testing.T) {
"BeforeOperation(PutBlob)",
"BeforeUpload(7)",
"inner.PutBlob",
"AfterOperation(PutBlob)",
}, m.activity)
// upload another blob of 30MB
@@ -84,6 +90,7 @@ func TestThrottling(t *testing.T) {
"BeforeOperation(PutBlob)",
"BeforeUpload(30000000)",
"inner.PutBlob",
"AfterOperation(PutBlob)",
}, m.activity)
m.Reset()
@@ -93,6 +100,7 @@ func TestThrottling(t *testing.T) {
"BeforeDownload(20000000)", // length is unknown, we assume 20MB
"inner.GetBlob",
"ReturnUnusedDownloadBytes(19999993)", // refund all but 7 bytes
"AfterOperation(GetBlob)",
}, m.activity)
m.Reset()
@@ -102,6 +110,7 @@ func TestThrottling(t *testing.T) {
"BeforeDownload(20000000)", // length is unknown, we assume 20MB
"inner.GetBlob",
"BeforeDownload(10000000)", // we downloaded more than expected, acquire more
"AfterOperation(GetBlob)",
}, m.activity)
m.Reset()
@@ -110,6 +119,7 @@ func TestThrottling(t *testing.T) {
"BeforeOperation(GetBlob)",
"BeforeDownload(4)",
"inner.GetBlob",
"AfterOperation(GetBlob)",
}, m.activity)
m.Reset()
@@ -119,6 +129,7 @@ func TestThrottling(t *testing.T) {
require.Equal(t, []string{
"BeforeOperation(GetMetadata)",
"inner.GetMetadata",
"AfterOperation(GetMetadata)",
}, m.activity)
m.Reset()
@@ -126,6 +137,7 @@ func TestThrottling(t *testing.T) {
require.Equal(t, []string{
"BeforeOperation(DeleteBlob)",
"inner.DeleteBlob",
"AfterOperation(DeleteBlob)",
}, m.activity)
m.Reset()
@@ -135,5 +147,6 @@ func TestThrottling(t *testing.T) {
require.Equal(t, []string{
"BeforeOperation(ListBlobs)",
"inner.ListBlobs",
"AfterOperation(ListBlobs)",
}, m.activity)
}

View File

@@ -13,6 +13,7 @@
"github.com/kopia/kopia/internal/atomicfile"
"github.com/kopia/kopia/internal/ospath"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/object"
)
@@ -32,6 +33,8 @@ type ClientOptions struct {
EnableActions bool `json:"enableActions"`
FormatBlobCacheDuration time.Duration `json:"formatBlobCacheDuration,omitempty"`
Throttling *throttling.Limits `json:"throttlingLimits,omitempty"`
}
// ApplyDefaults returns a copy of ClientOptions with defaults filled out.

View File

@@ -294,11 +294,27 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
cmOpts.RepositoryFormatBytes = nil
}
st, throttler, err := addThrottler(ctx, st)
limits := throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo())
if lc.Throttling != nil {
limits = *lc.Throttling
}
st, throttler, err := addThrottler(st, limits)
if err != nil {
return nil, errors.Wrap(err, "unable to add throttler")
}
throttler.OnUpdate(func(l throttling.Limits) error {
lc2, err2 := LoadConfigFromFile(configFile)
if err2 != nil {
return err2
}
lc2.Throttling = &l
return lc2.writeToFile(configFile)
})
if blobcfg.IsRetentionEnabled() {
st = wrapLockingStorage(st, blobcfg)
}
@@ -328,12 +344,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
}
dr := &directRepository{
cmgr: cm,
omgr: om,
blobs: st,
mmgr: manifests,
sm: scm,
throttler: throttler,
cmgr: cm,
omgr: om,
blobs: st,
mmgr: manifests,
sm: scm,
directRepositoryParameters: directRepositoryParameters{
uniqueID: ufb.f.UniqueID,
cachingOptions: *cacheOpts,
@@ -344,6 +359,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()),
configFile: configFile,
nextWriterID: new(int32),
throttler: throttler,
},
closed: make(chan struct{}),
}
@@ -373,9 +389,8 @@ func wrapLockingStorage(st blob.Storage, r content.BlobCfgBlob) blob.Storage {
})
}
func addThrottler(ctx context.Context, st blob.Storage) (blob.Storage, throttling.SettableThrottler, error) {
throttler, err := throttling.NewThrottler(
throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo()), throttlingWindow, throttleBucketInitialFill)
func addThrottler(st blob.Storage, limits throttling.Limits) (blob.Storage, throttling.SettableThrottler, error) {
throttler, err := throttling.NewThrottler(limits, throttlingWindow, throttleBucketInitialFill)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to create throttler")
}

View File

@@ -87,6 +87,7 @@ type directRepositoryParameters struct {
blobCfgBlob content.BlobCfgBlob
formatEncryptionKey []byte
nextWriterID *int32
throttler throttling.SettableThrottler
}
// directRepository is an implementation of repository that directly manipulates underlying storage.
@@ -99,8 +100,6 @@ type directRepository struct {
mmgr *manifest.Manager
sm *content.SharedManager
throttler throttling.SettableThrottler
closed chan struct{}
}